1use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13
14use crate::infra::error::InfraError;
15
/// Shared, thread-safe callback that receives a `&str`.
///
/// This is the argument type of [`StorageBackend::set_progress_callback`].
///
/// NOTE(review): the string is presumably a human-readable progress message —
/// confirm against the backends that actually invoke the callback.
pub type ProgressFn = Arc<dyn Fn(&str) + Send + Sync>;
23
/// One entry of a remote listing, as returned by [`StorageBackend::list`].
#[derive(Debug, Clone)]
pub struct RemoteFile {
    /// Path of the entry as reported by the backend.
    pub path: String,
    /// Size of the entry when the backend knows it. The in-memory backend in
    /// this file reports the byte length of the stored data.
    pub size: Option<u64>,
    /// Optional UTC timestamp.
    ///
    /// NOTE(review): presumably the last-modification time — confirm against
    /// the concrete backends.
    pub modified_at: Option<DateTime<Utc>>,
}
39
40#[async_trait]
45pub trait StorageBackend: Send + Sync {
46 async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError>;
48
49 async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError>;
51
52 async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError>;
54
55 async fn exists(&self, remote_path: &str) -> Result<bool, InfraError>;
57
58 async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
64 Err(InfraError::Transfer {
65 reason: format!(
66 "delete not supported by {} backend for path: {remote_path}",
67 self.backend_type()
68 ),
69 })
70 }
71
72 async fn archive_move(
83 &self,
84 src_remote_path: &str,
85 archive_remote_path: &str,
86 ) -> Result<(), InfraError> {
87 Err(InfraError::Transfer {
88 reason: format!(
89 "archive_move not supported by {} backend (src={src_remote_path}, dest={archive_remote_path})",
90 self.backend_type()
91 ),
92 })
93 }
94
95 async fn archive_move_batch(
103 &self,
104 src_root: &str,
105 archive_dest_root: &str,
106 relative_paths: &[String],
107 ) -> HashMap<String, Result<(), InfraError>> {
108 let mut results = HashMap::with_capacity(relative_paths.len());
109 for rel in relative_paths {
110 let src = if src_root.is_empty() {
111 rel.clone()
112 } else {
113 format!("{src_root}/{rel}")
114 };
115 let dest = if archive_dest_root.is_empty() {
116 rel.clone()
117 } else {
118 format!("{archive_dest_root}/{rel}")
119 };
120 let result = self.archive_move(&src, &dest).await;
121 results.insert(rel.clone(), result);
122 }
123 results
124 }
125
126 async fn push_batch(
134 &self,
135 src_root: &Path,
136 dest_root: &str,
137 relative_paths: &[String],
138 ) -> HashMap<String, Result<(), InfraError>> {
139 let mut results = HashMap::with_capacity(relative_paths.len());
140 for rel in relative_paths {
141 let local_path = src_root.join(rel);
142 let remote_path = if dest_root.is_empty() {
143 rel.clone()
144 } else {
145 format!("{dest_root}/{rel}")
146 };
147 let result = self.push(&local_path, &remote_path).await;
148 results.insert(rel.clone(), result);
149 }
150 results
151 }
152
153 async fn pull_batch(
161 &self,
162 src_root: &str,
163 dest_root: &Path,
164 relative_paths: &[String],
165 ) -> HashMap<String, Result<(), InfraError>> {
166 let mut results = HashMap::with_capacity(relative_paths.len());
167 for rel in relative_paths {
168 let remote_path = if src_root.is_empty() {
169 rel.clone()
170 } else {
171 format!("{src_root}/{rel}")
172 };
173 let local_path = dest_root.join(rel);
174 let result = self.pull(&remote_path, &local_path).await;
175 results.insert(rel.clone(), result);
176 }
177 results
178 }
179
180 async fn delete_batch(
188 &self,
189 remote_root: &str,
190 relative_paths: &[String],
191 ) -> HashMap<String, Result<(), InfraError>> {
192 let mut results = HashMap::with_capacity(relative_paths.len());
193 for rel in relative_paths {
194 let remote_path = if remote_root.is_empty() {
195 rel.clone()
196 } else {
197 format!("{remote_root}/{rel}")
198 };
199 let result = self.delete(&remote_path).await;
200 results.insert(rel.clone(), result);
201 }
202 results
203 }
204
205 fn supports_batch(&self) -> bool {
210 false
211 }
212
213 fn backend_type(&self) -> &str;
215
216 fn set_progress_callback(&self, _callback: Option<ProgressFn>) {}
223
224 async fn ensure(&self) -> Result<(), InfraError> {
231 self.list("").await.map(|_| ())
232 }
233}
234
/// In-memory [`StorageBackend`] for tests.
///
/// Only compiled for tests or when the `test-utils` feature is enabled.
#[cfg(any(test, feature = "test-utils"))]
pub mod memory {
    use super::*;
    use std::collections::HashMap;
    use tokio::sync::Mutex;

    /// Test double that records every call and can be armed to fail once.
    ///
    /// `push` and `pull` only record the call; they never read or write
    /// `files` or the local filesystem. `list`, `exists` and `delete` operate
    /// on `files`, which tests are expected to populate directly.
    pub struct InMemoryBackend {
        /// Every operation received, in call order.
        pub log: Mutex<Vec<Op>>,
        /// When `true`, the next `push`, `pull` or `delete` fails with
        /// [`InfraError::Transfer`] and the flag is reset to `false`.
        /// `list` and `exists` never consult it.
        pub fail_next: Mutex<bool>,
        /// Stored content keyed by remote path.
        pub files: Mutex<HashMap<String, Vec<u8>>>,
    }

