1use crate::{
2 buckets::{GCSConfig, S3Config},
3 error,
4};
5use bytes::Buf;
6use futures::{stream, Future, Stream, StreamExt};
7pub use opendal::EntryMode;
8use opendal::{Entry, ErrorKind, Metakey, Operator};
9use std::{io::Read, path::Path, pin::Pin};
10use tokio::{
11 fs::File,
12 io::{AsyncReadExt, BufReader},
13};
14
15type Result<T> = std::result::Result<T, error::Client>;
16
17pub type StatEntry = (String, String, String, EntryMode);
18
19const DEFAULT_LIST_LIMIT: usize = 10;
20
21#[derive(Clone)]
22pub struct Client {
23 inner: Operator,
24}
25
26impl Client {
27 pub async fn stat(&self, path: &str) -> Result<StatEntry> {
28 let meta = self
29 .inner
30 .stat(path)
31 .await
32 .map_err(|err| error::Client::ListMetadata(path.to_string(), err))?;
33 match meta.mode() {
34 EntryMode::Unknown => Err(error::Client::StatUnknownMode(path.to_string())),
35 EntryMode::FILE => Ok((
36 path.to_string(),
37 meta.content_type().unwrap_or_default().to_string(),
38 meta.content_length().to_string(),
39 EntryMode::FILE,
40 )),
41 EntryMode::DIR => Ok((
42 path.to_string(),
43 meta.content_type().unwrap_or_default().to_string(),
44 String::from(""),
45 EntryMode::DIR,
46 )),
47 }
48 }
49
50 async fn stat_entries(&self, path: &str, entries: Vec<Entry>) -> Vec<StatEntry> {
51 let mut list = vec![];
52
53 for entry in entries {
54 let meta = self
55 .inner
56 .stat(entry.path())
57 .await
58 .map_err(|err| error::Client::ListMetadata(path.to_string(), err));
59
60 if let Ok(meta) = meta {
61 match meta.mode() {
62 EntryMode::Unknown => continue,
63 EntryMode::FILE => {
64 list.push((
65 entry.name().to_string(),
66 meta.content_type().unwrap_or_default().to_string(),
67 meta.content_length().to_string(),
68 EntryMode::FILE,
69 ));
70 }
71 EntryMode::DIR => {
72 list.push((
73 entry.name().to_string(),
74 meta.content_type().unwrap_or_default().to_string(),
75 String::from(""),
76 EntryMode::DIR,
77 ));
78 }
79 }
80 } else {
81 println!("{:?}", meta.unwrap_err());
82 }
83 }
84
85 list
86 }
87
88 pub async fn list<'a>(
89 &'a self,
90 path: &'a str,
91 limit: Option<usize>,
92 ) -> Result<Pin<Box<dyn Stream<Item = impl Future<Output = Vec<StatEntry>> + '_> + '_>>> {
93 let should_paginate = limit.is_some();
94 let limit = limit.unwrap_or(DEFAULT_LIST_LIMIT);
95
96 let client = self.inner.clone();
97 let entries = client
98 .list_with(path)
99 .metakey(Metakey::ContentLength)
100 .await
101 .map_err(|err| match err.kind() {
102 ErrorKind::NotADirectory => error::Client::ListNotDirectory(path.to_string()),
103 _ => error::Client::Unhandled(err),
104 })?;
105
106 let stream = stream::iter(entries).chunks(limit);
107
108 if should_paginate {
109 Ok(stream.map(|chunk| self.stat_entries(path, chunk)).boxed())
110 } else {
111 Ok(stream
112 .take(1)
113 .map(|chunk| self.stat_entries(path, chunk))
114 .boxed())
115 }
116 }
117
118 pub async fn download(&self, path: &str) -> Result<Vec<u8>> {
119 self.inner
120 .read(path)
121 .await
122 .map_err(error::Client::Download)
123 .and_then(|b| {
124 let mut buffer = vec![];
125 let mut reader = b.reader();
126 reader.read_to_end(&mut buffer).map_err(|err| {
127 error::Client::Download(opendal::Error::new(
128 opendal::ErrorKind::Unexpected,
129 err.to_string(),
130 ))
131 })?;
132 Ok(buffer)
133 })
134 }
135
136 pub async fn upload(&self, src: &str, dest: &str, content_type: Option<&str>) -> Result<()> {
137 let filepath = Path::new(src);
138 let filename = filepath
139 .file_name()
140 .ok_or_else(|| error::Client::UploadInvalidFilePath(src.to_string()))?;
141
142 let file = File::open(filepath)
143 .await
144 .map_err(error::Client::UploadFileNotFound)?;
145 let mut buffer: Vec<u8> = vec![];
146 BufReader::new(file)
147 .read_to_end(&mut buffer)
148 .await
149 .map_err(|err| error::Client::UploadLoad(src.to_string(), err))?;
150
151 let dest = Path::new(dest).join(filename);
152 let dest = dest.to_str().unwrap();
153 match content_type {
154 None => {
155 self.inner
156 .write(dest, buffer)
157 .await
158 .map_err(|err| error::Client::UploadWrite(dest.to_string(), err))?;
159 }
160 Some(content_type) => {
161 self.inner
162 .write_with(dest, buffer)
163 .content_type(content_type)
164 .await
165 .map_err(|err| error::Client::UploadWrite(dest.to_string(), err))?;
166 }
167 }
168
169 Ok(())
170 }
171
172 pub async fn delete(&self, path: &str) -> Result<()> {
173 println!("{}", path);
174 self.inner
175 .remove_all(path)
176 .await
177 .map_err(|error| error::Client::Delete {
178 path: path.to_string(),
179 error,
180 })
181 }
182}
183
184impl TryFrom<GCSConfig> for Client {
185 type Error = error::Client;
186
187 fn try_from(value: GCSConfig) -> std::result::Result<Self, Self::Error> {
188 Ok(Self {
189 inner: value.try_into().map_err(error::Client::Initialization)?,
190 })
191 }
192}
193
194impl TryFrom<S3Config> for Client {
195 type Error = error::Client;
196
197 fn try_from(value: S3Config) -> std::result::Result<Self, Self::Error> {
198 Ok(Self {
199 inner: value.try_into().map_err(error::Client::Initialization)?,
200 })
201 }
202}