use std::fmt::Debug;
use crate::infrastructure::storage::{BackendStorage, ElementInfo, FSStorage};
pub mod errors;
use axum::body::Body;
use errors::BucketStorageError;
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::{error, warn};
use wasmio_aws_types::types::{
CreateBucketOutput, CreateBucketOutputBuilder, CreateBucketRequest,
DeleteObjectOutput, DeleteObjectOutputBuilder, DeleteObjectRequest,
GetObjectOutput, GetObjectRequest, ListObjectsV2Output,
ListObjectsV2Request, Object, PutObjectOutput, PutObjectOutputBuilder,
PutObjectRequest,
};
pub trait BackendDriver:
BackendStorage + Debug + Send + Sync + Clone + 'static
{
}
impl BackendDriver for FSStorage {}
#[derive(Debug, Clone)]
pub struct BucketStorage<T: BackendDriver> {
backend_storage: T,
}
impl<T> BucketStorage<T>
where
T: BackendDriver,
BucketStorageError: From<<T as BackendStorage>::Error>,
{
pub fn new(storage: T) -> Self {
Self {
backend_storage: storage,
}
}
pub async fn create_new_bucket(
&self,
CreateBucketRequest { bucket, .. }: CreateBucketRequest,
) -> Result<CreateBucketOutput, BucketStorageError> {
let db_info = self.backend_storage.new_database(&bucket).await?;
CreateBucketOutputBuilder::default()
.location(format!("/{name}", name = db_info.name()))
.build()
.map_err(|_err| BucketStorageError::Unknown)
}
pub async fn put_object(
&self,
PutObjectRequest {
bucket,
key,
body,
metadata,
..
}: PutObjectRequest,
) -> Result<PutObjectOutput, BucketStorageError> {
let body = body.ok_or(BucketStorageError::Unknown)?;
let body_err = body
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
let mut body_reader = StreamReader::new(body_err);
self.backend_storage
.insert_element_in_database(
&bucket,
&key,
metadata.unwrap_or_default(),
&mut body_reader,
)
.await?;
PutObjectOutputBuilder::default()
.e_tag(Some("unimplemented".to_string()))
.build()
.map_err(|_err| BucketStorageError::Unknown)
}
pub async fn delete_object(
&self,
DeleteObjectRequest { bucket, key, .. }: DeleteObjectRequest,
) -> Result<DeleteObjectOutput, BucketStorageError> {
self.backend_storage
.delete_element_in_database(&bucket, &key)
.await
.map_err(|err| {
error!("{err:?}");
BucketStorageError::Unknown
})?;
DeleteObjectOutputBuilder::default()
.build()
.map_err(|_err| BucketStorageError::Unknown)
}
pub async fn list_object_v2(
&self,
ListObjectsV2Request { bucket, .. }: ListObjectsV2Request,
) -> Result<ListObjectsV2Output, BucketStorageError> {
let mut s = self
.backend_storage
.list_element_in_database(&bucket, None)
.await
.map_err(|_| BucketStorageError::Unknown)?;
let mut contents: Vec<Object> = Vec::new();
while let Some(elt) = s.next().await {
match elt {
Ok(ElementInfo {
name,
last_modified,
size,
checksum,
..
}) => {
contents.push(Object {
key: Some(name),
size: Some(size as i64),
last_modified: Some(last_modified.to_rfc3339()),
e_tag: Some(checksum),
..Default::default()
});
}
Err(err) => {
warn!("{err:?}");
}
}
}
let result = ListObjectsV2Output {
contents: Some(contents),
..Default::default()
};
Ok(result)
}
pub async fn get_object(
&self,
GetObjectRequest { bucket, key, .. }: GetObjectRequest,
) -> Result<GetObjectOutput, BucketStorageError> {
let (mut asyncwriter, asyncreader) = tokio::io::duplex(8192);
let s = self.clone();
let b = bucket.clone();
let k = key.clone();
tokio::spawn(async move {
if let Err(err) = s
.backend_storage
.get_element_in_database(&b, &k, &mut asyncwriter)
.await
{
warn!("{err:?}");
}
});
let ElementInfo {
size,
last_modified,
checksum,
metadatas,
..
} = match self
.backend_storage
.get_element_metadata_in_database(&bucket, &key)
.await?
{
Some(elt) => elt,
None => {
return Err(BucketStorageError::NoKey);
}
};
let body = Body::from_stream(ReaderStream::new(asyncreader));
Ok(GetObjectOutput {
accept_ranges: None,
body: Some(body),
bucket_key_enabled: None,
cache_control: None,
content_disposition: None,
content_encoding: None,
content_language: None,
content_length: Some(size as i64),
content_range: None,
content_type: None,
delete_marker: None,
e_tag: Some(checksum),
expiration: None,
expires: None,
last_modified: Some(last_modified.to_rfc3339()),
metadata: Some(metadatas),
missing_meta: None,
object_lock_legal_hold_status: None,
object_lock_mode: None,
object_lock_retain_until_date: None,
parts_count: None,
replication_status: None,
request_charged: None,
restore: None,
sse_customer_algorithm: None,
sse_customer_key_md5: None,
ssekms_key_id: None,
server_side_encryption: None,
storage_class: None,
tag_count: None,
version_id: None,
website_redirect_location: None,
})
}
}