1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::path::Path;

use async_trait::async_trait;
use bytes::Bytes;
use tokio::fs::{create_dir, read, read_dir, remove_dir_all, remove_file, write, ReadDir};
use url::Url;

use crate::error::Error;
use crate::tokio_async::traits::{DataPool, S3Folder};
use crate::utils::S3Object;

#[async_trait]
impl S3Folder for ReadDir {
    async fn next_object(&mut self) -> Result<Option<S3Object>, Error> {
        Ok(self.next_entry().await?.map(|e| S3Object {
            key: e.path().to_str().map(|s| s.to_string()),
            ..Default::default()
        }))
    }
}

#[derive(Clone, Debug)]
pub struct FilePool {
    /// use "/" for *nix, "C://" for windows (not tested)
    pub drive: String,
}
impl Default for FilePool {
    fn default() -> Self {
        Self { drive: "/".into() }
    }
}

impl FilePool {
    pub fn new(path: &str) -> Result<Self, Error> {
        let mut fp = FilePool::default();
        if path.starts_with("/") {
            fp.drive = "/".to_string();
        } else {
            match Url::parse(path) {
                Ok(r) => {
                    if ["s3", "S3"].contains(&r.scheme()) {
                        return Err(Error::SchemeError());
                    }
                }
                _ => {}
            }
        }
        Ok(fp)
    }
}

unsafe impl Send for FilePool {}
unsafe impl Sync for FilePool {}

#[async_trait]
impl DataPool for FilePool {
    async fn push(&self, desc: S3Object, object: Bytes) -> Result<(), Error> {
        if let Some(b) = desc.bucket {
            let r = if let Some(k) = desc.key {
                write(Path::new(&format!("{}{}{}", self.drive, b, k)), object).await
            } else {
                create_dir(Path::new(&b)).await
            };
            r.or_else(|e| Err(e.into()))
        } else {
            Err(Error::ModifyEmptyBucketError())
        }
    }

    async fn pull(&self, desc: S3Object) -> Result<Bytes, Error> {
        if let S3Object {
            bucket: Some(b),
            key: Some(k),
            ..
        } = desc
        {
            return match read(Path::new(&format!("{}{}{}", self.drive, b, k))).await {
                // TODO: figure ouput how to use Bytes in tokio
                Ok(c) => Ok(Bytes::copy_from_slice(&c)),
                Err(e) => Err(e.into()),
            };
        }
        Err(Error::PullEmptyObjectError())
    }

    async fn list(&self, index: Option<S3Object>) -> Result<Box<dyn S3Folder>, Error> {
        match index {
            Some(S3Object {
                bucket: Some(b),
                key: None,
                ..
            }) => Ok(Box::new(
                read_dir(Path::new(&format!("{}{}", self.drive, b))).await?,
            )),
            Some(S3Object {
                bucket: Some(b),
                key: Some(k),
                ..
            }) => Ok(Box::new(
                read_dir(Path::new(&format!("{}{}{}", self.drive, b, k))).await?,
            )),
            Some(S3Object { bucket: None, .. }) | None => Ok(Box::new(
                read_dir(Path::new(&format!("{}", self.drive))).await?,
            )),
        }
    }

    async fn remove(&self, desc: S3Object) -> Result<(), Error> {
        if let Some(b) = desc.bucket {
            let r = if let Some(k) = desc.key {
                remove_file(Path::new(&format!("{}{}{}", self.drive, b, k))).await
            } else {
                remove_dir_all(Path::new(&b)).await
            };
            r.or_else(|e| Err(e.into()))
        } else {
            Err(Error::ModifyEmptyBucketError())
        }
    }

    fn check_scheme(&self, _scheme: &str) -> Result<(), Error> {
        panic!("file pool use new to create a valid, without this function")
    }
}