Skip to main content

vdsl_sync/infra/
rclone.rs

1//! Rclone-based storage backend.
2//!
3//! Executes `rclone` CLI commands for file transfer to/from cloud storage.
4//! Supports any rclone-compatible remote (B2, S3, GCS, etc.).
5//!
6//! Commands are executed via a [`RemoteShell`],
7//! enabling transfer from different hosts (local machine, GPU pod, etc.).
8
9use std::collections::HashMap;
10use std::path::Path;
11use std::sync::Mutex as StdMutex;
12
13use async_trait::async_trait;
14use chrono::{DateTime, NaiveDateTime, Utc};
15use secrecy::{ExposeSecret, SecretBox};
16
17use super::backend::{ProgressFn, RemoteFile, StorageBackend};
18use super::shell::{LocalShell, RemoteShell};
19use crate::infra::error::InfraError;
20
/// Default rclone command timeout (seconds).
///
/// Applies to individual rclone operations (copyto, lsf, deletefile).
/// Batch operations (`rclone copy --files-from`) use a scaled timeout.
const DEFAULT_RCLONE_TIMEOUT_SECS: u64 = 300;

/// Environment variable to override the default rclone timeout.
///
/// Set via MCP server config or shell environment.
/// Value: timeout in seconds (e.g. `VDSL_RCLONE_TIMEOUT=600`).
const RCLONE_TIMEOUT_ENV: &str = "VDSL_RCLONE_TIMEOUT";

/// Minimum timeout floor (seconds). Prevents misconfiguration.
const MIN_RCLONE_TIMEOUT_SECS: u64 = 10;

/// Per-file additional timeout for batch operations (seconds).
///
/// Batch timeout = base_timeout + (file_count * BATCH_PER_FILE_TIMEOUT_SECS).
/// Prevents large batches from hitting the timeout while small batches
/// still fail fast on network issues.
const BATCH_PER_FILE_TIMEOUT_SECS: u64 = 30;

/// Extra rclone flags for SFTP remotes to reduce protocol overhead.
///
/// SFTP performs per-file round-trips for modtime setting and hash verification
/// after each transfer. On high-latency / low-bandwidth links (home ISP upload),
/// this overhead dominates — observed 100 KB/s vs 1 MB/s on the same link via
/// other protocols. These flags eliminate the extra round-trips:
///
/// - `--sftp-set-modtime=false`: skip post-transfer SFTP setstat for mtime
///   (vdsl-sync uses sha256 for identity, not mtime)
/// - `--sftp-disable-hashcheck`: skip post-transfer sha256sum via SFTP exec
///   (vdsl-sync runs its own batch_inspect for verification)
const SFTP_OPTIMIZATION_FLAGS: &[&str] = &["--sftp-set-modtime=false", "--sftp-disable-hashcheck"];

/// Chunk size for SFTP batch transfers.
///
/// Large SFTP batches (thousands of files) cause rclone to stall due to
/// SFTP session management overhead. Chunking into smaller batches with
/// per-chunk progress logging and retry prevents hangs and provides visibility.
///
/// Non-SFTP backends (B2, S3) handle large batches natively — no chunking.
const SFTP_BATCH_CHUNK_SIZE: usize = 100;

/// Maximum retries per chunk on failure.
const BATCH_CHUNK_MAX_RETRIES: u32 = 1;

