use async_trait::async_trait;
use futures::{pin_mut, Stream, TryStreamExt};
use super::super::common::{ContainerRef, Refresh};
use super::super::session::Session;
use super::super::utils::{try_one, Query};
use super::super::{ErrorKind, Result};
use super::objects::{Object, ObjectQuery};
use super::{api, protocol};
/// A query for listing containers.
///
/// Built up with the `with_*` methods and consumed by `into_stream`, `all`
/// or `one`.
#[derive(Clone, Debug)]
pub struct ContainerQuery {
/// Session passed to `api::list_containers` and cloned into every `Container` produced.
session: Session,
/// Query parameters passed to `api::list_containers`.
query: Query,
/// Optional limit set by `with_limit`; `one` overwrites it with `Some(2)`.
limit: Option<usize>,
/// Optional marker set by `with_marker` and passed to `api::list_containers`.
marker: Option<String>,
}
/// A single container together with the session used to operate on it.
#[derive(Clone, Debug)]
pub struct Container {
/// Session used for every API call issued on behalf of this container.
session: Session,
/// Protocol-level container record, as obtained from `api::get_container`.
inner: protocol::Container,
}
impl Container {
pub(crate) fn new(session: Session, inner: protocol::Container) -> Container {
Container { session, inner }
}
pub(crate) async fn create<Id: AsRef<str>>(session: Session, name: Id) -> Result<Container> {
let c_id = name.as_ref();
let _ = api::create_container(&session, c_id).await?;
let inner = api::get_container(&session, c_id).await?;
Ok(Container::new(session, inner))
}
pub(crate) async fn load<Id: AsRef<str>>(session: Session, name: Id) -> Result<Container> {
let inner = api::get_container(&session, name).await?;
Ok(Container::new(session, inner))
}
pub async fn delete(self, delete_objects: bool) -> Result<()> {
if delete_objects {
let iter = self.find_objects().into_stream().await?;
pin_mut!(iter);
while let Some(obj) = iter.try_next().await? {
obj.delete().await.or_else(|err| {
if err.kind() == ErrorKind::ResourceNotFound {
Ok(())
} else {
Err(err)
}
})?;
}
}
api::delete_container(&self.session, self.inner.name).await
}
#[inline]
pub fn find_objects(&self) -> ObjectQuery {
ObjectQuery::new(self.session.clone(), self.inner.name.clone())
}
#[inline]
pub async fn list_objects(&self) -> Result<Vec<Object>> {
self.find_objects().all().await
}
transparent_property! {
#[doc = "Total size of the container."]
bytes: u64
}
transparent_property! {
#[doc = "Container name."]
name: ref String
}
transparent_property! {
#[doc = "Number of objects in the container."]
object_count: u64
}
}
#[async_trait]
impl Refresh for Container {
    /// Re-fetch the container record by name and replace the stored copy.
    ///
    /// On error the stored record is left untouched.
    async fn refresh(&mut self) -> Result<()> {
        let latest = api::get_container(&self.session, &self.inner.name).await?;
        self.inner = latest;
        Ok(())
    }
}
impl ContainerQuery {
    /// Create an empty query bound to `session`: no filters, no limit, no marker.
    pub(crate) fn new(session: Session) -> ContainerQuery {
        Self {
            session,
            query: Query::new(),
            limit: None,
            marker: None,
        }
    }

    /// Set the marker passed to the container listing request.
    ///
    /// Replaces any marker set previously.
    pub fn with_marker<T: Into<String>>(self, marker: T) -> Self {
        Self {
            marker: Some(marker.into()),
            ..self
        }
    }

    /// Set the limit passed to the container listing request.
    ///
    /// Replaces any limit set previously.
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    query_filter! {
        #[doc = "Filter by prefix."]
        with_prefix -> prefix
    }

    /// Run the query and return the matching containers as a stream.
    ///
    /// Every listed record is wrapped into a [`Container`] carrying a clone
    /// of this query's session.
    ///
    /// # Errors
    ///
    /// Fails if the listing request cannot be started; later failures are
    /// reported through the stream items.
    pub async fn into_stream(self) -> Result<impl Stream<Item = Result<Container>>> {
        debug!("Fetching containers with {:?}", self.query);
        let ContainerQuery {
            session,
            query,
            limit,
            marker,
        } = self;
        let records = api::list_containers(&session, query, limit, marker).await?;
        Ok(records.map_ok(move |c| Container::new(session.clone(), c)))
    }

    /// Run the query and collect every result into a vector.
    pub async fn all(self) -> Result<Vec<Container>> {
        let containers = self.into_stream().await?;
        containers.try_collect().await
    }

    /// Run the query and return the single result selected by `try_one`.
    ///
    /// Any limit set with `with_limit` is overridden.
    pub async fn one(mut self) -> Result<Container> {
        debug!("Fetching one container with {:?}", self.query);
        // NOTE(review): two results are requested, presumably so that `try_one`
        // can tell a unique match from an ambiguous one - confirm against `try_one`.
        self.limit = Some(2);
        let candidates = self.into_stream().await?;
        try_one(candidates).await
    }
}
impl From<Container> for ContainerRef {
fn from(value: Container) -> ContainerRef {
ContainerRef::new_verified(value.inner.name)
}
}
#[cfg(feature = "object-storage")]
impl ContainerRef {
/// Return this reference unchanged.
///
/// No request is made and `_session` is not used.
// NOTE(review): the `unused` allowance presumably covers builds in which
// nothing calls this method - confirm against the callers.
#[allow(unused)]
pub(crate) async fn into_verified(self, _session: &Session) -> Result<Self> {
Ok(self)
}
}