use futures::io::AsyncRead;
use futures::stream::Stream;
use osauth::client::NO_PATH;
use osauth::services::OBJECT_STORAGE;
use reqwest::{Method, StatusCode};
use super::super::session::Session;
use super::super::utils::Query;
use super::super::Result;
use super::objects::ObjectHeaders;
use super::protocol::*;
use super::utils::{async_read_to_body, body_to_async_read};
pub async fn create_container<C>(session: &Session, container: C) -> Result<bool>
where
C: AsRef<str>,
{
let c_id = container.as_ref();
debug!("Creating container {}", c_id);
let result = session.put(OBJECT_STORAGE, &[c_id]).send().await?;
if result.status() == StatusCode::CREATED {
debug!("Successfully created container {}", c_id);
Ok(true)
} else {
debug!("Container {} already exists", c_id);
Ok(false)
}
}
pub async fn create_object<C, O, R>(
session: &Session,
container: C,
object: O,
body: R,
headers: ObjectHeaders,
) -> Result<Object>
where
C: AsRef<str>,
O: AsRef<str>,
R: AsyncRead + Send + Sync + 'static,
{
let c_id = container.as_ref();
let o_id = object.as_ref();
debug!("Creating object {} in container {}", o_id, c_id);
let mut req = session.put(OBJECT_STORAGE, &[c_id, o_id]);
if let Some(delete_after) = headers.delete_after {
req = req.header("X-Delete-After", delete_after);
}
if let Some(delete_at) = headers.delete_at {
req = req.header("X-Delete-At", delete_at);
}
for (key, value) in headers.metadata {
req = req.header(&format!("X-Object-Meta-{key}"), value);
}
let _ = req.body(async_read_to_body(body)).send().await?;
debug!("Successfully created object {} in container {}", o_id, c_id);
get_object(session, c_id, o_id).await
}
pub async fn delete_container<C>(session: &Session, container: C) -> Result<()>
where
C: AsRef<str>,
{
let c_id = container.as_ref();
debug!("Deleting container {}", c_id);
let _ = session.delete(OBJECT_STORAGE, &[c_id]).send().await?;
debug!("Successfully deleted container {}", c_id);
Ok(())
}
pub async fn delete_object<C, O>(session: &Session, container: C, object: O) -> Result<()>
where
C: AsRef<str>,
O: AsRef<str>,
{
let c_id = container.as_ref();
let o_id = object.as_ref();
debug!("Deleting object {} in container {}", o_id, c_id);
let _ = session.delete(OBJECT_STORAGE, &[c_id, o_id]).send().await?;
debug!("Successfully deleted object {} in container {}", o_id, c_id);
Ok(())
}
pub async fn get_container<C>(session: &Session, container: C) -> Result<Container>
where
C: AsRef<str>,
{
let c_id = container.as_ref();
trace!("Requesting container {}", c_id);
let resp = session
.request(OBJECT_STORAGE, Method::HEAD, &[c_id])
.send()
.await?;
let result = Container::from_headers(c_id, resp.headers())?;
trace!("Received {:?}", result);
Ok(result)
}
pub async fn get_object<C, O>(session: &Session, container: C, object: O) -> Result<Object>
where
C: AsRef<str>,
O: AsRef<str>,
{
let c_id = container.as_ref();
let o_id = object.as_ref();
trace!("Requesting object {} from container {}", o_id, c_id);
let resp = session
.request(OBJECT_STORAGE, Method::HEAD, &[c_id, o_id])
.send()
.await?;
let result = Object::from_headers(o_id, resp.headers())?;
trace!("Received {:?}", result);
Ok(result)
}
pub async fn download_object<C, O>(
session: &Session,
container: C,
object: O,
) -> Result<impl AsyncRead + Send + 'static>
where
C: AsRef<str>,
O: AsRef<str>,
{
let c_id = container.as_ref();
let o_id = object.as_ref();
trace!("Downloading object {} from container {}", o_id, c_id);
let resp = session.get(OBJECT_STORAGE, &[c_id, o_id]).send().await?;
Ok(body_to_async_read(resp))
}
pub async fn list_containers(
session: &Session,
mut query: Query,
limit: Option<usize>,
marker: Option<String>,
) -> Result<impl Stream<Item = Result<Container>>> {
query.push_str("format", "json");
trace!("Listing containers with {:?}", query);
Ok(session
.get(OBJECT_STORAGE, NO_PATH)
.query(&query)
.fetch_paginated(limit, marker)
.await)
}
pub async fn list_objects<C>(
session: &Session,
container: C,
mut query: Query,
limit: Option<usize>,
marker: Option<String>,
) -> Result<impl Stream<Item = Result<Object>>>
where
C: AsRef<str> + 'static,
{
query.push_str("format", "json");
let id = container.as_ref();
trace!("Listing objects in container {} with {:?}", id, query);
Ok(session
.get(OBJECT_STORAGE, &[id])
.query(&query)
.fetch_paginated(limit, marker)
.await)
}