Skip to main content

vdsl_sync/infra/
backend.rs

1//! Abstract file transfer backend.
2//!
3//! Each remote location has an associated [`StorageBackend`] that handles
4//! push/pull/list/exists operations. vdsl-sync defines the trait;
5//! consumers (e.g. vdsl-mcp) provide concrete implementations.
6
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13
14use crate::infra::error::InfraError;
15
/// Progress callback for reporting transfer phase changes.
///
/// Called with a human-readable phase description, e.g.:
/// - `"pushing to pod-xxx: chunk 5/22 (500/2111)"`
/// - `"pulling from cloud: 1200 files"`
/// - `"target cloud: 4383 queued"`
///
/// Wrapped in [`Arc`] so the same callback can be shared across backends
/// and cloned cheaply; the `Send + Sync` bounds allow it to be invoked
/// from async tasks.
pub type ProgressFn = Arc<dyn Fn(&str) + Send + Sync>;
23
/// A file discovered on a remote location.
///
/// Metadata available depends on the storage backend:
/// - `size`: most backends provide this (rclone lsf `%s`)
/// - `modified_at`: available from rclone lsf `%t` (ISO 8601)
///
/// Used for metadata-based change detection on Cloud storage
/// where content hash computation requires downloading the file.
#[derive(Debug, Clone)]
pub struct RemoteFile {
    /// Remote path of the file, in the backend's own path format.
    pub path: String,
    /// File size in bytes, when the backend reports it.
    pub size: Option<u64>,
    /// Last modification time reported by the storage backend.
    pub modified_at: Option<DateTime<Utc>>,
}
39
40/// Abstract file transfer backend.
41///
42/// Implementations handle the actual data movement for a specific protocol.
43/// The sync service routes operations to the correct backend based on location.
44#[async_trait]
45pub trait StorageBackend: Send + Sync {
46    /// Push a local file to this remote.
47    async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError>;
48
49    /// Pull a file from this remote to a local path.
50    async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError>;
51
52    /// List files at a remote path.
53    async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError>;
54
55    /// Check if a remote file exists.
56    async fn exists(&self, remote_path: &str) -> Result<bool, InfraError>;
57
58    /// Delete a file on this remote.
59    ///
60    /// Returns `Ok(())` if the file was deleted or didn't exist.
61    /// Default implementation returns `Err` — backends that support deletion
62    /// must override this.
63    async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
64        Err(InfraError::Transfer {
65            reason: format!(
66                "delete not supported by {} backend for path: {remote_path}",
67                self.backend_type()
68            ),
69        })
70    }
71
72    /// Move a file to an archive path (soft delete).
73    ///
74    /// Semantics: `src_remote_path` is moved to `archive_remote_path` atomically.
75    /// Used by cold-storage backends (B2) to preserve deleted file revisions
76    /// instead of hard-deleting them. The caller constructs the archive path
77    /// (typically `{archive_root}/{ISO8601_ts}/{relative_path}`).
78    ///
79    /// Default implementation returns `Err` — backends that don't support
80    /// archive-on-delete should leave this unimplemented; callers must check
81    /// before invoking.
82    async fn archive_move(
83        &self,
84        src_remote_path: &str,
85        archive_remote_path: &str,
86    ) -> Result<(), InfraError> {
87        Err(InfraError::Transfer {
88            reason: format!(
89                "archive_move not supported by {} backend (src={src_remote_path}, dest={archive_remote_path})",
90                self.backend_type()
91            ),
92        })
93    }
94
95    /// Batch archive-move: relocate multiple files from `src_root` to
96    /// `archive_dest_root` preserving relative paths.
97    ///
98    /// Semantics: for each `relative_path`, moves
99    /// `{src_root}/{relative_path}` → `{archive_dest_root}/{relative_path}`.
100    ///
101    /// Default implementation falls back to sequential `archive_move()`.
102    async fn archive_move_batch(
103        &self,
104        src_root: &str,
105        archive_dest_root: &str,
106        relative_paths: &[String],
107    ) -> HashMap<String, Result<(), InfraError>> {
108        let mut results = HashMap::with_capacity(relative_paths.len());
109        for rel in relative_paths {
110            let src = if src_root.is_empty() {
111                rel.clone()
112            } else {
113                format!("{src_root}/{rel}")
114            };
115            let dest = if archive_dest_root.is_empty() {
116                rel.clone()
117            } else {
118                format!("{archive_dest_root}/{rel}")
119            };
120            let result = self.archive_move(&src, &dest).await;
121            results.insert(rel.clone(), result);
122        }
123        results
124    }
125
126    /// Push multiple files in a single batch operation.
127    ///
128    /// `src_root` is the local base directory, `dest_root` is the remote base,
129    /// and `relative_paths` are paths relative to both roots.
130    ///
131    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
132    /// Default implementation falls back to sequential `push()` calls.
133    async fn push_batch(
134        &self,
135        src_root: &Path,
136        dest_root: &str,
137        relative_paths: &[String],
138    ) -> HashMap<String, Result<(), InfraError>> {
139        let mut results = HashMap::with_capacity(relative_paths.len());
140        for rel in relative_paths {
141            let local_path = src_root.join(rel);
142            let remote_path = if dest_root.is_empty() {
143                rel.clone()
144            } else {
145                format!("{dest_root}/{rel}")
146            };
147            let result = self.push(&local_path, &remote_path).await;
148            results.insert(rel.clone(), result);
149        }
150        results
151    }
152
153    /// Pull multiple files in a single batch operation.
154    ///
155    /// `src_root` is the remote base, `dest_root` is the local base directory,
156    /// and `relative_paths` are paths relative to both roots.
157    ///
158    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
159    /// Default implementation falls back to sequential `pull()` calls.
160    async fn pull_batch(
161        &self,
162        src_root: &str,
163        dest_root: &Path,
164        relative_paths: &[String],
165    ) -> HashMap<String, Result<(), InfraError>> {
166        let mut results = HashMap::with_capacity(relative_paths.len());
167        for rel in relative_paths {
168            let remote_path = if src_root.is_empty() {
169                rel.clone()
170            } else {
171                format!("{src_root}/{rel}")
172            };
173            let local_path = dest_root.join(rel);
174            let result = self.pull(&remote_path, &local_path).await;
175            results.insert(rel.clone(), result);
176        }
177        results
178    }
179
180    /// Delete multiple files in a single batch operation.
181    ///
182    /// `remote_root` is the remote base directory, `relative_paths` are paths
183    /// relative to it. Uses `rclone delete --files-from` for rclone backends.
184    ///
185    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
186    /// Default implementation falls back to sequential `delete()` calls.
187    async fn delete_batch(
188        &self,
189        remote_root: &str,
190        relative_paths: &[String],
191    ) -> HashMap<String, Result<(), InfraError>> {
192        let mut results = HashMap::with_capacity(relative_paths.len());
193        for rel in relative_paths {
194            let remote_path = if remote_root.is_empty() {
195                rel.clone()
196            } else {
197                format!("{remote_root}/{rel}")
198            };
199            let result = self.delete(&remote_path).await;
200            results.insert(rel.clone(), result);
201        }
202        results
203    }
204
205    /// Whether this backend supports efficient batch push/pull.
206    ///
207    /// When true, callers should prefer `push_batch`/`pull_batch`/`delete_batch`
208    /// over individual calls. Default: false (sequential fallback).
209    fn supports_batch(&self) -> bool {
210        false
211    }
212
213    /// Backend type name for display and config matching.
214    fn backend_type(&self) -> &str;
215
216    /// Set a progress callback for batch operations.
217    ///
218    /// Called by the sync engine before batch execution.
219    /// Implementations that support chunked transfers (e.g. RcloneBackend)
220    /// should call this callback on chunk completion.
221    /// Default: no-op (callback is ignored).
222    fn set_progress_callback(&self, _callback: Option<ProgressFn>) {}
223
224    /// 外部ツールの到達確認 + 確保。
225    ///
226    /// - rclone: バイナリ存在確認 → なければインストール → 接続テスト
227    /// - memory: 常にOk
228    ///
229    /// デフォルト実装: `list("")` で接続テスト(バイナリが存在しなければここで失敗する)。
230    async fn ensure(&self) -> Result<(), InfraError> {
231        self.list("").await.map(|_| ())
232    }
233}
234
/// In-memory backend for testing.
#[cfg(any(test, feature = "test-utils"))]
pub mod memory {
    use super::*;
    use std::collections::HashMap;
    use tokio::sync::Mutex;

