remote_files/
client.rs

1use crate::{
2    buckets::{GCSConfig, S3Config},
3    error,
4};
5use bytes::Buf;
6use futures::{stream, Future, Stream, StreamExt};
7pub use opendal::EntryMode;
8use opendal::{Entry, ErrorKind, Metakey, Operator};
9use std::{io::Read, path::Path, pin::Pin};
10use tokio::{
11    fs::File,
12    io::{AsyncReadExt, BufReader},
13};
14
15type Result<T> = std::result::Result<T, error::Client>;
16
17pub type StatEntry = (String, String, String, EntryMode);
18
19const DEFAULT_LIST_LIMIT: usize = 10;
20
21#[derive(Clone)]
22pub struct Client {
23    inner: Operator,
24}
25
26impl Client {
27    pub async fn stat(&self, path: &str) -> Result<StatEntry> {
28        let meta = self
29            .inner
30            .stat(path)
31            .await
32            .map_err(|err| error::Client::ListMetadata(path.to_string(), err))?;
33        match meta.mode() {
34            EntryMode::Unknown => Err(error::Client::StatUnknownMode(path.to_string())),
35            EntryMode::FILE => Ok((
36                path.to_string(),
37                meta.content_type().unwrap_or_default().to_string(),
38                meta.content_length().to_string(),
39                EntryMode::FILE,
40            )),
41            EntryMode::DIR => Ok((
42                path.to_string(),
43                meta.content_type().unwrap_or_default().to_string(),
44                String::from(""),
45                EntryMode::DIR,
46            )),
47        }
48    }
49
50    async fn stat_entries(&self, path: &str, entries: Vec<Entry>) -> Vec<StatEntry> {
51        let mut list = vec![];
52
53        for entry in entries {
54            let meta = self
55                .inner
56                .stat(entry.path())
57                .await
58                .map_err(|err| error::Client::ListMetadata(path.to_string(), err));
59
60            if let Ok(meta) = meta {
61                match meta.mode() {
62                    EntryMode::Unknown => continue,
63                    EntryMode::FILE => {
64                        list.push((
65                            entry.name().to_string(),
66                            meta.content_type().unwrap_or_default().to_string(),
67                            meta.content_length().to_string(),
68                            EntryMode::FILE,
69                        ));
70                    }
71                    EntryMode::DIR => {
72                        list.push((
73                            entry.name().to_string(),
74                            meta.content_type().unwrap_or_default().to_string(),
75                            String::from(""),
76                            EntryMode::DIR,
77                        ));
78                    }
79                }
80            } else {
81                println!("{:?}", meta.unwrap_err());
82            }
83        }
84
85        list
86    }
87
88    pub async fn list<'a>(
89        &'a self,
90        path: &'a str,
91        limit: Option<usize>,
92    ) -> Result<Pin<Box<dyn Stream<Item = impl Future<Output = Vec<StatEntry>> + '_> + '_>>> {
93        let should_paginate = limit.is_some();
94        let limit = limit.unwrap_or(DEFAULT_LIST_LIMIT);
95
96        let client = self.inner.clone();
97        let entries = client
98            .list_with(path)
99            .metakey(Metakey::ContentLength)
100            .await
101            .map_err(|err| match err.kind() {
102                ErrorKind::NotADirectory => error::Client::ListNotDirectory(path.to_string()),
103                _ => error::Client::Unhandled(err),
104            })?;
105
106        let stream = stream::iter(entries).chunks(limit);
107
108        if should_paginate {
109            Ok(stream.map(|chunk| self.stat_entries(path, chunk)).boxed())
110        } else {
111            Ok(stream
112                .take(1)
113                .map(|chunk| self.stat_entries(path, chunk))
114                .boxed())
115        }
116    }
117
118    pub async fn download(&self, path: &str) -> Result<Vec<u8>> {
119        self.inner
120            .read(path)
121            .await
122            .map_err(error::Client::Download)
123            .and_then(|b| {
124                let mut buffer = vec![];
125                let mut reader = b.reader();
126                reader.read_to_end(&mut buffer).map_err(|err| {
127                    error::Client::Download(opendal::Error::new(
128                        opendal::ErrorKind::Unexpected,
129                        err.to_string(),
130                    ))
131                })?;
132                Ok(buffer)
133            })
134    }
135
136    pub async fn upload(&self, src: &str, dest: &str, content_type: Option<&str>) -> Result<()> {
137        let filepath = Path::new(src);
138        let filename = filepath
139            .file_name()
140            .ok_or_else(|| error::Client::UploadInvalidFilePath(src.to_string()))?;
141
142        let file = File::open(filepath)
143            .await
144            .map_err(error::Client::UploadFileNotFound)?;
145        let mut buffer: Vec<u8> = vec![];
146        BufReader::new(file)
147            .read_to_end(&mut buffer)
148            .await
149            .map_err(|err| error::Client::UploadLoad(src.to_string(), err))?;
150
151        let dest = Path::new(dest).join(filename);
152        let dest = dest.to_str().unwrap();
153        match content_type {
154            None => {
155                self.inner
156                    .write(dest, buffer)
157                    .await
158                    .map_err(|err| error::Client::UploadWrite(dest.to_string(), err))?;
159            }
160            Some(content_type) => {
161                self.inner
162                    .write_with(dest, buffer)
163                    .content_type(content_type)
164                    .await
165                    .map_err(|err| error::Client::UploadWrite(dest.to_string(), err))?;
166            }
167        }
168
169        Ok(())
170    }
171
172    pub async fn delete(&self, path: &str) -> Result<()> {
173        println!("{}", path);
174        self.inner
175            .remove_all(path)
176            .await
177            .map_err(|error| error::Client::Delete {
178                path: path.to_string(),
179                error,
180            })
181    }
182}
183
184impl TryFrom<GCSConfig> for Client {
185    type Error = error::Client;
186
187    fn try_from(value: GCSConfig) -> std::result::Result<Self, Self::Error> {
188        Ok(Self {
189            inner: value.try_into().map_err(error::Client::Initialization)?,
190        })
191    }
192}
193
194impl TryFrom<S3Config> for Client {
195    type Error = error::Client;
196
197    fn try_from(value: S3Config) -> std::result::Result<Self, Self::Error> {
198        Ok(Self {
199            inner: value.try_into().map_err(error::Client::Initialization)?,
200        })
201    }
202}