s3handler/tokio_async/primitives/
canal.rs1use super::file::FilePool;
2use crate::error::Error;
3use crate::tokio_async::traits::{DataPool, Filter, S3Folder};
4use crate::utils::S3Object;
5use url::Url;
6
/// Names one of the two sides of a [`Canal`].
///
/// A `Canal` stores one of these as its default side; `object`, `bucket`,
/// `remove`, and `list` consult it to decide which side they act on.
#[derive(Debug)]
pub enum PoolType {
    /// The upstream side (`up_pool` / `upstream_object`).
    UpPool,
    /// The downstream side (`down_pool` / `downstream_object`).
    DownPool,
}
12
13#[derive(Debug)]
14pub struct Canal {
15 pub up_pool: Option<Box<dyn DataPool>>,
16 pub upstream_object: Option<S3Object>,
17 pub down_pool: Option<Box<dyn DataPool>>,
18 pub downstream_object: Option<S3Object>,
19 pub(crate) default: PoolType,
20 pub filter: Option<Filter>,
21 }
31
32impl Canal {
42 pub fn is_connect(&self) -> bool {
44 self.up_pool.is_some() && self.down_pool.is_some()
45 }
46
47 pub fn toward(mut self, resource_location: &str) -> Result<Self, Error> {
49 self.toward_pool(Box::new(FilePool::new(resource_location)?));
50 self.upstream_object = Some(resource_location.into());
51 Ok(self)
52 }
53
54 pub fn from(mut self, resource_location: &str) -> Result<Self, Error> {
56 self.from_pool(Box::new(FilePool::new(resource_location)?));
57 self.downstream_object = Some(resource_location.into());
58 Ok(self)
59 }
60
61 pub async fn download_file(mut self, resource_location: &str) -> Result<(), Error> {
66 if let Ok(r) = Url::parse(resource_location) {
67 self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); } else {
69 self.toward_pool(Box::new(FilePool::new("/")?));
70 }
71 self.downstream_object = Some(resource_location.into());
72 match self.downstream_object.take() {
73 Some(S3Object { bucket, key, .. }) if key.is_none() => {
74 self.downstream_object = Some(S3Object {
75 bucket,
76 key: self.upstream_object.clone().unwrap().key,
77 ..Default::default()
78 });
79 }
80 Some(obj) => {
81 self.downstream_object = Some(obj);
82 }
83 None => {
84 panic!("never be here")
85 }
86 }
87 Ok(self.pull().await?)
88 }
89
90 pub async fn upload_file(mut self, resource_location: &str) -> Result<(), Error> {
95 if let Ok(r) = Url::parse(resource_location) {
96 self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); } else {
98 self.toward_pool(Box::new(FilePool::new("/")?));
99 }
100 self.downstream_object = Some(resource_location.into());
101 match self.downstream_object.take() {
102 Some(S3Object { bucket, key, .. }) if key.is_none() => {
103 self.downstream_object = Some(S3Object {
104 bucket: Some(std::env::current_dir()?.to_string_lossy()[1..].into()),
105 key: Some(format!("/{}", bucket.unwrap_or_default())),
106 ..Default::default()
107 });
108 }
109 Some(obj) => {
110 self.downstream_object = Some(obj);
111 }
112 None => {
113 panic!("never be here")
114 }
115 }
116 Ok(self.push().await?)
117 }
118 pub fn from_pool(&mut self, pool: Box<dyn DataPool>) {
123 self.up_pool = Some(pool);
124 }
125
126 pub fn toward_pool(&mut self, pool: Box<dyn DataPool>) {
128 self.down_pool = Some(pool);
129 }
130
131 #[inline]
132 pub fn _object(mut self, object_name: &str) -> Self {
133 let mut o = match self.default {
134 PoolType::UpPool => self.upstream_object.take(),
135 PoolType::DownPool => self.downstream_object.take(),
136 }
137 .unwrap_or_default();
138 o.key = if object_name.starts_with('/') {
139 Some(object_name.to_string())
140 } else {
141 Some(format!("/{}", object_name))
142 };
143 match self.default {
144 PoolType::UpPool => self.upstream_object = Some(o),
145 PoolType::DownPool => self.downstream_object = Some(o),
146 };
147 self
148 }
149
150 #[inline]
151 pub fn _bucket(mut self, bucket_name: &str) -> Self {
152 let mut o = match self.default {
153 PoolType::UpPool => self.upstream_object.take(),
154 PoolType::DownPool => self.downstream_object.take(),
155 }
156 .unwrap_or_default();
157 o.bucket = Some(bucket_name.to_string());
158 match self.default {
159 PoolType::UpPool => self.upstream_object = Some(o),
160 PoolType::DownPool => self.downstream_object = Some(o),
161 };
162 self
163 }
164
165 pub fn object(self, object_name: &str) -> Self {
169 self._object(object_name)
170 }
171
172 #[cfg(not(feature = "slim"))]
174 pub fn key(self, key_name: &str) -> Self {
175 self._object(key_name)
176 }
177
178 pub fn bucket(self, bucket_name: &str) -> Self {
182 self._bucket(bucket_name)
183 }
184
185 #[cfg(not(feature = "slim"))]
186 pub fn folder(self, folder_name: &str) -> Self {
188 self._bucket(folder_name)
189 }
190
191 pub fn prefix(mut self, prefix_str: &str) -> Self {
192 self.filter = Some(Filter::Prefix(prefix_str.into()));
193 self
194 }
195
196 #[inline]
197 pub fn _toward_object(&mut self, object_name: &str) {
198 let mut o = self.downstream_object.take().unwrap_or_default();
199 o.key = if object_name.starts_with('/') {
200 Some(object_name.to_string())
201 } else {
202 Some(format!("/{}", object_name))
203 };
204 self.downstream_object = Some(o);
205 }
206
207 pub fn toward_object(&mut self, object_name: &str) {
209 self._toward_object(object_name)
210 }
211
212 #[cfg(not(feature = "slim"))]
214 pub fn toward_key(&mut self, object_name: &str) {
215 self._toward_object(object_name)
216 }
217
218 #[inline]
219 pub fn _toward_bucket(&mut self, bucket_name: &str) {
220 let mut o = self.downstream_object.take().unwrap_or_default();
221 o.bucket = Some(bucket_name.to_string());
222 self.downstream_object = Some(o);
223 }
224
225 pub fn toward_bucket(&mut self, bucket_name: &str) {
227 self._toward_bucket(bucket_name)
228 }
229
230 #[cfg(not(feature = "slim"))]
232 pub fn toward_folder(&mut self, folder_name: &str) {
233 self._toward_bucket(folder_name)
234 }
235
236 #[cfg(not(feature = "slim"))]
238 pub fn toward_path(&mut self, path: &str) {
239 self.downstream_object = Some(path.into());
240 }
241
242 #[inline]
243 pub fn _from_object(&mut self, object_name: &str) {
244 let mut o = self.upstream_object.take().unwrap_or_default();
245 o.key = if object_name.starts_with('/') {
246 Some(object_name.to_string())
247 } else {
248 Some(format!("/{}", object_name))
249 };
250 self.upstream_object = Some(o);
251 }
252
253 pub fn from_object(&mut self, object_name: &str) {
255 self._from_object(object_name)
256 }
257
258 #[cfg(not(feature = "slim"))]
260 pub fn from_key(&mut self, object_name: &str) {
261 self._from_object(object_name)
262 }
263
264 #[inline]
265 pub fn _from_bucket(&mut self, bucket_name: &str) {
266 let mut o = self.upstream_object.take().unwrap_or_default();
267 o.bucket = Some(bucket_name.to_string());
268 self.upstream_object = Some(o);
269 }
270
271 pub fn from_bucket(&mut self, bucket_name: &str) {
273 self._from_bucket(bucket_name)
274 }
275
276 #[cfg(not(feature = "slim"))]
278 pub fn from_folder(&mut self, folder_name: &str) {
279 self._from_bucket(folder_name)
280 }
281
282 #[cfg(not(feature = "slim"))]
284 pub fn from_path(&mut self, path: &str) {
285 self.upstream_object = Some(path.into());
286 }
287 pub async fn push(self) -> Result<(), Error> {
292 match (self.up_pool, self.down_pool) {
293 (Some(up_pool), Some(down_pool)) => {
294 if let Some(downstream_object) = self.downstream_object {
295 let b = down_pool.pull(downstream_object.clone()).await?;
296 up_pool
297 .push(self.upstream_object.unwrap_or(downstream_object), b)
298 .await?;
299 Ok(())
300 } else {
301 Err(Error::NoObject())
302 }
303 }
304 _ => Err(Error::PoolUninitializeError()),
305 }
306 }
307
308 pub async fn push_obj(&self, obj: S3Object) -> Result<(), Error> {
310 match (&self.up_pool, &self.down_pool) {
311 (Some(up_pool), Some(down_pool)) => {
312 let b = down_pool.pull(obj.clone()).await?;
313 up_pool.push(obj, b).await?;
314 Ok(())
315 }
316 _ => Err(Error::PoolUninitializeError()),
317 }
318 }
319
320 pub async fn pull(self) -> Result<(), Error> {
322 match (self.up_pool, self.down_pool) {
323 (Some(up_pool), Some(down_pool)) => {
324 if let Some(upstream_object) = self.upstream_object {
325 let b = up_pool.pull(upstream_object.clone()).await?;
326 down_pool
327 .push(self.downstream_object.unwrap_or(upstream_object), b)
328 .await?;
329 Ok(())
330 } else {
331 Err(Error::NoObject())
332 }
333 }
334 _ => Err(Error::PoolUninitializeError()),
335 }
336 }
337
338 pub async fn pull_obj(&self, obj: S3Object) -> Result<(), Error> {
340 match (&self.up_pool, &self.down_pool) {
341 (Some(up_pool), Some(down_pool)) => {
342 let b = up_pool.pull(obj.clone()).await?;
343 down_pool.push(obj, b).await?;
344 Ok(())
345 }
346 _ => Err(Error::PoolUninitializeError()),
347 }
348 }
349
350 pub async fn upstream_remove(self) -> Result<(), Error> {
352 if let Some(upstream_object) = self.upstream_object {
353 Ok(self
354 .up_pool
355 .expect("upstream pool should exist") .remove(upstream_object)
357 .await?)
358 } else {
359 Err(Error::ResourceUrlError(
360 "can not remove on an object withouput setup".to_string(),
361 ))
362 }
363 }
364
365 pub async fn downstream_remove(self) -> Result<(), Error> {
367 if let Some(downstream_object) = self.downstream_object {
368 Ok(self
369 .down_pool
370 .expect("downstream pool should exist") .remove(downstream_object)
372 .await?)
373 } else {
374 Err(Error::ResourceUrlError(
375 "can not remove on an object withouput setup".to_string(),
376 ))
377 }
378 }
379
380 pub async fn remove(self) -> Result<(), Error> {
384 match self.default {
385 PoolType::UpPool => self.upstream_remove().await,
386 PoolType::DownPool => self.downstream_remove().await,
387 }
388 }
389
390 pub async fn upstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
392 Ok(self
393 .up_pool
394 .expect("upstream pool should exist")
395 .list(self.upstream_object, &self.filter)
396 .await?)
397 }
398
399 pub async fn downstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
401 Ok(self
402 .down_pool
403 .expect("downstream pool should exist")
404 .list(self.downstream_object, &self.filter)
405 .await?)
406 }
407
408 pub async fn list(self) -> Result<Box<dyn S3Folder>, Error> {
412 match self.default {
413 PoolType::UpPool => self.upstream_list().await,
414 PoolType::DownPool => self.downstream_list().await,
415 }
416 }
417
418 }