/// Resolve the rclone timeout (seconds) from: explicit > env > default.
///
/// Priority:
/// 1. `explicit` — passed at construction (Config / API)
/// 2. `VDSL_RCLONE_TIMEOUT` env var — set via MCP or shell. Surrounding
///    whitespace (e.g. a trailing newline from a config file) is ignored.
///    A value that is not a non-negative integer is ignored as if unset.
/// 3. `DEFAULT_RCLONE_TIMEOUT_SECS` — compile-time default
///
/// Whatever the source, the result is raised to at least
/// `MIN_RCLONE_TIMEOUT_SECS` to guard against misconfiguration.
fn resolve_timeout(explicit: Option<u64>) -> u64 {
    // Only consulted when no explicit value was given.
    let from_env = || {
        std::env::var(RCLONE_TIMEOUT_ENV)
            .ok()
            .and_then(|value| value.trim().parse::<u64>().ok())
    };
    explicit
        .or_else(from_env)
        .unwrap_or(DEFAULT_RCLONE_TIMEOUT_SECS)
        .max(MIN_RCLONE_TIMEOUT_SECS)
}
86
/// Rclone CLI storage backend.
///
/// Targets are addressed with rclone's inline connection-string syntax
/// (`:backend,key=val:bucket/path`), so no global rclone config file is needed.
///
/// Because that remote string can embed credentials, it is held in a
/// [`SecretBox`] so it is not printed or logged by accident.
///
/// Commands run through a [`RemoteShell`]; [`RcloneBackend::new`] uses [`LocalShell`].
///
/// # Timeout configuration
///
/// Resolution order: explicit (`with_timeout`) > env (`VDSL_RCLONE_TIMEOUT`) > default (300s),
/// with a 10-second floor. Batch operations add a per-file allowance on top of this value.
pub struct RcloneBackend {
    /// Rclone remote string, e.g. `:b2,account=KEY_ID,key=KEY:bucket`.
    /// Kept in a `SecretBox` so embedded credentials are not exposed by accident.
    remote: SecretBox<String>,
    /// Shell that executes the `rclone` commands (local machine or a remote host).
    shell: Box<dyn RemoteShell>,
    /// Per-command timeout in seconds, as produced by `resolve_timeout`.
    timeout_secs: u64,
    /// Optional progress callback for reporting chunk completion during batch
    /// transfers. Wrapped in a mutex so it can be replaced through `&self`.
    progress: StdMutex<Option<ProgressFn>>,
}
112
113impl RcloneBackend {
114    /// Create a new RcloneBackend with the given remote string.
115    ///
116    /// Timeout: env `VDSL_RCLONE_TIMEOUT` or default 300s.
117    /// Uses [`LocalShell`] for command execution (backward compatible).
118    ///
119    /// # Example
120    /// ```no_run
121    /// # use vdsl_sync::RcloneBackend;
122    /// let backend = RcloneBackend::new(":b2,account=key_id,key=secret:my-bucket");
123    /// ```
124    pub fn new(remote: impl Into<String>) -> Self {
125        Self {
126            remote: SecretBox::new(Box::new(remote.into())),
127            shell: Box::new(LocalShell),
128            timeout_secs: resolve_timeout(None),
129            progress: StdMutex::new(None),
130        }
131    }
132
133    /// Create with a custom [`RemoteShell`] (e.g. PodShell for GPU pod execution).
134    pub fn with_shell(remote: impl Into<String>, shell: Box<dyn RemoteShell>) -> Self {
135        Self {
136            remote: SecretBox::new(Box::new(remote.into())),
137            shell,
138            timeout_secs: resolve_timeout(None),
139            progress: StdMutex::new(None),
140        }
141    }
142
143    /// Set an explicit timeout (seconds), overriding env and default.
144    ///
145    /// Useful when constructing from parsed config values.
146    /// Floor: 10 seconds.
147    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
148        self.timeout_secs = resolve_timeout(Some(timeout_secs));
149        self
150    }
151
152    /// Build the full remote path for a given relative path.
153    ///
154    /// Validates against CLI flag injection (`-` prefix) and
155    /// path traversal (`..` segments).
156    fn remote_path(&self, path: &str) -> Result<String, InfraError> {
157        let path = path.trim_matches('/');
158        // Reject paths that look like CLI flags (argument injection)
159        if path.starts_with('-') {
160            return Err(InfraError::Transfer {
161                reason: format!("invalid remote path (starts with '-'): {path}"),
162            });
163        }
164        // Reject path traversal attempts
165        if path.split('/').any(|seg| seg == "..") {
166            return Err(InfraError::Transfer {
167                reason: format!("invalid remote path (contains '..' traversal): {path}"),
168            });
169        }
170        let remote = self.remote.expose_secret();
171        if path.is_empty() {
172            Ok(remote.clone())
173        } else {
174            Ok(format!("{remote}/{path}"))
175        }
176    }
177
178    /// Whether this backend targets an SFTP remote.
179    ///
180    /// Used to inject SFTP-specific optimization flags that eliminate
181    /// per-file round-trips (modtime set, hash check).
182    fn is_sftp(&self) -> bool {
183        self.remote.expose_secret().starts_with(":sftp")
184    }
185
186    /// Execute an rclone command via the configured shell.
187    ///
188    /// Uses the configured timeout (`with_timeout` > `VDSL_RCLONE_TIMEOUT` > 300s).
189    /// Callers needing a different timeout (e.g. batch) should use `exec_rclone_with_timeout`.
190    async fn exec_rclone(&self, args: &[&str]) -> Result<String, InfraError> {
191        self.exec_rclone_with_timeout(args, self.timeout_secs).await
192    }
193
194    /// Execute an rclone command with an explicit timeout.
195    ///
196    /// Automatically appends SFTP optimization flags when the remote
197    /// is an SFTP target (see [`SFTP_OPTIMIZATION_FLAGS`]).
198    async fn exec_rclone_with_timeout(
199        &self,
200        args: &[&str],
201        timeout_secs: u64,
202    ) -> Result<String, InfraError> {
203        let mut full_args = vec!["rclone"];
204        full_args.extend_from_slice(args);
205        if self.is_sftp() {
206            full_args.extend_from_slice(SFTP_OPTIMIZATION_FLAGS);
207        }
208
209        let output = self.shell.exec(&full_args, Some(timeout_secs)).await?;
210
211        if !output.success {
212            return Err(InfraError::Transfer {
213                reason: format!(
214                    "rclone failed (exit {}): {}",
215                    output
216                        .exit_code
217                        .map_or("signal".to_string(), |c| c.to_string()),
218                    output.stderr.trim()
219                ),
220            });
221        }
222
223        Ok(output.stdout)
224    }
225}
226
227#[async_trait]
228impl StorageBackend for RcloneBackend {
229    async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
230        let dest = self.remote_path(remote_path)?;
231        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
232            InfraError::Transfer {
233                reason: format!(
234                    "local path is not valid UTF-8: {}",
235                    local_path.to_string_lossy()
236                ),
237            }
238        })?;
239        self.exec_rclone(&["copyto", local_str, &dest]).await?;
240        Ok(())
241    }
242
243    async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
244        let src = self.remote_path(remote_path)?;
245        // Ensure parent directory exists via shell (works for both local and remote hosts)
246        if let Some(parent) = local_path.parent() {
247            if let Some(parent_str) = parent.to_str() {
248                if !parent_str.is_empty() {
249                    let _ = self
250                        .shell
251                        .exec(&["mkdir", "-p", parent_str], Some(10))
252                        .await;
253                }
254            }
255        }
256        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
257            InfraError::Transfer {
258                reason: format!(
259                    "local path is not valid UTF-8: {}",
260                    local_path.to_string_lossy()
261                ),
262            }
263        })?;
264        self.exec_rclone(&["copyto", &src, local_str]).await?;
265        Ok(())
266    }
267
268    async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
269        let target = self.remote_path(remote_path)?;
270        // Format: "path;size;modtime" — modtime is ISO 8601 (e.g. "2024-01-15T10:30:00.000000000")
271        // --files-only excludes directory markers (B2/S3 "folders" are 0-byte objects
272        // that would be registered as files, causing phantom delete transfers).
273        let output = self
274            .exec_rclone(&["lsf", "-R", "--format", "pst", "--files-only", &target])
275            .await?;
276
277        let mut files = Vec::new();
278        for line in output.lines() {
279            let parts: Vec<&str> = line.splitn(3, ';').collect();
280            if parts.len() < 2 {
281                continue;
282            }
283            let path = parts[0];
284            let size = match parts[1].trim().parse::<u64>() {
285                Ok(s) => Some(s),
286                Err(e) => {
287                    tracing::debug!(
288                        path = path,
289                        raw_size = parts[1].trim(),
290                        error = %e,
291                        "rclone lsf: size parse failed, treating as unknown"
292                    );
293                    None
294                }
295            };
296            let modified_at = parts.get(2).and_then(|ts| parse_rclone_timestamp(ts));
297            files.push(RemoteFile {
298                path: path.to_string(),
299                size,
300                modified_at,
301            });
302        }
303        Ok(files)
304    }
305
306    /// Note: returns `Ok(false)` on any rclone error (including network failures).
307    /// This is a best-effort check — callers must not rely on `false` meaning
308    /// "confirmed absent". Use `push`/`pull` for authoritative operations.
309    async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
310        let target = self.remote_path(remote_path)?;
311        let result = self.exec_rclone(&["lsf", &target]).await;
312        match result {
313            Ok(output) => Ok(!output.trim().is_empty()),
314            Err(e) => {
315                tracing::debug!(
316                    remote_path = remote_path,
317                    error = %e,
318                    "rclone exists check failed, returning false"
319                );
320                Ok(false)
321            }
322        }
323    }
324
325    async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
326        let target = self.remote_path(remote_path)?;
327        // rclone deletefile removes a single file (not directory).
328        // --retries 1 to fail fast on permanent errors.
329        //
330        // Per StorageBackend::delete contract: "Ok(()) if the file was
331        // deleted or didn't exist". rclone exit 4 = "not found" satisfies
332        // the postcondition (file is absent at dest), so we treat it as Ok.
333        match self
334            .exec_rclone(&["deletefile", &target, "--retries", "1"])
335            .await
336        {
337            Ok(_) => Ok(()),
338            Err(e) => {
339                let msg = e.to_string();
340                // rclone exit code 4 = object/directory not found.
341                // The delete goal (file absent at dest) is already met.
342                if msg.contains("exit 4") || msg.contains("not found") {
343                    tracing::debug!(
344                        remote_path = remote_path,
345                        "rclone deletefile: object already absent, treating as success"
346                    );
347                    Ok(())
348                } else {
349                    Err(e)
350                }
351            }
352        }
353    }
354
355    /// Move a file to an archive location via `rclone moveto`.
356    ///
357    /// Used for soft-delete on cold storage: the file is relocated (not copied)
358    /// to an archive prefix, preserving content with a new path/revision.
359    /// `rclone moveto` is atomic at the object-store level for B2/S3.
360    async fn archive_move(
361        &self,
362        src_remote_path: &str,
363        archive_remote_path: &str,
364    ) -> Result<(), InfraError> {
365        let src = self.remote_path(src_remote_path)?;
366        let dest = self.remote_path(archive_remote_path)?;
367        match self
368            .exec_rclone(&["moveto", &src, &dest, "--retries", "1"])
369            .await
370        {
371            Ok(_) => Ok(()),
372            Err(e) => {
373                let msg = e.to_string();
374                // rclone exit 4 (not found) = already absent at src.
375                // Archive goal (original absent) is satisfied, though the
376                // revision record is missing. Treat as success for idempotence.
377                if msg.contains("exit 4") || msg.contains("not found") {
378                    tracing::debug!(
379                        src = src_remote_path,
380                        dest = archive_remote_path,
381                        "rclone moveto: src already absent, treating as success"
382                    );
383                    Ok(())
384                } else {
385                    Err(e)
386                }
387            }
388        }
389    }
390
391    /// Batch push using `rclone copy --files-from`.
392    ///
393    /// For SFTP remotes, splits into chunks of [`SFTP_BATCH_CHUNK_SIZE`] files
394    /// with per-chunk progress logging and retry. Non-SFTP backends run as
395    /// a single batch (rclone handles large batches natively for B2/S3).
396    ///
397    /// Returns per-file Ok/Err.
398    async fn push_batch(
399        &self,
400        src_root: &Path,
401        dest_root: &str,
402        relative_paths: &[String],
403    ) -> HashMap<String, Result<(), InfraError>> {
404        if relative_paths.is_empty() {
405            return HashMap::new();
406        }
407
408        let dest_full = match self.remote_path(dest_root) {
409            Ok(d) => d,
410            Err(_) => {
411                let reason = format!("invalid dest_root for batch push: {dest_root}");
412                return Self::all_batch_err(relative_paths, &reason);
413            }
414        };
415
416        let src_root_str = match src_root.to_str() {
417            Some(s) => s.to_string(),
418            None => {
419                let reason = format!(
420                    "src_root is not valid UTF-8: {}",
421                    src_root.to_string_lossy()
422                );
423                return Self::all_batch_err(relative_paths, &reason);
424            }
425        };
426
427        self.exec_batch_chunked(
428            relative_paths,
429            "push",
430            |chunk, list_filename, sftp_flags, _chunk_timeout| {
431                let file_list = chunk.join("\n");
432                let src = &src_root_str;
433                let dest = &dest_full;
434                format!(
435                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
436                     {file_list}\n\
437                     __VDSL_EOF__\n\
438                     rclone copy {src} {dest} \
439                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
440                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
441                )
442            },
443        )
444        .await
445    }
446
447    /// Batch pull using `rclone copy --files-from`.
448    ///
449    /// For SFTP remotes, splits into chunks with progress logging and retry.
450    async fn pull_batch(
451        &self,
452        src_root: &str,
453        dest_root: &Path,
454        relative_paths: &[String],
455    ) -> HashMap<String, Result<(), InfraError>> {
456        if relative_paths.is_empty() {
457            return HashMap::new();
458        }
459
460        let src_full = match self.remote_path(src_root) {
461            Ok(s) => s,
462            Err(_) => {
463                let reason = format!("invalid src_root for batch pull: {src_root}");
464                return Self::all_batch_err(relative_paths, &reason);
465            }
466        };
467
468        let dest_root_str = match dest_root.to_str() {
469            Some(s) => s.to_string(),
470            None => {
471                let reason = format!(
472                    "dest_root is not valid UTF-8: {}",
473                    dest_root.to_string_lossy()
474                );
475                return Self::all_batch_err(relative_paths, &reason);
476            }
477        };
478
479        self.exec_batch_chunked(
480            relative_paths,
481            "pull",
482            |chunk, list_filename, sftp_flags, _chunk_timeout| {
483                let file_list = chunk.join("\n");
484                let src = &src_full;
485                let dest = &dest_root_str;
486                format!(
487                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
488                     {file_list}\n\
489                     __VDSL_EOF__\n\
490                     rclone copy {src} {dest} \
491                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
492                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
493                )
494            },
495        )
496        .await
497    }
498
499    /// Batch delete using `rclone delete --files-from`.
500    ///
501    /// For SFTP remotes, splits into chunks with progress logging and retry.
502    async fn delete_batch(
503        &self,
504        remote_root: &str,
505        relative_paths: &[String],
506    ) -> HashMap<String, Result<(), InfraError>> {
507        if relative_paths.is_empty() {
508            return HashMap::new();
509        }
510
511        let remote_full = match self.remote_path(remote_root) {
512            Ok(r) => r,
513            Err(_) => {
514                return Self::all_batch_err(
515                    relative_paths,
516                    &format!("invalid remote_root for batch delete: {remote_root}"),
517                );
518            }
519        };
520
521        self.exec_batch_chunked(
522            relative_paths,
523            "delete",
524            |chunk, list_filename, sftp_flags, _chunk_timeout| {
525                let file_list = chunk.join("\n");
526                let dest = &remote_full;
527                format!(
528                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
529                     {file_list}\n\
530                     __VDSL_EOF__\n\
531                     rclone delete {dest} \
532                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
533                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
534                )
535            },
536        )
537        .await
538    }
539
540    /// Batch archive-move using `rclone move --files-from`.
541    ///
542    /// Moves files from `src_root` to `archive_dest_root` preserving relative
543    /// paths. Uses the same chunked execution as other batch operations.
544    async fn archive_move_batch(
545        &self,
546        src_root: &str,
547        archive_dest_root: &str,
548        relative_paths: &[String],
549    ) -> HashMap<String, Result<(), InfraError>> {
550        if relative_paths.is_empty() {
551            return HashMap::new();
552        }
553
554        let src_full = match self.remote_path(src_root) {
555            Ok(r) => r,
556            Err(_) => {
557                return Self::all_batch_err(
558                    relative_paths,
559                    &format!("invalid src_root for batch archive_move: {src_root}"),
560                );
561            }
562        };
563
564        let dest_full = match self.remote_path(archive_dest_root) {
565            Ok(r) => r,
566            Err(_) => {
567                return Self::all_batch_err(
568                    relative_paths,
569                    &format!(
570                        "invalid archive_dest_root for batch archive_move: {archive_dest_root}"
571                    ),
572                );
573            }
574        };
575
576        self.exec_batch_chunked(
577            relative_paths,
578            "archive_move",
579            |chunk, list_filename, sftp_flags, _chunk_timeout| {
580                let file_list = chunk.join("\n");
581                let src = &src_full;
582                let dest = &dest_full;
583                format!(
584                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
585                     {file_list}\n\
586                     __VDSL_EOF__\n\
587                     rclone move {src} {dest} \
588                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
589                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
590                )
591            },
592        )
593        .await
594    }
595
596    fn supports_batch(&self) -> bool {
597        true
598    }
599
600    fn backend_type(&self) -> &str {
601        "rclone"
602    }
603
604    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
605        if let Ok(mut guard) = self.progress.lock() {
606            *guard = callback;
607        }
608    }
609
610    async fn ensure(&self) -> Result<(), InfraError> {
611        // Step 1: rclone バイナリの存在確認
612        let check = self.shell.exec(&["which", "rclone"], Some(10)).await;
613        let rclone_found = matches!(&check, Ok(out) if out.success);
614
615        if !rclone_found {
616            // Step 2: インストール試行(.deb直接ダウンロード — unzip依存なし)
617            tracing::info!("rclone not found, attempting install via .deb package");
618            let install_script = concat!(
619                "curl -sL https://downloads.rclone.org/rclone-current-linux-amd64.deb -o /tmp/rclone.deb",
620                " && dpkg -i /tmp/rclone.deb",
621                " && rm -f /tmp/rclone.deb",
622            );
623            let install_result = self.shell.exec_script(install_script, Some(120)).await;
624
625            match &install_result {
626                Ok(out) if out.success => {
627                    tracing::info!("rclone installed successfully via .deb");
628                }
629                Ok(out) => {
630                    // dpkg失敗 → install.sh にフォールバック
631                    tracing::debug!(
632                        exit_code = out.exit_code,
633                        stderr = out.stderr.trim(),
634                        "dpkg install failed, falling back to install.sh"
635                    );
636                    // install.sh は rclone.zip を展開するため unzip が必要。
637                    // 一部の最小 pod イメージには unzip が無いので apt で先に入れる。
638                    let fallback_script = concat!(
639                        "(command -v unzip >/dev/null 2>&1 || ",
640                        "(apt-get update -qq && apt-get install -y -qq unzip)) && ",
641                        "curl -sL https://rclone.org/install.sh | bash",
642                    );
643                    let fallback = self.shell.exec_script(fallback_script, Some(180)).await;
644                    match &fallback {
645                        Ok(o) if o.success => {
646                            tracing::info!("rclone installed successfully via install.sh");
647                        }
648                        Ok(o) => {
649                            return Err(InfraError::Init(format!(
650                                "rclone install failed (exit {}): {}",
651                                o.exit_code.unwrap_or(-1),
652                                o.stderr.trim()
653                            )));
654                        }
655                        Err(e) => {
656                            return Err(InfraError::Init(format!(
657                                "rclone install.sh exec failed: {e}"
658                            )));
659                        }
660                    }
661                }
662                Err(e) => {
663                    return Err(InfraError::Init(format!(
664                        "rclone .deb install exec failed: {e}"
665                    )));
666                }
667            }
668
669            // Step 3: インストール後の再確認
670            let recheck = self.shell.exec(&["which", "rclone"], Some(10)).await;
671            match &recheck {
672                Ok(out) if out.success => {}
673                _ => {
674                    return Err(InfraError::Init(
675                        "rclone still not found after install attempt".to_string(),
676                    ));
677                }
678            }
679        }
680
681        // Step 4: 接続テスト(rclone lsf でバケットルートにアクセス)
682        let remote = self.remote.expose_secret();
683        self.exec_rclone_with_timeout(&["lsf", "--max-depth", "1", remote], 30)
684            .await
685            .map_err(|e| InfraError::Init(format!("rclone connectivity test failed: {e}")))?;
686
687        Ok(())
688    }
689}
690
691impl RcloneBackend {
692    /// SFTP optimization flags as a space-separated string for shell scripts.
693    ///
694    /// Returns `" --sftp-set-modtime=false --sftp-disable-hashcheck"` for SFTP,
695    /// empty string otherwise.
696    fn sftp_flags_for_script(&self) -> &'static str {
697        if self.is_sftp() {
698            " --sftp-set-modtime=false --sftp-disable-hashcheck"
699        } else {
700            ""
701        }
702    }
703
704    /// Chunk size for this backend. SFTP uses small chunks; others run all-at-once.
705    fn batch_chunk_size(&self) -> usize {
706        if self.is_sftp() {
707            SFTP_BATCH_CHUNK_SIZE
708        } else {
709            usize::MAX // no chunking for non-SFTP
710        }
711    }
712
713    /// Execute a batch operation in chunks with progress logging and retry.
714    ///
715    /// `build_script` receives (chunk_paths, list_filename, sftp_flags, chunk_timeout)
716    /// and returns the shell script to execute.
717    ///
718    /// For SFTP: splits into [`SFTP_BATCH_CHUNK_SIZE`] chunks, logs progress
719    /// per chunk, retries failed chunks once.
720    /// For non-SFTP: runs as a single batch (chunk_size = usize::MAX).
721    async fn exec_batch_chunked<F>(
722        &self,
723        relative_paths: &[String],
724        operation: &str,
725        build_script: F,
726    ) -> HashMap<String, Result<(), InfraError>>
727    where
728        F: Fn(&[String], &str, &str, u64) -> String,
729    {
730        let chunk_size = self.batch_chunk_size();
731        let sftp_flags = self.sftp_flags_for_script();
732        let total = relative_paths.len();
733        let chunks: Vec<&[String]> = relative_paths.chunks(chunk_size).collect();
734        let num_chunks = chunks.len();
735
736        if num_chunks > 1 {
737            tracing::info!(
738                operation,
739                total,
740                num_chunks,
741                chunk_size,
742                "batch_{operation}: chunked transfer start"
743            );
744        }
745
746        let mut all_results = HashMap::with_capacity(total);
747        let mut completed = 0usize;
748
749        for (i, chunk) in chunks.iter().enumerate() {
750            let chunk_num = i + 1;
751            let chunk_timeout =
752                self.timeout_secs + (chunk.len() as u64 * BATCH_PER_FILE_TIMEOUT_SECS);
753            let list_filename =
754                format!("vdsl-{operation}-{}.txt", uuid::Uuid::new_v4().as_simple());
755
756            if num_chunks > 1 {
757                tracing::info!(
758                    operation,
759                    chunk = chunk_num,
760                    num_chunks,
761                    files = chunk.len(),
762                    completed,
763                    total,
764                    "batch_{operation}: chunk start"
765                );
766            }
767
768            let script = build_script(chunk, &list_filename, sftp_flags, chunk_timeout);
769
770            let mut attempt = 0u32;
771            let chunk_result = loop {
772                let result = self.shell.exec_script(&script, Some(chunk_timeout)).await;
773
774                match &result {
775                    Ok(output) if output.success => break Ok(()),
776                    Ok(output) => {
777                        let err_msg = format!(
778                            "rclone failed (exit {}): {}",
779                            output
780                                .exit_code
781                                .map_or("signal".to_string(), |c| c.to_string()),
782                            output.stderr.trim()
783                        );
784                        if attempt < BATCH_CHUNK_MAX_RETRIES {
785                            attempt += 1;
786                            tracing::warn!(
787                                operation,
788                                chunk = chunk_num,
789                                attempt,
790                                error = %err_msg,
791                                "batch_{operation}: chunk failed, retrying"
792                            );
793                            continue;
794                        }
795                        break Err(format!("batch {operation} failed: {err_msg}"));
796                    }
797                    Err(e) => {
798                        if attempt < BATCH_CHUNK_MAX_RETRIES {
799                            attempt += 1;
800                            tracing::warn!(
801                                operation,
802                                chunk = chunk_num,
803                                attempt,
804                                error = %e,
805                                "batch_{operation}: chunk failed, retrying"
806                            );
807                            continue;
808                        }
809                        break Err(format!("batch {operation} failed: {e}"));
810                    }
811                }
812            };
813
814            match chunk_result {
815                Ok(()) => {
816                    for p in *chunk {
817                        all_results.insert(p.clone(), Ok(()));
818                    }
819                    completed += chunk.len();
820                }
821                Err(reason) => {
822                    for p in *chunk {
823                        all_results.insert(
824                            p.clone(),
825                            Err(InfraError::Transfer {
826                                reason: reason.clone(),
827                            }),
828                        );
829                    }
830                    // Continue with next chunks — don't abort the entire batch
831                    tracing::error!(
832                        operation,
833                        chunk = chunk_num,
834                        failed_files = chunk.len(),
835                        reason = %reason,
836                        "batch_{operation}: chunk failed after retries, continuing"
837                    );
838                }
839            }
840
841            // Report progress via callback (if set).
842            if let Ok(guard) = self.progress.lock() {
843                if let Some(cb) = guard.as_ref() {
844                    cb(&format!(
845                        "{operation}: chunk {chunk_num}/{num_chunks} ({completed}/{total})"
846                    ));
847                }
848            }
849
850            if num_chunks > 1 {
851                tracing::info!(
852                    operation,
853                    chunk = chunk_num,
854                    num_chunks,
855                    completed,
856                    total,
857                    "batch_{operation}: chunk done"
858                );
859            }
860        }
861
862        if num_chunks > 1 {
863            let failed = total - completed;
864            tracing::info!(
865                operation,
866                total,
867                completed,
868                failed,
869                "batch_{operation}: all chunks done"
870            );
871        }
872
873        all_results
874    }
875
876    /// Helper: build all-error result map for batch operations.
877    fn all_batch_err(
878        relative_paths: &[String],
879        reason: &str,
880    ) -> HashMap<String, Result<(), InfraError>> {
881        relative_paths
882            .iter()
883            .map(|p| {
884                (
885                    p.clone(),
886                    Err(InfraError::Transfer {
887                        reason: reason.to_string(),
888                    }),
889                )
890            })
891            .collect()
892    }
893}
894
895/// Parse rclone's timestamp format into `DateTime<Utc>`.
896///
897/// rclone lsf `%t` outputs ISO 8601 with nanoseconds:
898/// `"2024-01-15T10:30:00.000000000"` (no timezone — always UTC).
899fn parse_rclone_timestamp(s: &str) -> Option<DateTime<Utc>> {
900    let trimmed = s.trim();
901    // Try full nanosecond format first, then fall back to simpler formats
902    NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f")
903        .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S"))
904        .ok()
905        .map(|naive| naive.and_utc())
906}
907
#[cfg(test)]
mod tests {
    use super::*;
    // Needed for the `year()`/`hour()`/`nanosecond()` accessors on `DateTime`.
    use chrono::{Datelike, Timelike};

