1use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13
14use crate::infra::error::InfraError;
15
/// Shared, thread-safe callback that receives a `&str`.
///
/// Handed to [`StorageBackend::set_progress_callback`]. The string is
/// presumably a human-readable progress message; its exact format is decided
/// by whichever backend invokes the callback.
pub type ProgressFn = Arc<dyn Fn(&str) + Send + Sync>;
23
24#[derive(Debug, Clone)]
33pub struct RemoteFile {
34 pub path: String,
35 pub size: Option<u64>,
36 pub modified_at: Option<DateTime<Utc>>,
38}
39
40#[async_trait]
45pub trait StorageBackend: Send + Sync {
46 async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError>;
48
49 async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError>;
51
52 async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError>;
54
55 async fn exists(&self, remote_path: &str) -> Result<bool, InfraError>;
57
58 async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
64 Err(InfraError::Transfer {
65 reason: format!(
66 "delete not supported by {} backend for path: {remote_path}",
67 self.backend_type()
68 ),
69 })
70 }
71
72 async fn push_batch(
80 &self,
81 src_root: &Path,
82 dest_root: &str,
83 relative_paths: &[String],
84 ) -> HashMap<String, Result<(), InfraError>> {
85 let mut results = HashMap::with_capacity(relative_paths.len());
86 for rel in relative_paths {
87 let local_path = src_root.join(rel);
88 let remote_path = if dest_root.is_empty() {
89 rel.clone()
90 } else {
91 format!("{dest_root}/{rel}")
92 };
93 let result = self.push(&local_path, &remote_path).await;
94 results.insert(rel.clone(), result);
95 }
96 results
97 }
98
99 async fn pull_batch(
107 &self,
108 src_root: &str,
109 dest_root: &Path,
110 relative_paths: &[String],
111 ) -> HashMap<String, Result<(), InfraError>> {
112 let mut results = HashMap::with_capacity(relative_paths.len());
113 for rel in relative_paths {
114 let remote_path = if src_root.is_empty() {
115 rel.clone()
116 } else {
117 format!("{src_root}/{rel}")
118 };
119 let local_path = dest_root.join(rel);
120 let result = self.pull(&remote_path, &local_path).await;
121 results.insert(rel.clone(), result);
122 }
123 results
124 }
125
126 async fn delete_batch(
134 &self,
135 remote_root: &str,
136 relative_paths: &[String],
137 ) -> HashMap<String, Result<(), InfraError>> {
138 let mut results = HashMap::with_capacity(relative_paths.len());
139 for rel in relative_paths {
140 let remote_path = if remote_root.is_empty() {
141 rel.clone()
142 } else {
143 format!("{remote_root}/{rel}")
144 };
145 let result = self.delete(&remote_path).await;
146 results.insert(rel.clone(), result);
147 }
148 results
149 }
150
151 fn supports_batch(&self) -> bool {
156 false
157 }
158
159 fn backend_type(&self) -> &str;
161
162 fn set_progress_callback(&self, _callback: Option<ProgressFn>) {}
169
170 async fn ensure(&self) -> Result<(), InfraError> {
177 self.list("").await.map(|_| ())
178 }
179}
180
/// In-memory [`StorageBackend`] test double.
///
/// Only compiled for test builds or when the `test-utils` feature is enabled.
#[cfg(any(test, feature = "test-utils"))]
pub mod memory {
    use super::*;
    use std::collections::HashMap;
    use tokio::sync::Mutex;

    /// Backend that records every call instead of performing a transfer.
    ///
    /// `push` and `pull` only append to [`log`](Self::log); they never read
    /// or write [`files`](Self::files). Tests seed `files` directly to drive
    /// `list`, `exists` and `delete`.
    #[derive(Debug, Default)]
    pub struct InMemoryBackend {
        /// Every operation invoked on this backend, in the order recorded.
        pub log: Mutex<Vec<Op>>,
        /// When `true`, the next `push`, `pull` or `delete` fails with a mock
        /// `InfraError::Transfer` and resets this flag to `false`.
        /// `list` and `exists` ignore it.
        pub fail_next: Mutex<bool>,
        /// Remote path mapped to stored content.
        pub files: Mutex<HashMap<String, Vec<u8>>>,
    }