    /// Records operations for test assertions.
    ///
    /// - `log`: every trait call is appended as an [`Op`] for later inspection.
    /// - `fail_next`: when set, the next `push`/`pull`/`delete` fails once with
    ///   a mock error, then the flag resets to `false`.
    /// - `files`: simulated remote content keyed by remote path. Note that
    ///   `push` does not write into this map — tests populate it directly.
    #[derive(Default)]
    pub struct InMemoryBackend {
        pub log: Mutex<Vec<Op>>,
        pub fail_next: Mutex<bool>,
        pub files: Mutex<HashMap<String, Vec<u8>>>,
    }

    impl InMemoryBackend {
        /// Consume the one-shot `fail_next` flag.
        ///
        /// Returns `Err(InfraError::Transfer)` with the given reason if the
        /// flag was set (and clears it), otherwise `Ok(())`.
        async fn take_failure(&self, reason: &str) -> Result<(), InfraError> {
            let mut guard = self.fail_next.lock().await;
            if *guard {
                *guard = false;
                return Err(InfraError::Transfer {
                    reason: reason.into(),
                });
            }
            Ok(())
        }
    }

    /// One recorded backend operation, for test assertions on the call log.
    #[derive(Debug, Clone)]
    pub enum Op {
        Push { local: String, remote: String },
        Pull { remote: String, local: String },
        List { path: String },
        Exists { path: String },
        Delete { path: String },
    }