    #[test]
    fn remote_path_construction() {
        let b = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert_eq!(
            b.remote_path("models/ckpt.safetensors").unwrap(),
            ":b2,account=kid,key=k:bucket/models/ckpt.safetensors"
        );
        assert_eq!(
            b.remote_path("/leading/slash").unwrap(),
            ":b2,account=kid,key=k:bucket/leading/slash"
        );
        assert_eq!(b.remote_path("").unwrap(), ":b2,account=kid,key=k:bucket");
    }

    #[test]
    fn remote_path_rejects_flag_like_input() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("--config=/etc/rclone.conf").is_err());
        assert!(b.remote_path("-v").is_err());
    }

    #[test]
    fn remote_path_rejects_traversal() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("../../etc/passwd").is_err());
        assert!(b.remote_path("foo/../bar").is_err());
        assert!(b.remote_path("..").is_err());
        // Single dot is OK (current directory reference, harmless)
        assert!(b.remote_path("./valid").is_ok());
        // "..." is not "..", should be OK
        assert!(b.remote_path("a/.../b").is_ok());
    }

    #[test]
    fn backend_type() {
        let b = RcloneBackend::new("remote:bucket");
        assert_eq!(b.backend_type(), "rclone");
    }

    #[test]
    fn parse_rclone_timestamp_nanoseconds() {
        let dt = parse_rclone_timestamp("2024-01-15T10:30:00.123456789")
            .expect("nanosecond-precision timestamp should parse");
        assert_eq!(dt.year(), 2024);
        assert_eq!(dt.month(), 1);
        assert_eq!(dt.day(), 15);
        assert_eq!(dt.hour(), 10);
        assert_eq!(dt.minute(), 30);
        assert_eq!(dt.second(), 0);
        // The fractional part must be preserved, not truncated.
        assert_eq!(dt.nanosecond(), 123_456_789);
    }

    #[test]
    fn parse_rclone_timestamp_no_fraction() {
        let dt = parse_rclone_timestamp("2024-01-15T10:30:00")
            .expect("timestamp without fraction should parse");
        assert_eq!(dt.second(), 0);
        assert_eq!(dt.nanosecond(), 0);
        // Naive input is interpreted as UTC: 2024-01-15T10:30:00Z.
        assert_eq!(dt.timestamp(), 1_705_314_600);
    }

    #[test]
    fn parse_rclone_timestamp_invalid() {
        assert!(parse_rclone_timestamp("not-a-date").is_none());
        assert!(parse_rclone_timestamp("").is_none());
    }

    #[test]
    fn is_sftp_detection() {
        let sftp = RcloneBackend::new(":sftp,host=1.2.3.4,port=22,user=root:");
        assert!(sftp.is_sftp());
        assert_eq!(
            sftp.sftp_flags_for_script(),
            " --sftp-set-modtime=false --sftp-disable-hashcheck"
        );

        let b2 = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert!(!b2.is_sftp());
        assert_eq!(b2.sftp_flags_for_script(), "");
    }
}