Skip to main content

vdsl_sync/infra/
backend.rs

1//! Abstract file transfer backend.
2//!
3//! Each remote location has an associated [`StorageBackend`] that handles
4//! push/pull/list/exists operations. vdsl-sync defines the trait;
5//! consumers (e.g. vdsl-mcp) provide concrete implementations.
6
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13
14use crate::infra::error::InfraError;
15
/// Shared, thread-safe callback that receives transfer progress updates.
///
/// Each call carries one human-readable description of the current phase, e.g.:
/// - `"pushing to pod-xxx: chunk 5/22 (500/2111)"`
/// - `"pulling from cloud: 1200 files"`
/// - `"target cloud: 4383 queued"`
///
/// Stored behind an `Arc` so the same callback can be cloned into several
/// backends or tasks cheaply.
pub type ProgressFn = Arc<dyn Fn(&str) + Sync + Send>;
23
24/// A file discovered on a remote location.
25///
26/// Metadata available depends on the storage backend:
27/// - `size`: most backends provide this (rclone lsf `%s`)
28/// - `modified_at`: available from rclone lsf `%t` (ISO 8601)
29///
30/// Used for metadata-based change detection on Cloud storage
31/// where content hash computation requires downloading the file.
32#[derive(Debug, Clone)]
33pub struct RemoteFile {
34    pub path: String,
35    pub size: Option<u64>,
36    /// Last modification time reported by the storage backend.
37    pub modified_at: Option<DateTime<Utc>>,
38}
39
40/// Abstract file transfer backend.
41///
42/// Implementations handle the actual data movement for a specific protocol.
43/// The sync service routes operations to the correct backend based on location.
44#[async_trait]
45pub trait StorageBackend: Send + Sync {
46    /// Push a local file to this remote.
47    async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError>;
48
49    /// Pull a file from this remote to a local path.
50    async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError>;
51
52    /// List files at a remote path.
53    async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError>;
54
55    /// Check if a remote file exists.
56    async fn exists(&self, remote_path: &str) -> Result<bool, InfraError>;
57
58    /// Delete a file on this remote.
59    ///
60    /// Returns `Ok(())` if the file was deleted or didn't exist.
61    /// Default implementation returns `Err` — backends that support deletion
62    /// must override this.
63    async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
64        Err(InfraError::Transfer {
65            reason: format!(
66                "delete not supported by {} backend for path: {remote_path}",
67                self.backend_type()
68            ),
69        })
70    }
71
72    /// Push multiple files in a single batch operation.
73    ///
74    /// `src_root` is the local base directory, `dest_root` is the remote base,
75    /// and `relative_paths` are paths relative to both roots.
76    ///
77    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
78    /// Default implementation falls back to sequential `push()` calls.
79    async fn push_batch(
80        &self,
81        src_root: &Path,
82        dest_root: &str,
83        relative_paths: &[String],
84    ) -> HashMap<String, Result<(), InfraError>> {
85        let mut results = HashMap::with_capacity(relative_paths.len());
86        for rel in relative_paths {
87            let local_path = src_root.join(rel);
88            let remote_path = if dest_root.is_empty() {
89                rel.clone()
90            } else {
91                format!("{dest_root}/{rel}")
92            };
93            let result = self.push(&local_path, &remote_path).await;
94            results.insert(rel.clone(), result);
95        }
96        results
97    }
98
99    /// Pull multiple files in a single batch operation.
100    ///
101    /// `src_root` is the remote base, `dest_root` is the local base directory,
102    /// and `relative_paths` are paths relative to both roots.
103    ///
104    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
105    /// Default implementation falls back to sequential `pull()` calls.
106    async fn pull_batch(
107        &self,
108        src_root: &str,
109        dest_root: &Path,
110        relative_paths: &[String],
111    ) -> HashMap<String, Result<(), InfraError>> {
112        let mut results = HashMap::with_capacity(relative_paths.len());
113        for rel in relative_paths {
114            let remote_path = if src_root.is_empty() {
115                rel.clone()
116            } else {
117                format!("{src_root}/{rel}")
118            };
119            let local_path = dest_root.join(rel);
120            let result = self.pull(&remote_path, &local_path).await;
121            results.insert(rel.clone(), result);
122        }
123        results
124    }
125
126    /// Delete multiple files in a single batch operation.
127    ///
128    /// `remote_root` is the remote base directory, `relative_paths` are paths
129    /// relative to it. Uses `rclone delete --files-from` for rclone backends.
130    ///
131    /// Returns a map of relative_path → Ok/Err for per-file status tracking.
132    /// Default implementation falls back to sequential `delete()` calls.
133    async fn delete_batch(
134        &self,
135        remote_root: &str,
136        relative_paths: &[String],
137    ) -> HashMap<String, Result<(), InfraError>> {
138        let mut results = HashMap::with_capacity(relative_paths.len());
139        for rel in relative_paths {
140            let remote_path = if remote_root.is_empty() {
141                rel.clone()
142            } else {
143                format!("{remote_root}/{rel}")
144            };
145            let result = self.delete(&remote_path).await;
146            results.insert(rel.clone(), result);
147        }
148        results
149    }
150
151    /// Whether this backend supports efficient batch push/pull.
152    ///
153    /// When true, callers should prefer `push_batch`/`pull_batch`/`delete_batch`
154    /// over individual calls. Default: false (sequential fallback).
155    fn supports_batch(&self) -> bool {
156        false
157    }
158
159    /// Backend type name for display and config matching.
160    fn backend_type(&self) -> &str;
161
162    /// Set a progress callback for batch operations.
163    ///
164    /// Called by the sync engine before batch execution.
165    /// Implementations that support chunked transfers (e.g. RcloneBackend)
166    /// should call this callback on chunk completion.
167    /// Default: no-op (callback is ignored).
168    fn set_progress_callback(&self, _callback: Option<ProgressFn>) {}
169
170    /// 外部ツールの到達確認 + 確保。
171    ///
172    /// - rclone: バイナリ存在確認 → なければインストール → 接続テスト
173    /// - memory: 常にOk
174    ///
175    /// デフォルト実装: `list("")` で接続テスト(バイナリが存在しなければここで失敗する)。
176    async fn ensure(&self) -> Result<(), InfraError> {
177        self.list("").await.map(|_| ())
178    }
179}
180
/// In-memory backend for testing.
#[cfg(any(test, feature = "test-utils"))]
pub mod memory {
    use super::*;
    use std::collections::HashMap;
    use tokio::sync::Mutex;