    /// A single call recorded in [`InMemoryBackend::log`].
    #[derive(Debug, Clone)]
    pub enum Op {
        /// `push(local, remote)` was called.
        Push { local: String, remote: String },
        /// `pull(remote, local)` was called.
        Pull { remote: String, local: String },
        /// `list(path)` was called.
        List { path: String },
        /// `exists(path)` was called.
        Exists { path: String },
        /// `delete(path)` was called.
        Delete { path: String },
    }

    impl InMemoryBackend {
        /// Appends `op` to the call log.
        async fn record(&self, op: Op) {
            self.log.lock().await.push(op);
        }

        /// Consumes a pending injected failure.
        ///
        /// Returns `Err(InfraError::Transfer { reason })` and clears
        /// `fail_next` when the flag is set; otherwise returns `Ok(())`.
        /// The flag's lock is released before this returns, so callers never
        /// hold it while taking another lock.
        async fn take_injected_failure(&self, reason: &str) -> Result<(), InfraError> {
            let mut fail_next = self.fail_next.lock().await;
            if *fail_next {
                *fail_next = false;
                return Err(InfraError::Transfer {
                    reason: reason.into(),
                });
            }
            Ok(())
        }
    }

    #[async_trait]
    impl StorageBackend for InMemoryBackend {
        /// Records the call, then fails once if a failure was injected.
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            self.record(Op::Push {
                local: local_path.display().to_string(),
                remote: remote_path.into(),
            })
            .await;
            self.take_injected_failure("mock push error").await
        }

        /// Records the call, then fails once if a failure was injected.
        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            self.record(Op::Pull {
                remote: remote_path.into(),
                local: local_path.display().to_string(),
            })
            .await;
            self.take_injected_failure("mock pull error").await
        }

        /// Records the call and returns every stored file, sorted by path.
        ///
        /// `remote_path` is only logged; it does not filter the result.
        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            self.record(Op::List {
                path: remote_path.into(),
            })
            .await;
            let files = self.files.lock().await;
            let mut listing: Vec<RemoteFile> = files
                .iter()
                .map(|(path, data)| RemoteFile {
                    path: path.clone(),
                    // usize -> u64 is a widening cast on supported targets.
                    size: Some(data.len() as u64),
                    modified_at: None,
                })
                .collect();
            // HashMap iteration order is arbitrary; sort so that assertions
            // on the listing are stable across runs.
            listing.sort_unstable_by(|a, b| a.path.cmp(&b.path));
            Ok(listing)
        }

        /// Records the call and reports whether `remote_path` is a key of
        /// `files`.
        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            self.record(Op::Exists {
                path: remote_path.into(),
            })
            .await;
            Ok(self.files.lock().await.contains_key(remote_path))
        }

        /// Records the call, fails once if a failure was injected, otherwise
        /// removes `remote_path` from `files` (a missing key is not an error).
        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            self.record(Op::Delete {
                path: remote_path.into(),
            })
            .await;
            self.take_injected_failure("mock delete error").await?;
            self.files.lock().await.remove(remote_path);
            Ok(())
        }

        fn backend_type(&self) -> &str {
            "memory"
        }
    }

    /// Lets a shared `Arc<InMemoryBackend>` be used as a backend directly.
    /// Every method forwards to the wrapped [`InMemoryBackend`].
    #[async_trait]
    impl StorageBackend for std::sync::Arc<InMemoryBackend> {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            (**self).push(local_path, remote_path).await
        }

        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            (**self).pull(remote_path, local_path).await
        }

        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            (**self).list(remote_path).await
        }

        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            (**self).exists(remote_path).await
        }

        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            (**self).delete(remote_path).await
        }

        async fn push_batch(
            &self,
            src_root: &Path,
            dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .push_batch(src_root, dest_root, relative_paths)
                .await
        }

        // Forwarded like the other batch methods so the wrapper keeps
        // matching the inner backend if it ever overrides `pull_batch`.
        async fn pull_batch(
            &self,
            src_root: &str,
            dest_root: &Path,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .pull_batch(src_root, dest_root, relative_paths)
                .await
        }

        async fn delete_batch(
            &self,
            remote_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self).delete_batch(remote_root, relative_paths).await
        }

        fn supports_batch(&self) -> bool {
            (**self).supports_batch()
        }

        fn backend_type(&self) -> &str {
            (**self).backend_type()
        }

        async fn ensure(&self) -> Result<(), InfraError> {
            (**self).ensure().await
        }

        fn set_progress_callback(&self, callback: Option<ProgressFn>) {
            (**self).set_progress_callback(callback);
        }
    }
}