    #[async_trait]
    impl StorageBackend for InMemoryBackend {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Push {
                local: local_path.display().to_string(),
                remote: remote_path.into(),
            });
            self.take_failure("mock push error").await
        }

        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Pull {
                remote: remote_path.into(),
                local: local_path.display().to_string(),
            });
            self.take_failure("mock pull error").await
        }

        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            self.log.lock().await.push(Op::List {
                path: remote_path.into(),
            });
            // Returns every stored file regardless of `remote_path`; the
            // requested path is only recorded in the log.
            let files = self.files.lock().await;
            Ok(files
                .iter()
                .map(|(path, data)| RemoteFile {
                    path: path.clone(),
                    size: Some(data.len() as u64),
                    modified_at: None,
                })
                .collect())
        }

        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            self.log.lock().await.push(Op::Exists {
                path: remote_path.into(),
            });
            Ok(self.files.lock().await.contains_key(remote_path))
        }

        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Delete {
                path: remote_path.into(),
            });
            self.take_failure("mock delete error").await?;
            self.files.lock().await.remove(remote_path);
            Ok(())
        }

        fn backend_type(&self) -> &str {
            "memory"
        }
    }

    /// Forwarding impl so `Arc<InMemoryBackend>` can be used as a `StorageBackend`.
    ///
    /// Avoids orphan-rule workarounds (newtype wrapper) in every test module.
    /// All trait methods are forwarded explicitly so overrides on
    /// `InMemoryBackend` are never shadowed by trait defaults.
    #[async_trait]
    impl StorageBackend for std::sync::Arc<InMemoryBackend> {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            (**self).push(local_path, remote_path).await
        }
        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            (**self).pull(remote_path, local_path).await
        }
        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            (**self).list(remote_path).await
        }
        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            (**self).exists(remote_path).await
        }
        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            (**self).delete(remote_path).await
        }
        async fn archive_move(
            &self,
            src_remote_path: &str,
            archive_remote_path: &str,
        ) -> Result<(), InfraError> {
            (**self)
                .archive_move(src_remote_path, archive_remote_path)
                .await
        }
        async fn archive_move_batch(
            &self,
            src_root: &str,
            archive_dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .archive_move_batch(src_root, archive_dest_root, relative_paths)
                .await
        }
        async fn push_batch(
            &self,
            src_root: &Path,
            dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .push_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn pull_batch(
            &self,
            src_root: &str,
            dest_root: &Path,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .pull_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn delete_batch(
            &self,
            remote_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self).delete_batch(remote_root, relative_paths).await
        }
        fn supports_batch(&self) -> bool {
            (**self).supports_batch()
        }
        fn backend_type(&self) -> &str {
            (**self).backend_type()
        }
        async fn ensure(&self) -> Result<(), InfraError> {
            (**self).ensure().await
        }
        fn set_progress_callback(&self, callback: Option<ProgressFn>) {
            (**self).set_progress_callback(callback);
        }
    }
}