    impl Default for InMemoryBackend {
        /// Empty log, no stored files, failure injection disarmed.
        fn default() -> Self {
            Self {
                log: Mutex::new(Vec::new()),
                fail_next: Mutex::new(false),
                files: Mutex::new(HashMap::new()),
            }
        }
    }

    /// One recorded call on an [`InMemoryBackend`], with its arguments
    /// rendered as strings.
    #[derive(Debug, Clone)]
    pub enum Op {
        Push { local: String, remote: String },
        Pull { remote: String, local: String },
        List { path: String },
        Exists { path: String },
        Delete { path: String },
    }

    impl InMemoryBackend {
        /// Appends `op` to the call log.
        async fn record(&self, op: Op) {
            self.log.lock().await.push(op);
        }

        /// Consumes a pending injected failure, if any.
        ///
        /// Returns `Err(InfraError::Transfer { reason })` when `fail_next` was
        /// set, resetting it to `false`; returns `Ok(())` otherwise.
        async fn consume_failure(&self, reason: &str) -> Result<(), InfraError> {
            let mut armed = self.fail_next.lock().await;
            // `take` reads the flag and resets it to `false` in one step.
            if std::mem::take(&mut *armed) {
                return Err(InfraError::Transfer {
                    reason: reason.into(),
                });
            }
            Ok(())
        }
    }

    #[async_trait]
    impl StorageBackend for InMemoryBackend {
        /// Records the call, then fails once if `fail_next` is armed.
        /// Nothing is stored in `files`.
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            self.record(Op::Push {
                local: local_path.display().to_string(),
                remote: remote_path.into(),
            })
            .await;
            self.consume_failure("mock push error").await
        }

        /// Records the call, then fails once if `fail_next` is armed.
        /// Nothing is read from `files` or written locally.
        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            self.record(Op::Pull {
                remote: remote_path.into(),
                local: local_path.display().to_string(),
            })
            .await;
            self.consume_failure("mock pull error").await
        }

        /// Records the call and returns every stored file, regardless of
        /// `remote_path`, sorted by path so the result is deterministic.
        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            self.record(Op::List {
                path: remote_path.into(),
            })
            .await;
            let stored = self.files.lock().await;
            let mut listing: Vec<RemoteFile> = stored
                .iter()
                .map(|(path, data)| RemoteFile {
                    path: path.clone(),
                    size: Some(data.len() as u64),
                    modified_at: None,
                })
                .collect();
            // HashMap iteration order is unspecified; sort so tests that
            // inspect the listing are reproducible. Keys are unique, so an
            // unstable sort is sufficient.
            listing.sort_unstable_by(|a, b| a.path.cmp(&b.path));
            Ok(listing)
        }

        /// Records the call and reports whether `remote_path` is a key of
        /// `files`.
        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            self.record(Op::Exists {
                path: remote_path.into(),
            })
            .await;
            Ok(self.files.lock().await.contains_key(remote_path))
        }

        /// Records the call, fails once if `fail_next` is armed, and otherwise
        /// removes `remote_path` from `files`. A missing path is not an error.
        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            self.record(Op::Delete {
                path: remote_path.into(),
            })
            .await;
            self.consume_failure("mock delete error").await?;
            self.files.lock().await.remove(remote_path);
            Ok(())
        }

        fn backend_type(&self) -> &str {
            "memory"
        }
    }

    /// Lets a shared `Arc<InMemoryBackend>` be used wherever a
    /// [`StorageBackend`] is expected.
    ///
    /// Every trait method, including those with defaults, forwards to the
    /// inner backend, so an override added to [`InMemoryBackend`] is never
    /// bypassed by going through the `Arc`.
    #[async_trait]
    impl StorageBackend for std::sync::Arc<InMemoryBackend> {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            (**self).push(local_path, remote_path).await
        }
        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            (**self).pull(remote_path, local_path).await
        }
        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            (**self).list(remote_path).await
        }
        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            (**self).exists(remote_path).await
        }
        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            (**self).delete(remote_path).await
        }
        async fn archive_move(
            &self,
            src_remote_path: &str,
            archive_remote_path: &str,
        ) -> Result<(), InfraError> {
            (**self)
                .archive_move(src_remote_path, archive_remote_path)
                .await
        }
        async fn archive_move_batch(
            &self,
            src_root: &str,
            archive_dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .archive_move_batch(src_root, archive_dest_root, relative_paths)
                .await
        }
        async fn push_batch(
            &self,
            src_root: &Path,
            dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .push_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn pull_batch(
            &self,
            src_root: &str,
            dest_root: &Path,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .pull_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn delete_batch(
            &self,
            remote_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self).delete_batch(remote_root, relative_paths).await
        }
        fn supports_batch(&self) -> bool {
            (**self).supports_batch()
        }
        fn backend_type(&self) -> &str {
            (**self).backend_type()
        }
        async fn ensure(&self) -> Result<(), InfraError> {
            (**self).ensure().await
        }
        fn set_progress_callback(&self, callback: Option<ProgressFn>) {
            (**self).set_progress_callback(callback);
        }
    }
}