use crate::{create, Config, Result};
use dyn_clone::DynClone;
use std::{any::Any, ops::Range};
use tokio::io::DuplexStream;
use tokio::sync::{mpsc, oneshot};
use url::Url;
mod s3;
/// Abstraction over an object storage backend that resolves URLs into buckets.
///
/// Implementors must be `Debug`, thread-safe (`Sync + Send + 'static`) and cloneable through
/// `dyn_clone`, so the trait can be used as a cloneable `Box<dyn ObjectStorage>`.
#[async_trait::async_trait]
pub(crate) trait ObjectStorage: DynClone + std::fmt::Debug + Sync + Send + 'static {
/// Resolves `url` into a bucket handle plus two optional strings.
///
/// NOTE(review): the two `Option<String>` values are presumably the object key and the object
/// version ID, in that order — confirm against the implementations.
async fn parse_url(
&self,
url: &Url,
) -> Result<(Box<dyn Bucket>, Option<String>, Option<String>)>;
/// Resolves `url` into a bucket handle only, without the extra components that
/// `parse_url` returns.
async fn extract_bucket_from_url(&self, url: &Url) -> Result<Box<dyn Bucket>>;
}
// Implements `Clone` for `Box<dyn ObjectStorage>` on top of the `DynClone` supertrait.
dyn_clone::clone_trait_object!(ObjectStorage);
/// Abstraction over a single bucket within an object storage backend.
///
/// Declares the read, list and write operations the rest of the crate performs on a bucket.
/// Implementors must be `Debug`, thread-safe (`Sync + Send + 'static`) and cloneable through
/// `dyn_clone`.
#[async_trait::async_trait]
pub(crate) trait Bucket: DynClone + std::fmt::Debug + Sync + Send + 'static {
/// Exposes the implementor as `dyn Any` so callers can downcast to the concrete type.
#[doc(hidden)]
fn as_any(&self) -> &(dyn Any + Sync + Send);
/// Returns a boxed handle to the object storage backend associated with this bucket.
fn objstore(&self) -> Box<dyn ObjectStorage>;
/// Returns the name of this bucket.
fn name(&self) -> &str;
/// Returns the size of the object at `key`, optionally for a specific `version_id`.
///
/// NOTE(review): the unit is presumably bytes — confirm against the implementations.
async fn get_object_size(&self, key: String, version_id: Option<String>) -> Result<u64>;
/// Lists the objects in this bucket selected by `selector`.
async fn list_matching_objects(
&self,
selector: create::ObjectSelector,
) -> Result<Vec<create::InputObject>>;
/// Reads `byte_range` of the object at `key` and returns it as a single `Bytes` buffer.
///
/// `version_id`, when given, selects a specific version of the object.
async fn read_object_part(
&self,
key: String,
version_id: Option<String>,
byte_range: Range<u64>,
) -> Result<bytes::Bytes>;
/// Reads `byte_range` of the object at `key`, delivering the data through a channel.
///
/// Each item received is either a chunk of bytes or an error.
/// NOTE(review): chunk sizes and ordering guarantees are not visible here — confirm against
/// the implementations.
async fn read_object(
&self,
key: String,
version_id: Option<String>,
byte_range: Range<u64>,
) -> Result<mpsc::Receiver<Result<bytes::Bytes>>>;
/// Computes the part ranges for uploading an object of `size` to `key`.
///
/// NOTE(review): `None` presumably means the object should not be uploaded as a multipart
/// upload — confirm against the implementations.
fn partition_for_multipart_upload(
&self,
key: &str,
size: u64,
) -> Result<Option<Vec<Range<u64>>>>;
/// Creates an uploader for a multipart upload of `key` split into `parts`.
///
/// NOTE(review): `parts` is presumably the output of `partition_for_multipart_upload` —
/// verify against callers.
fn start_multipart_upload(
&self,
key: String,
parts: Vec<Range<u64>>,
) -> Box<dyn MultipartUploader>;
/// Uploads `data` as the object at `key` in a single operation.
async fn put_small_object(&self, key: String, data: bytes::Bytes) -> Result<()>;
/// Creates a streaming writer for the object at `key`.
///
/// Returns a `DuplexStream`, an unbounded receiver of `usize` values and a one-shot receiver
/// carrying a `Result<u64>`.
/// NOTE(review): presumably the stream is where the caller writes the object data, the
/// `usize` values report progress, and the one-shot yields the final outcome (total bytes
/// written) — confirm against the implementations. The role of `size_hint` is likewise not
/// visible here.
async fn create_object_writer(
&self,
key: String,
size_hint: Option<u64>,
) -> Result<(
DuplexStream,
mpsc::UnboundedReceiver<usize>,
oneshot::Receiver<Result<u64>>,
)>;
}
// Implements `Clone` for `Box<dyn Bucket>` on top of the `DynClone` supertrait.
dyn_clone::clone_trait_object!(Bucket);
/// Abstraction over one multipart upload; instances are created by
/// `Bucket::start_multipart_upload`.
///
/// NOTE(review): the expected call order is presumably `init`, then `upload_part` for each
/// range in `parts`, then `finish` — confirm against callers.
#[async_trait::async_trait]
pub(crate) trait MultipartUploader: DynClone + Sync + Send + 'static {
/// Performs any setup needed before parts can be uploaded.
async fn init(&self) -> Result<()>;
/// Returns the ranges that make up this upload.
fn parts(&self) -> &[Range<u64>];
/// Uploads `bytes` as the part identified by `range`.
///
/// NOTE(review): `range` is presumably expected to be one of the ranges from `parts` and
/// `bytes` to match its length — verify against the implementations.
async fn upload_part(&self, range: Range<u64>, bytes: bytes::Bytes) -> Result<()>;
/// Completes the upload.
async fn finish(&self) -> Result<()>;
}
// Implements `Clone` for `Box<dyn MultipartUploader>` on top of the `DynClone` supertrait.
dyn_clone::clone_trait_object!(MultipartUploader);
/// Namespace type whose associated functions construct boxed `ObjectStorage` backends.
///
/// It carries no state; all constructors are associated functions.
#[derive(Debug)]
pub(crate) struct ObjectStorageFactory;
impl ObjectStorageFactory {
#[allow(clippy::wrong_self_convention)] pub async fn from_url(config: Config, url: &Url) -> Result<Box<dyn ObjectStorage>> {
if url.scheme() == "s3" {
Self::s3(config).await
} else {
crate::error::UnsupportedObjectStorageSnafu { url: url.clone() }.fail()
}
}
pub async fn s3(config: Config) -> Result<Box<dyn ObjectStorage>> {
Ok(Box::new(s3::S3::new(config.clone()).await?))
}
}