    /// Mock [`StorageBackend`] that records every operation for test assertions.
    ///
    /// - `push`/`pull` only append to `log`; they do not modify `files`.
    /// - `list` ignores its path argument and returns every entry in `files`.
    /// - `exists` checks `files` for an exact key match.
    /// - `delete` removes the entry from `files`.
    ///
    /// Setting `fail_next` to `true` makes the next `push`, `pull` or `delete`
    /// return an error; the flag is cleared when consumed.
    pub struct InMemoryBackend {
        /// Every operation performed, in call order.
        pub log: Mutex<Vec<Op>>,
        /// One-shot failure switch consumed by the next `push`/`pull`/`delete`.
        pub fail_next: Mutex<bool>,
        /// Simulated remote contents: path → file bytes.
        pub files: Mutex<HashMap<String, Vec<u8>>>,
    }

    impl Default for InMemoryBackend {
        fn default() -> Self {
            Self {
                log: Mutex::new(Vec::new()),
                fail_next: Mutex::new(false),
                files: Mutex::new(HashMap::new()),
            }
        }
    }

    impl InMemoryBackend {
        /// Consume the one-shot failure flag: returns whether it was set and
        /// resets it to `false`.
        async fn take_failure(&self) -> bool {
            std::mem::take(&mut *self.fail_next.lock().await)
        }
    }

    /// A single recorded backend operation.
    #[derive(Debug, Clone)]
    pub enum Op {
        Push { local: String, remote: String },
        Pull { remote: String, local: String },
        List { path: String },
        Exists { path: String },
        Delete { path: String },
    }

    #[async_trait]
    impl StorageBackend for InMemoryBackend {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Push {
                local: local_path.display().to_string(),
                remote: remote_path.into(),
            });
            if self.take_failure().await {
                return Err(InfraError::Transfer {
                    reason: "mock push error".into(),
                });
            }
            Ok(())
        }

        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Pull {
                remote: remote_path.into(),
                local: local_path.display().to_string(),
            });
            if self.take_failure().await {
                return Err(InfraError::Transfer {
                    reason: "mock pull error".into(),
                });
            }
            Ok(())
        }

        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            self.log.lock().await.push(Op::List {
                path: remote_path.into(),
            });
            // The path is only logged: the mock returns every stored file.
            let files = self.files.lock().await;
            Ok(files
                .iter()
                .map(|(path, data)| RemoteFile {
                    path: path.clone(),
                    size: Some(data.len() as u64),
                    modified_at: None,
                })
                .collect())
        }

        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            self.log.lock().await.push(Op::Exists {
                path: remote_path.into(),
            });
            Ok(self.files.lock().await.contains_key(remote_path))
        }

        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            self.log.lock().await.push(Op::Delete {
                path: remote_path.into(),
            });
            if self.take_failure().await {
                return Err(InfraError::Transfer {
                    reason: "mock delete error".into(),
                });
            }
            self.files.lock().await.remove(remote_path);
            Ok(())
        }

        fn backend_type(&self) -> &str {
            "memory"
        }
    }

    /// Forwarding impl so `Arc<InMemoryBackend>` can be used as a `StorageBackend`.
    ///
    /// Every trait method, including the ones with default implementations,
    /// delegates to the inner `InMemoryBackend`. This means any override there
    /// is honoured through the `Arc` as well. It avoids a newtype wrapper in
    /// every test module.
    #[async_trait]
    impl StorageBackend for std::sync::Arc<InMemoryBackend> {
        async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
            (**self).push(local_path, remote_path).await
        }
        async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
            (**self).pull(remote_path, local_path).await
        }
        async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
            (**self).list(remote_path).await
        }
        async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
            (**self).exists(remote_path).await
        }
        async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
            (**self).delete(remote_path).await
        }
        async fn push_batch(
            &self,
            src_root: &Path,
            dest_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .push_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn pull_batch(
            &self,
            src_root: &str,
            dest_root: &Path,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self)
                .pull_batch(src_root, dest_root, relative_paths)
                .await
        }
        async fn delete_batch(
            &self,
            remote_root: &str,
            relative_paths: &[String],
        ) -> HashMap<String, Result<(), InfraError>> {
            (**self).delete_batch(remote_root, relative_paths).await
        }
        fn supports_batch(&self) -> bool {
            (**self).supports_batch()
        }
        fn backend_type(&self) -> &str {
            (**self).backend_type()
        }
        async fn ensure(&self) -> Result<(), InfraError> {
            (**self).ensure().await
        }
        fn set_progress_callback(&self, callback: Option<ProgressFn>) {
            (**self).set_progress_callback(callback);
        }
    }
}
337        fn set_progress_callback(&self, callback: Option<ProgressFn>) {
338            (**self).set_progress_callback(callback);
339        }
340    }
341}