1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::path::Path;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::fs::{create_dir, read, read_dir, remove_dir_all, remove_file, write, ReadDir};
use url::Url;
use crate::error::Error;
use crate::tokio_async::traits::{DataPool, S3Folder};
use crate::utils::S3Object;
#[async_trait]
impl S3Folder for ReadDir {
    /// Fetches the next directory entry and wraps it in an [`S3Object`].
    ///
    /// Only `key` is populated: it holds the entry's path as a `String`, or
    /// `None` when the path cannot be represented as UTF-8. Every other field
    /// takes its `Default` value.
    ///
    /// Returns `Ok(None)` when `next_entry` yields no further entry.
    ///
    /// # Errors
    ///
    /// Propagates (converted into [`Error`]) any failure reported by
    /// `next_entry`.
    async fn next_object(&mut self) -> Result<Option<S3Object>, Error> {
        let entry = match self.next_entry().await? {
            Some(entry) => entry,
            None => return Ok(None),
        };
        let key = entry.path().to_str().map(str::to_owned);
        Ok(Some(S3Object {
            key,
            ..Default::default()
        }))
    }
}
/// A data pool that maps buckets and keys onto paths of the local filesystem.
#[derive(Debug, Clone)]
pub struct FilePool {
    /// Prefix placed in front of the bucket (and key) when a filesystem path
    /// is assembled.
    pub drive: String,
}
impl Default for FilePool {
fn default() -> Self {
Self { drive: "/".into() }
}
}
impl FilePool {
    /// Creates a pool after checking that `path` does not use the S3 scheme.
    ///
    /// * If `path` starts with `/`, the pool's `drive` is set to `"/"`.
    /// * Otherwise `path` is parsed as a URL. A successfully parsed URL whose
    ///   scheme is `s3` or `S3` is rejected; anything else (including input
    ///   that is not a valid URL) yields the default pool.
    ///
    /// NOTE(review): `path` is only inspected, never stored in the pool.
    /// Confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Returns `Error::SchemeError` when `path` is a URL with an S3 scheme.
    pub fn new(path: &str) -> Result<Self, Error> {
        let mut pool = Self::default();

        // Absolute paths need no scheme validation.
        if path.starts_with('/') {
            pool.drive = String::from("/");
            return Ok(pool);
        }

        // Parse failures are deliberately ignored: only a recognisable S3 URL
        // is an error here.
        if let Ok(parsed) = Url::parse(path) {
            let scheme = parsed.scheme();
            if scheme == "s3" || scheme == "S3" {
                return Err(Error::SchemeError());
            }
        }

        Ok(pool)
    }
}
unsafe impl Send for FilePool {}
unsafe impl Sync for FilePool {}
#[async_trait]
impl DataPool for FilePool {
async fn push(&self, desc: S3Object, object: Bytes) -> Result<(), Error> {
if let Some(b) = desc.bucket {
let r = if let Some(k) = desc.key {
write(Path::new(&format!("{}{}{}", self.drive, b, k)), object).await
} else {
create_dir(Path::new(&b)).await
};
r.or_else(|e| Err(e.into()))
} else {
Err(Error::ModifyEmptyBucketError())
}
}
async fn pull(&self, desc: S3Object) -> Result<Bytes, Error> {
if let S3Object {
bucket: Some(b),
key: Some(k),
..
} = desc
{
return match read(Path::new(&format!("{}{}{}", self.drive, b, k))).await {
Ok(c) => Ok(Bytes::copy_from_slice(&c)),
Err(e) => Err(e.into()),
};
}
Err(Error::PullEmptyObjectError())
}
async fn list(&self, index: Option<S3Object>) -> Result<Box<dyn S3Folder>, Error> {
match index {
Some(S3Object {
bucket: Some(b),
key: None,
..
}) => Ok(Box::new(
read_dir(Path::new(&format!("{}{}", self.drive, b))).await?,
)),
Some(S3Object {
bucket: Some(b),
key: Some(k),
..
}) => Ok(Box::new(
read_dir(Path::new(&format!("{}{}{}", self.drive, b, k))).await?,
)),
Some(S3Object { bucket: None, .. }) | None => Ok(Box::new(
read_dir(Path::new(&format!("{}", self.drive))).await?,
)),
}
}
async fn remove(&self, desc: S3Object) -> Result<(), Error> {
if let Some(b) = desc.bucket {
let r = if let Some(k) = desc.key {
remove_file(Path::new(&format!("{}{}{}", self.drive, b, k))).await
} else {
remove_dir_all(Path::new(&b)).await
};
r.or_else(|e| Err(e.into()))
} else {
Err(Error::ModifyEmptyBucketError())
}
}
fn check_scheme(&self, _scheme: &str) -> Result<(), Error> {
panic!("file pool use new to create a valid, without this function")
}
}