use super::file::FilePool;
use crate::error::Error;
use crate::tokio_async::traits::{DataPool, Filter, S3Folder};
use crate::utils::S3Object;
use url::Url;
#[derive(Debug)]
pub enum PoolType {
UpPool,
DownPool,
}
#[derive(Debug)]
pub struct Canal {
pub up_pool: Option<Box<dyn DataPool>>,
pub upstream_object: Option<S3Object>,
pub down_pool: Option<Box<dyn DataPool>>,
pub downstream_object: Option<S3Object>,
pub(crate) default: PoolType,
pub filter: Option<Filter>,
}
impl Canal {
pub fn is_connect(&self) -> bool {
self.up_pool.is_some() && self.down_pool.is_some()
}
pub fn toward(mut self, resource_location: &str) -> Result<Self, Error> {
self.toward_pool(Box::new(FilePool::new(resource_location)?));
self.upstream_object = Some(resource_location.into());
Ok(self)
}
pub fn from(mut self, resource_location: &str) -> Result<Self, Error> {
self.from_pool(Box::new(FilePool::new(resource_location)?));
self.downstream_object = Some(resource_location.into());
Ok(self)
}
pub async fn download_file(mut self, resource_location: &str) -> Result<(), Error> {
if let Ok(r) = Url::parse(resource_location) {
self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); } else {
self.toward_pool(Box::new(FilePool::new("/")?));
}
self.downstream_object = Some(resource_location.into());
match self.downstream_object.take() {
Some(S3Object { bucket, key, .. }) if key.is_none() => {
self.downstream_object = Some(S3Object {
bucket,
key: self.upstream_object.clone().unwrap().key,
..Default::default()
});
}
Some(obj) => {
self.downstream_object = Some(obj);
}
None => {
panic!("never be here")
}
}
Ok(self.pull().await?)
}
pub async fn upload_file(mut self, resource_location: &str) -> Result<(), Error> {
if let Ok(r) = Url::parse(resource_location) {
self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); } else {
self.toward_pool(Box::new(FilePool::new("/")?));
}
self.downstream_object = Some(resource_location.into());
match self.downstream_object.take() {
Some(S3Object { bucket, key, .. }) if key.is_none() => {
self.downstream_object = Some(S3Object {
bucket: Some(std::env::current_dir()?.to_string_lossy()[1..].into()),
key: Some(format!("/{}", bucket.unwrap_or_default())),
..Default::default()
});
}
Some(obj) => {
self.downstream_object = Some(obj);
}
None => {
panic!("never be here")
}
}
Ok(self.push().await?)
}
pub fn from_pool(&mut self, pool: Box<dyn DataPool>) {
self.up_pool = Some(pool);
}
pub fn toward_pool(&mut self, pool: Box<dyn DataPool>) {
self.down_pool = Some(pool);
}
#[inline]
pub fn _object(mut self, object_name: &str) -> Self {
let mut o = match self.default {
PoolType::UpPool => self.upstream_object.take(),
PoolType::DownPool => self.downstream_object.take(),
}
.unwrap_or_default();
o.key = if object_name.starts_with('/') {
Some(object_name.to_string())
} else {
Some(format!("/{}", object_name))
};
match self.default {
PoolType::UpPool => self.upstream_object = Some(o),
PoolType::DownPool => self.downstream_object = Some(o),
};
self
}
#[inline]
pub fn _bucket(mut self, bucket_name: &str) -> Self {
let mut o = match self.default {
PoolType::UpPool => self.upstream_object.take(),
PoolType::DownPool => self.downstream_object.take(),
}
.unwrap_or_default();
o.bucket = Some(bucket_name.to_string());
match self.default {
PoolType::UpPool => self.upstream_object = Some(o),
PoolType::DownPool => self.downstream_object = Some(o),
};
self
}
pub fn object(self, object_name: &str) -> Self {
self._object(object_name)
}
#[cfg(not(feature = "slim"))]
pub fn key(self, key_name: &str) -> Self {
self._object(key_name)
}
pub fn bucket(self, bucket_name: &str) -> Self {
self._bucket(bucket_name)
}
#[cfg(not(feature = "slim"))]
pub fn folder(self, folder_name: &str) -> Self {
self._bucket(folder_name)
}
pub fn prefix(mut self, prefix_str: &str) -> Self {
self.filter = Some(Filter::Prefix(prefix_str.into()));
self
}
#[inline]
pub fn _toward_object(&mut self, object_name: &str) {
let mut o = self.downstream_object.take().unwrap_or_default();
o.key = if object_name.starts_with('/') {
Some(object_name.to_string())
} else {
Some(format!("/{}", object_name))
};
self.downstream_object = Some(o);
}
pub fn toward_object(&mut self, object_name: &str) {
self._toward_object(object_name)
}
#[cfg(not(feature = "slim"))]
pub fn toward_key(&mut self, object_name: &str) {
self._toward_object(object_name)
}
#[inline]
pub fn _toward_bucket(&mut self, bucket_name: &str) {
let mut o = self.downstream_object.take().unwrap_or_default();
o.bucket = Some(bucket_name.to_string());
self.downstream_object = Some(o);
}
pub fn toward_bucket(&mut self, bucket_name: &str) {
self._toward_bucket(bucket_name)
}
#[cfg(not(feature = "slim"))]
pub fn toward_folder(&mut self, folder_name: &str) {
self._toward_bucket(folder_name)
}
#[cfg(not(feature = "slim"))]
pub fn toward_path(&mut self, path: &str) {
self.downstream_object = Some(path.into());
}
#[inline]
pub fn _from_object(&mut self, object_name: &str) {
let mut o = self.upstream_object.take().unwrap_or_default();
o.key = if object_name.starts_with('/') {
Some(object_name.to_string())
} else {
Some(format!("/{}", object_name))
};
self.upstream_object = Some(o);
}
pub fn from_object(&mut self, object_name: &str) {
self._from_object(object_name)
}
#[cfg(not(feature = "slim"))]
pub fn from_key(&mut self, object_name: &str) {
self._from_object(object_name)
}
#[inline]
pub fn _from_bucket(&mut self, bucket_name: &str) {
let mut o = self.upstream_object.take().unwrap_or_default();
o.bucket = Some(bucket_name.to_string());
self.upstream_object = Some(o);
}
pub fn from_bucket(&mut self, bucket_name: &str) {
self._from_bucket(bucket_name)
}
#[cfg(not(feature = "slim"))]
pub fn from_folder(&mut self, folder_name: &str) {
self._from_bucket(folder_name)
}
#[cfg(not(feature = "slim"))]
pub fn from_path(&mut self, path: &str) {
self.upstream_object = Some(path.into());
}
pub async fn push(self) -> Result<(), Error> {
match (self.up_pool, self.down_pool) {
(Some(up_pool), Some(down_pool)) => {
if let Some(downstream_object) = self.downstream_object {
let b = down_pool.pull(downstream_object.clone()).await?;
up_pool
.push(self.upstream_object.unwrap_or(downstream_object), b)
.await?;
Ok(())
} else {
Err(Error::NoObject())
}
}
_ => Err(Error::PoolUninitializeError()),
}
}
pub async fn push_obj(&self, obj: S3Object) -> Result<(), Error> {
match (&self.up_pool, &self.down_pool) {
(Some(up_pool), Some(down_pool)) => {
let b = down_pool.pull(obj.clone()).await?;
up_pool.push(obj, b).await?;
Ok(())
}
_ => Err(Error::PoolUninitializeError()),
}
}
pub async fn pull(self) -> Result<(), Error> {
match (self.up_pool, self.down_pool) {
(Some(up_pool), Some(down_pool)) => {
if let Some(upstream_object) = self.upstream_object {
let b = up_pool.pull(upstream_object.clone()).await?;
down_pool
.push(self.downstream_object.unwrap_or(upstream_object), b)
.await?;
Ok(())
} else {
Err(Error::NoObject())
}
}
_ => Err(Error::PoolUninitializeError()),
}
}
pub async fn pull_obj(&self, obj: S3Object) -> Result<(), Error> {
match (&self.up_pool, &self.down_pool) {
(Some(up_pool), Some(down_pool)) => {
let b = up_pool.pull(obj.clone()).await?;
down_pool.push(obj, b).await?;
Ok(())
}
_ => Err(Error::PoolUninitializeError()),
}
}
pub async fn upstream_remove(self) -> Result<(), Error> {
if let Some(upstream_object) = self.upstream_object {
Ok(self
.up_pool
.expect("upstream pool should exist") .remove(upstream_object)
.await?)
} else {
Err(Error::ResourceUrlError(
"can not remove on an object withouput setup".to_string(),
))
}
}
pub async fn downstream_remove(self) -> Result<(), Error> {
if let Some(downstream_object) = self.downstream_object {
Ok(self
.down_pool
.expect("downstream pool should exist") .remove(downstream_object)
.await?)
} else {
Err(Error::ResourceUrlError(
"can not remove on an object withouput setup".to_string(),
))
}
}
pub async fn remove(self) -> Result<(), Error> {
match self.default {
PoolType::UpPool => self.upstream_remove().await,
PoolType::DownPool => self.downstream_remove().await,
}
}
pub async fn upstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
Ok(self
.up_pool
.expect("upstream pool should exist")
.list(self.upstream_object, &self.filter)
.await?)
}
pub async fn downstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
Ok(self
.down_pool
.expect("downstream pool should exist")
.list(self.downstream_object, &self.filter)
.await?)
}
pub async fn list(self) -> Result<Box<dyn S3Folder>, Error> {
match self.default {
PoolType::UpPool => self.upstream_list().await,
PoolType::DownPool => self.downstream_list().await,
}
}
}