s3handler/tokio_async/primitives/
canal.rs

1use super::file::FilePool;
2use crate::error::Error;
3use crate::tokio_async::traits::{DataPool, Filter, S3Folder};
4use crate::utils::S3Object;
5use url::Url;
6
7#[derive(Debug)]
8pub enum PoolType {
9    UpPool,
10    DownPool,
11}
12
13#[derive(Debug)]
14pub struct Canal {
15    pub up_pool: Option<Box<dyn DataPool>>,
16    pub upstream_object: Option<S3Object>,
17    pub down_pool: Option<Box<dyn DataPool>>,
18    pub downstream_object: Option<S3Object>,
19    pub(crate) default: PoolType,
20    pub filter: Option<Filter>,
21    // TODO: feature: data transformer
22    // it may do encrypt, or format transformation here
23    // upstream_obj_lambda:
24    // downstream_obj_lambda:
25
26    // TODO: folder/bucket upload feature:
27    // index & key of S3Object transformer
28    // upstream_obj_desc_lambda:
29    // downstream_obj_desc_lambda:
30}
31
32/// A canal presets a object link for two object from resource pool to pool.
33/// If everything is set, the async api can pull/push the objects.
34///
35/// The `download_file`, and `upload_file` api will setup the up pool as s3 pool,
36/// and set up the down pool as file pool for the most usage case.
37///
38/// The terms `object`, `key`, `bucket`, `folder`, `path` may be easiler for readness in coding,
39/// so there are several similar methods help you to setup things.
40/// If you down want these duplicate functions, you can enable the `slim` feature.
41impl Canal {
42    /// Check the two pools are set or not
43    pub fn is_connect(&self) -> bool {
44        self.up_pool.is_some() && self.down_pool.is_some()
45    }
46
47    /// Set downd pool as file pool, and toward to the `resource_location`
48    pub fn toward(mut self, resource_location: &str) -> Result<Self, Error> {
49        self.toward_pool(Box::new(FilePool::new(resource_location)?));
50        self.upstream_object = Some(resource_location.into());
51        Ok(self)
52    }
53
54    /// Set up pool as file pool, and from to the `resource_location`
55    pub fn from(mut self, resource_location: &str) -> Result<Self, Error> {
56        self.from_pool(Box::new(FilePool::new(resource_location)?));
57        self.downstream_object = Some(resource_location.into());
58        Ok(self)
59    }
60
61    /// Download object from s3 pool to file pool
62    /// This function set file pool as down pool and s3 pool as up pool
63    /// then toward to the `resource_location`,
64    /// pull the object from uppool into down pool.
65    pub async fn download_file(mut self, resource_location: &str) -> Result<(), Error> {
66        if let Ok(r) = Url::parse(resource_location) {
67            self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); // for C://
68        } else {
69            self.toward_pool(Box::new(FilePool::new("/")?));
70        }
71        self.downstream_object = Some(resource_location.into());
72        match self.downstream_object.take() {
73            Some(S3Object { bucket, key, .. }) if key.is_none() => {
74                self.downstream_object = Some(S3Object {
75                    bucket,
76                    key: self.upstream_object.clone().unwrap().key,
77                    ..Default::default()
78                });
79            }
80            Some(obj) => {
81                self.downstream_object = Some(obj);
82            }
83            None => {
84                panic!("never be here")
85            }
86        }
87        Ok(self.pull().await?)
88    }
89
90    /// Upload object from file pool to s3 pool
91    /// This function set file pool as down pool and s3 pool as up pool
92    /// then toward to the `resource_location`,
93    /// push the object from uppool into down pool.
94    pub async fn upload_file(mut self, resource_location: &str) -> Result<(), Error> {
95        if let Ok(r) = Url::parse(resource_location) {
96            self.toward_pool(Box::new(FilePool::new(&r.scheme())?)); // for C://
97        } else {
98            self.toward_pool(Box::new(FilePool::new("/")?));
99        }
100        self.downstream_object = Some(resource_location.into());
101        match self.downstream_object.take() {
102            Some(S3Object { bucket, key, .. }) if key.is_none() => {
103                self.downstream_object = Some(S3Object {
104                    bucket: Some(std::env::current_dir()?.to_string_lossy()[1..].into()),
105                    key: Some(format!("/{}", bucket.unwrap_or_default())),
106                    ..Default::default()
107                });
108            }
109            Some(obj) => {
110                self.downstream_object = Some(obj);
111            }
112            None => {
113                panic!("never be here")
114            }
115        }
116        Ok(self.push().await?)
117    }
118    // End of short cut api to file pool
119
120    // Begin of setting api
121    /// Setup the up pool
122    pub fn from_pool(&mut self, pool: Box<dyn DataPool>) {
123        self.up_pool = Some(pool);
124    }
125
126    /// Setup the down pool
127    pub fn toward_pool(&mut self, pool: Box<dyn DataPool>) {
128        self.down_pool = Some(pool);
129    }
130
131    #[inline]
132    pub fn _object(mut self, object_name: &str) -> Self {
133        let mut o = match self.default {
134            PoolType::UpPool => self.upstream_object.take(),
135            PoolType::DownPool => self.downstream_object.take(),
136        }
137        .unwrap_or_default();
138        o.key = if object_name.starts_with('/') {
139            Some(object_name.to_string())
140        } else {
141            Some(format!("/{}", object_name))
142        };
143        match self.default {
144            PoolType::UpPool => self.upstream_object = Some(o),
145            PoolType::DownPool => self.downstream_object = Some(o),
146        };
147        self
148    }
149
150    #[inline]
151    pub fn _bucket(mut self, bucket_name: &str) -> Self {
152        let mut o = match self.default {
153            PoolType::UpPool => self.upstream_object.take(),
154            PoolType::DownPool => self.downstream_object.take(),
155        }
156        .unwrap_or_default();
157        o.bucket = Some(bucket_name.to_string());
158        match self.default {
159            PoolType::UpPool => self.upstream_object = Some(o),
160            PoolType::DownPool => self.downstream_object = Some(o),
161        };
162        self
163    }
164
165    /// Setup the object for the first pool connected by canal,
166    /// This api can be used without fully setting up two pools,
167    /// and just set up the object as you what you think.
168    pub fn object(self, object_name: &str) -> Self {
169        self._object(object_name)
170    }
171
172    /// The same as `object()`
173    #[cfg(not(feature = "slim"))]
174    pub fn key(self, key_name: &str) -> Self {
175        self._object(key_name)
176    }
177
178    /// Setup the bucket for the first pool connected by canal,
179    /// This api can be used without fully setting up two pools,
180    /// and just set up the object as you what you think.
181    pub fn bucket(self, bucket_name: &str) -> Self {
182        self._bucket(bucket_name)
183    }
184
185    #[cfg(not(feature = "slim"))]
186    /// The same as `bucket()`
187    pub fn folder(self, folder_name: &str) -> Self {
188        self._bucket(folder_name)
189    }
190
191    pub fn prefix(mut self, prefix_str: &str) -> Self {
192        self.filter = Some(Filter::Prefix(prefix_str.into()));
193        self
194    }
195
196    #[inline]
197    pub fn _toward_object(&mut self, object_name: &str) {
198        let mut o = self.downstream_object.take().unwrap_or_default();
199        o.key = if object_name.starts_with('/') {
200            Some(object_name.to_string())
201        } else {
202            Some(format!("/{}", object_name))
203        };
204        self.downstream_object = Some(o);
205    }
206
207    /// Setup the object in the down pool
208    pub fn toward_object(&mut self, object_name: &str) {
209        self._toward_object(object_name)
210    }
211
212    /// The same as `toward_object()`
213    #[cfg(not(feature = "slim"))]
214    pub fn toward_key(&mut self, object_name: &str) {
215        self._toward_object(object_name)
216    }
217
218    #[inline]
219    pub fn _toward_bucket(&mut self, bucket_name: &str) {
220        let mut o = self.downstream_object.take().unwrap_or_default();
221        o.bucket = Some(bucket_name.to_string());
222        self.downstream_object = Some(o);
223    }
224
225    /// Setup the bucket in the down pool
226    pub fn toward_bucket(&mut self, bucket_name: &str) {
227        self._toward_bucket(bucket_name)
228    }
229
230    /// The same as `toward_bucket()`
231    #[cfg(not(feature = "slim"))]
232    pub fn toward_folder(&mut self, folder_name: &str) {
233        self._toward_bucket(folder_name)
234    }
235
236    /// Setup the path in the down pool
237    #[cfg(not(feature = "slim"))]
238    pub fn toward_path(&mut self, path: &str) {
239        self.downstream_object = Some(path.into());
240    }
241
242    #[inline]
243    pub fn _from_object(&mut self, object_name: &str) {
244        let mut o = self.upstream_object.take().unwrap_or_default();
245        o.key = if object_name.starts_with('/') {
246            Some(object_name.to_string())
247        } else {
248            Some(format!("/{}", object_name))
249        };
250        self.upstream_object = Some(o);
251    }
252
253    /// Setup the object in the up pool
254    pub fn from_object(&mut self, object_name: &str) {
255        self._from_object(object_name)
256    }
257
258    /// The same as `from_object()`
259    #[cfg(not(feature = "slim"))]
260    pub fn from_key(&mut self, object_name: &str) {
261        self._from_object(object_name)
262    }
263
264    #[inline]
265    pub fn _from_bucket(&mut self, bucket_name: &str) {
266        let mut o = self.upstream_object.take().unwrap_or_default();
267        o.bucket = Some(bucket_name.to_string());
268        self.upstream_object = Some(o);
269    }
270
271    /// Setup the bucket in the up pool
272    pub fn from_bucket(&mut self, bucket_name: &str) {
273        self._from_bucket(bucket_name)
274    }
275
276    /// The same as `from_bucket()`
277    #[cfg(not(feature = "slim"))]
278    pub fn from_folder(&mut self, folder_name: &str) {
279        self._from_bucket(folder_name)
280    }
281
282    /// Setup the path in the up pool
283    #[cfg(not(feature = "slim"))]
284    pub fn from_path(&mut self, path: &str) {
285        self.upstream_object = Some(path.into());
286    }
287    // End of setting api
288
289    // Begin of IO api
290    /// Push the object from down pool to up pool.
291    pub async fn push(self) -> Result<(), Error> {
292        match (self.up_pool, self.down_pool) {
293            (Some(up_pool), Some(down_pool)) => {
294                if let Some(downstream_object) = self.downstream_object {
295                    let b = down_pool.pull(downstream_object.clone()).await?;
296                    up_pool
297                        .push(self.upstream_object.unwrap_or(downstream_object), b)
298                        .await?;
299                    Ok(())
300                } else {
301                    Err(Error::NoObject())
302                }
303            }
304            _ => Err(Error::PoolUninitializeError()),
305        }
306    }
307
308    /// Push a specified object from up pool to down pool
309    pub async fn push_obj(&self, obj: S3Object) -> Result<(), Error> {
310        match (&self.up_pool, &self.down_pool) {
311            (Some(up_pool), Some(down_pool)) => {
312                let b = down_pool.pull(obj.clone()).await?;
313                up_pool.push(obj, b).await?;
314                Ok(())
315            }
316            _ => Err(Error::PoolUninitializeError()),
317        }
318    }
319
320    /// Pull the object from up pool to down pool.
321    pub async fn pull(self) -> Result<(), Error> {
322        match (self.up_pool, self.down_pool) {
323            (Some(up_pool), Some(down_pool)) => {
324                if let Some(upstream_object) = self.upstream_object {
325                    let b = up_pool.pull(upstream_object.clone()).await?;
326                    down_pool
327                        .push(self.downstream_object.unwrap_or(upstream_object), b)
328                        .await?;
329                    Ok(())
330                } else {
331                    Err(Error::NoObject())
332                }
333            }
334            _ => Err(Error::PoolUninitializeError()),
335        }
336    }
337
338    /// Pull a specified object from up pool to down pool
339    pub async fn pull_obj(&self, obj: S3Object) -> Result<(), Error> {
340        match (&self.up_pool, &self.down_pool) {
341            (Some(up_pool), Some(down_pool)) => {
342                let b = up_pool.pull(obj.clone()).await?;
343                down_pool.push(obj, b).await?;
344                Ok(())
345            }
346            _ => Err(Error::PoolUninitializeError()),
347        }
348    }
349
350    /// Remove the object in the up pool.
351    pub async fn upstream_remove(self) -> Result<(), Error> {
352        if let Some(upstream_object) = self.upstream_object {
353            Ok(self
354                .up_pool
355                .expect("upstream pool should exist") // TODO customize Error
356                .remove(upstream_object)
357                .await?)
358        } else {
359            Err(Error::ResourceUrlError(
360                "can not remove on an object withouput setup".to_string(),
361            ))
362        }
363    }
364
365    /// Remove the object in the down pool.
366    pub async fn downstream_remove(self) -> Result<(), Error> {
367        if let Some(downstream_object) = self.downstream_object {
368            Ok(self
369                .down_pool
370                .expect("downstream pool should exist") // TODO customize Error
371                .remove(downstream_object)
372                .await?)
373        } else {
374            Err(Error::ResourceUrlError(
375                "can not remove on an object withouput setup".to_string(),
376            ))
377        }
378    }
379
380    /// Remove the object depence on the first pool connected by the canal
381    /// This api can be used without fully setting up two pools,
382    /// and remove object as you what you think.
383    pub async fn remove(self) -> Result<(), Error> {
384        match self.default {
385            PoolType::UpPool => self.upstream_remove().await,
386            PoolType::DownPool => self.downstream_remove().await,
387        }
388    }
389
390    /// List the objects in the up pool.
391    pub async fn upstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
392        Ok(self
393            .up_pool
394            .expect("upstream pool should exist")
395            .list(self.upstream_object, &self.filter)
396            .await?)
397    }
398
399    /// List the objects in the down pool.
400    pub async fn downstream_list(self) -> Result<Box<dyn S3Folder>, Error> {
401        Ok(self
402            .down_pool
403            .expect("downstream pool should exist")
404            .list(self.downstream_object, &self.filter)
405            .await?)
406    }
407
408    /// List the objects depence on the first pool connected by the canal
409    /// This api can be used without fully setting up two pools,
410    /// and list objects as you what you think.
411    pub async fn list(self) -> Result<Box<dyn S3Folder>, Error> {
412        match self.default {
413            PoolType::UpPool => self.upstream_list().await,
414            PoolType::DownPool => self.downstream_list().await,
415        }
416    }
417
418    // pub async fn sync(self)
419    // End of IO api
420}