Skip to main content

vdsl_sync/infra/
rclone.rs

1//! Rclone-based storage backend.
2//!
3//! Executes `rclone` CLI commands for file transfer to/from cloud storage.
4//! Supports any rclone-compatible remote (B2, S3, GCS, etc.).
5//!
6//! Commands are executed via a [`RemoteShell`],
7//! enabling transfer from different hosts (local machine, GPU pod, etc.).
8
9use std::collections::HashMap;
10use std::path::Path;
11use std::sync::Mutex as StdMutex;
12
13use async_trait::async_trait;
14use chrono::{DateTime, NaiveDateTime, Utc};
15use secrecy::{ExposeSecret, SecretBox};
16
17use super::backend::{ProgressFn, RemoteFile, StorageBackend};
18use super::shell::{LocalShell, RemoteShell};
19use crate::infra::error::InfraError;
20
/// Default per-command rclone timeout (seconds).
///
/// Applies to individual rclone operations (`copyto`, `lsf`, `deletefile`).
/// Batch operations (`rclone copy --files-from-raw` / `--files-from`) add a
/// per-file allowance on top of it (see [`BATCH_PER_FILE_TIMEOUT_SECS`]).
const DEFAULT_RCLONE_TIMEOUT_SECS: u64 = 300;

/// Environment variable that overrides the default rclone timeout.
///
/// Set via MCP server config or shell environment.
/// Value: timeout in whole seconds (e.g. `VDSL_RCLONE_TIMEOUT=600`).
/// Surrounding whitespace is ignored; a value that does not parse as `u64`
/// is ignored and the default applies.
const RCLONE_TIMEOUT_ENV: &str = "VDSL_RCLONE_TIMEOUT";

/// Minimum timeout floor (seconds), applied to every resolved timeout.
/// Prevents misconfiguration such as `0`.
const MIN_RCLONE_TIMEOUT_SECS: u64 = 10;

/// Per-file additional timeout for batch operations (seconds).
///
/// Batch timeout = base_timeout + (file_count * BATCH_PER_FILE_TIMEOUT_SECS).
/// This keeps large batches from hitting the timeout, while small batches
/// still fail fast on network issues.
const BATCH_PER_FILE_TIMEOUT_SECS: u64 = 30;

/// Extra rclone flags for SFTP remotes to reduce protocol overhead.
///
/// SFTP performs per-file round-trips for modtime setting and hash verification
/// after each transfer. On high-latency / low-bandwidth links (home ISP upload),
/// this overhead dominates — observed 100 KB/s vs 1 MB/s on the same link via
/// other protocols. These flags eliminate the extra round-trips:
///
/// - `--sftp-set-modtime=false`: skip post-transfer SFTP setstat for mtime
///   (vdsl-sync uses sha256 for identity, not mtime)
/// - `--sftp-disable-hashcheck`: skip post-transfer sha256sum via SFTP exec
///   (vdsl-sync runs its own batch_inspect for verification)
const SFTP_OPTIMIZATION_FLAGS: &[&str] = &["--sftp-set-modtime=false", "--sftp-disable-hashcheck"];

/// Chunk size for SFTP batch transfers.
///
/// Large SFTP batches (thousands of files) cause rclone to stall due to
/// SFTP session management overhead. Splitting them into smaller batches, with
/// per-chunk progress logging and retry, prevents hangs and provides visibility.
///
/// Non-SFTP backends (B2, S3) handle large batches natively — no chunking.
const SFTP_BATCH_CHUNK_SIZE: usize = 100;

/// Number of extra attempts for a failed batch chunk.
///
/// `1` means one retry, so each chunk runs at most twice.
const BATCH_CHUNK_MAX_RETRIES: u32 = 1;

/// Resolve the rclone timeout from: explicit > env > default.
///
/// Priority:
/// 1. `explicit` — passed at construction (Config / API)
/// 2. `VDSL_RCLONE_TIMEOUT` env var — set via MCP or shell. Whitespace is
///    trimmed (config files and JSON often carry a trailing newline). A value
///    that does not parse is ignored.
/// 3. `DEFAULT_RCLONE_TIMEOUT_SECS` — compile-time default
///
/// Floor: `MIN_RCLONE_TIMEOUT_SECS` (guards against misconfiguration).
fn resolve_timeout(explicit: Option<u64>) -> u64 {
    let raw = explicit
        .or_else(|| {
            std::env::var(RCLONE_TIMEOUT_ENV)
                .ok()
                .and_then(|v| v.trim().parse::<u64>().ok())
        })
        .unwrap_or(DEFAULT_RCLONE_TIMEOUT_SECS);
    raw.max(MIN_RCLONE_TIMEOUT_SECS)
}
86
87/// Rclone CLI storage backend.
88///
89/// Uses inline credentials via `:backend,key=val:bucket/path` syntax
90/// to avoid requiring global rclone config files.
91///
92/// The remote string is wrapped in [`SecretBox`] to prevent accidental
93/// logging of embedded credentials.
94///
95/// Commands are executed via a [`RemoteShell`], defaulting to [`LocalShell`].
96///
97/// # Timeout configuration
98///
99/// Resolution order: explicit (`with_timeout`) > env (`VDSL_RCLONE_TIMEOUT`) > default (300s).
100/// Batch operations scale the timeout by file count.
101pub struct RcloneBackend {
102    /// Rclone remote string, e.g. `:b2,account=KEY_ID,key=KEY:bucket`.
103    /// Wrapped in Secret to prevent accidental credential exposure in logs.
104    remote: SecretBox<String>,
105    /// Shell for executing rclone commands.
106    shell: Box<dyn RemoteShell>,
107    /// Per-command timeout in seconds.
108    timeout_secs: u64,
109    /// Progress callback for reporting chunk completion during batch transfers.
110    progress: StdMutex<Option<ProgressFn>>,
111}
112
113impl RcloneBackend {
114    /// Create a new RcloneBackend with the given remote string.
115    ///
116    /// Timeout: env `VDSL_RCLONE_TIMEOUT` or default 300s.
117    /// Uses [`LocalShell`] for command execution (backward compatible).
118    ///
119    /// # Example
120    /// ```no_run
121    /// # use vdsl_sync::RcloneBackend;
122    /// let backend = RcloneBackend::new(":b2,account=key_id,key=secret:my-bucket");
123    /// ```
124    pub fn new(remote: impl Into<String>) -> Self {
125        Self {
126            remote: SecretBox::new(Box::new(remote.into())),
127            shell: Box::new(LocalShell),
128            timeout_secs: resolve_timeout(None),
129            progress: StdMutex::new(None),
130        }
131    }
132
133    /// Create with a custom [`RemoteShell`] (e.g. PodShell for GPU pod execution).
134    pub fn with_shell(remote: impl Into<String>, shell: Box<dyn RemoteShell>) -> Self {
135        Self {
136            remote: SecretBox::new(Box::new(remote.into())),
137            shell,
138            timeout_secs: resolve_timeout(None),
139            progress: StdMutex::new(None),
140        }
141    }
142
143    /// Set an explicit timeout (seconds), overriding env and default.
144    ///
145    /// Useful when constructing from parsed config values.
146    /// Floor: 10 seconds.
147    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
148        self.timeout_secs = resolve_timeout(Some(timeout_secs));
149        self
150    }
151
152    /// Build the full remote path for a given relative path.
153    ///
154    /// Validates against CLI flag injection (`-` prefix) and
155    /// path traversal (`..` segments).
156    fn remote_path(&self, path: &str) -> Result<String, InfraError> {
157        let path = path.trim_matches('/');
158        // Reject paths that look like CLI flags (argument injection)
159        if path.starts_with('-') {
160            return Err(InfraError::Transfer {
161                reason: format!("invalid remote path (starts with '-'): {path}"),
162            });
163        }
164        // Reject path traversal attempts
165        if path.split('/').any(|seg| seg == "..") {
166            return Err(InfraError::Transfer {
167                reason: format!("invalid remote path (contains '..' traversal): {path}"),
168            });
169        }
170        let remote = self.remote.expose_secret();
171        if path.is_empty() {
172            Ok(remote.clone())
173        } else {
174            Ok(format!("{remote}/{path}"))
175        }
176    }
177
178    /// Whether this backend targets an SFTP remote.
179    ///
180    /// Used to inject SFTP-specific optimization flags that eliminate
181    /// per-file round-trips (modtime set, hash check).
182    fn is_sftp(&self) -> bool {
183        self.remote.expose_secret().starts_with(":sftp")
184    }
185
186    /// Execute an rclone command via the configured shell.
187    ///
188    /// Uses the configured timeout (`with_timeout` > `VDSL_RCLONE_TIMEOUT` > 300s).
189    /// Callers needing a different timeout (e.g. batch) should use `exec_rclone_with_timeout`.
190    async fn exec_rclone(&self, args: &[&str]) -> Result<String, InfraError> {
191        self.exec_rclone_with_timeout(args, self.timeout_secs).await
192    }
193
194    /// Execute an rclone command with an explicit timeout.
195    ///
196    /// Automatically appends SFTP optimization flags when the remote
197    /// is an SFTP target (see [`SFTP_OPTIMIZATION_FLAGS`]).
198    async fn exec_rclone_with_timeout(
199        &self,
200        args: &[&str],
201        timeout_secs: u64,
202    ) -> Result<String, InfraError> {
203        let mut full_args = vec!["rclone"];
204        full_args.extend_from_slice(args);
205        if self.is_sftp() {
206            full_args.extend_from_slice(SFTP_OPTIMIZATION_FLAGS);
207        }
208
209        let output = self.shell.exec(&full_args, Some(timeout_secs)).await?;
210
211        if !output.success {
212            return Err(InfraError::Transfer {
213                reason: format!(
214                    "rclone failed (exit {}): {}",
215                    output
216                        .exit_code
217                        .map_or("signal".to_string(), |c| c.to_string()),
218                    output.stderr.trim()
219                ),
220            });
221        }
222
223        Ok(output.stdout)
224    }
225}
226
227#[async_trait]
228impl StorageBackend for RcloneBackend {
229    async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
230        let dest = self.remote_path(remote_path)?;
231        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
232            InfraError::Transfer {
233                reason: format!(
234                    "local path is not valid UTF-8: {}",
235                    local_path.to_string_lossy()
236                ),
237            }
238        })?;
239        self.exec_rclone(&["copyto", local_str, &dest]).await?;
240        Ok(())
241    }
242
243    async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
244        let src = self.remote_path(remote_path)?;
245        // Ensure parent directory exists via shell (works for both local and remote hosts)
246        if let Some(parent) = local_path.parent() {
247            if let Some(parent_str) = parent.to_str() {
248                if !parent_str.is_empty() {
249                    let _ = self
250                        .shell
251                        .exec(&["mkdir", "-p", parent_str], Some(10))
252                        .await;
253                }
254            }
255        }
256        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
257            InfraError::Transfer {
258                reason: format!(
259                    "local path is not valid UTF-8: {}",
260                    local_path.to_string_lossy()
261                ),
262            }
263        })?;
264        self.exec_rclone(&["copyto", &src, local_str]).await?;
265        Ok(())
266    }
267
268    async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
269        let target = self.remote_path(remote_path)?;
270        // Format: "path;size;modtime" — modtime is ISO 8601 (e.g. "2024-01-15T10:30:00.000000000")
271        // --files-only excludes directory markers (B2/S3 "folders" are 0-byte objects
272        // that would be registered as files, causing phantom delete transfers).
273        let output = self
274            .exec_rclone(&["lsf", "--format", "pst", "--files-only", &target])
275            .await?;
276
277        let mut files = Vec::new();
278        for line in output.lines() {
279            let parts: Vec<&str> = line.splitn(3, ';').collect();
280            if parts.len() < 2 {
281                continue;
282            }
283            let path = parts[0];
284            let size = match parts[1].trim().parse::<u64>() {
285                Ok(s) => Some(s),
286                Err(e) => {
287                    tracing::debug!(
288                        path = path,
289                        raw_size = parts[1].trim(),
290                        error = %e,
291                        "rclone lsf: size parse failed, treating as unknown"
292                    );
293                    None
294                }
295            };
296            let modified_at = parts.get(2).and_then(|ts| parse_rclone_timestamp(ts));
297            files.push(RemoteFile {
298                path: path.to_string(),
299                size,
300                modified_at,
301            });
302        }
303        Ok(files)
304    }
305
306    /// Note: returns `Ok(false)` on any rclone error (including network failures).
307    /// This is a best-effort check — callers must not rely on `false` meaning
308    /// "confirmed absent". Use `push`/`pull` for authoritative operations.
309    async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
310        let target = self.remote_path(remote_path)?;
311        let result = self.exec_rclone(&["lsf", &target]).await;
312        match result {
313            Ok(output) => Ok(!output.trim().is_empty()),
314            Err(e) => {
315                tracing::debug!(
316                    remote_path = remote_path,
317                    error = %e,
318                    "rclone exists check failed, returning false"
319                );
320                Ok(false)
321            }
322        }
323    }
324
325    async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
326        let target = self.remote_path(remote_path)?;
327        // rclone deletefile removes a single file (not directory).
328        // --retries 1 to fail fast on permanent errors.
329        //
330        // Per StorageBackend::delete contract: "Ok(()) if the file was
331        // deleted or didn't exist". rclone exit 4 = "not found" satisfies
332        // the postcondition (file is absent at dest), so we treat it as Ok.
333        match self
334            .exec_rclone(&["deletefile", &target, "--retries", "1"])
335            .await
336        {
337            Ok(_) => Ok(()),
338            Err(e) => {
339                let msg = e.to_string();
340                // rclone exit code 4 = object/directory not found.
341                // The delete goal (file absent at dest) is already met.
342                if msg.contains("exit 4") || msg.contains("not found") {
343                    tracing::debug!(
344                        remote_path = remote_path,
345                        "rclone deletefile: object already absent, treating as success"
346                    );
347                    Ok(())
348                } else {
349                    Err(e)
350                }
351            }
352        }
353    }
354
355    /// Batch push using `rclone copy --files-from`.
356    ///
357    /// For SFTP remotes, splits into chunks of [`SFTP_BATCH_CHUNK_SIZE`] files
358    /// with per-chunk progress logging and retry. Non-SFTP backends run as
359    /// a single batch (rclone handles large batches natively for B2/S3).
360    ///
361    /// Returns per-file Ok/Err.
362    async fn push_batch(
363        &self,
364        src_root: &Path,
365        dest_root: &str,
366        relative_paths: &[String],
367    ) -> HashMap<String, Result<(), InfraError>> {
368        if relative_paths.is_empty() {
369            return HashMap::new();
370        }
371
372        let dest_full = match self.remote_path(dest_root) {
373            Ok(d) => d,
374            Err(_) => {
375                let reason = format!("invalid dest_root for batch push: {dest_root}");
376                return Self::all_batch_err(relative_paths, &reason);
377            }
378        };
379
380        let src_root_str = match src_root.to_str() {
381            Some(s) => s.to_string(),
382            None => {
383                let reason = format!(
384                    "src_root is not valid UTF-8: {}",
385                    src_root.to_string_lossy()
386                );
387                return Self::all_batch_err(relative_paths, &reason);
388            }
389        };
390
391        self.exec_batch_chunked(
392            relative_paths,
393            "push",
394            |chunk, list_filename, sftp_flags, _chunk_timeout| {
395                let file_list = chunk.join("\n");
396                let src = &src_root_str;
397                let dest = &dest_full;
398                format!(
399                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
400                     {file_list}\n\
401                     __VDSL_EOF__\n\
402                     rclone copy {src} {dest} \
403                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
404                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
405                )
406            },
407        )
408        .await
409    }
410
411    /// Batch pull using `rclone copy --files-from`.
412    ///
413    /// For SFTP remotes, splits into chunks with progress logging and retry.
414    async fn pull_batch(
415        &self,
416        src_root: &str,
417        dest_root: &Path,
418        relative_paths: &[String],
419    ) -> HashMap<String, Result<(), InfraError>> {
420        if relative_paths.is_empty() {
421            return HashMap::new();
422        }
423
424        let src_full = match self.remote_path(src_root) {
425            Ok(s) => s,
426            Err(_) => {
427                let reason = format!("invalid src_root for batch pull: {src_root}");
428                return Self::all_batch_err(relative_paths, &reason);
429            }
430        };
431
432        let dest_root_str = match dest_root.to_str() {
433            Some(s) => s.to_string(),
434            None => {
435                let reason = format!(
436                    "dest_root is not valid UTF-8: {}",
437                    dest_root.to_string_lossy()
438                );
439                return Self::all_batch_err(relative_paths, &reason);
440            }
441        };
442
443        self.exec_batch_chunked(
444            relative_paths,
445            "pull",
446            |chunk, list_filename, sftp_flags, _chunk_timeout| {
447                let file_list = chunk.join("\n");
448                let src = &src_full;
449                let dest = &dest_root_str;
450                format!(
451                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
452                     {file_list}\n\
453                     __VDSL_EOF__\n\
454                     rclone copy {src} {dest} \
455                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
456                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
457                )
458            },
459        )
460        .await
461    }
462
463    /// Batch delete using `rclone delete --files-from`.
464    ///
465    /// For SFTP remotes, splits into chunks with progress logging and retry.
466    async fn delete_batch(
467        &self,
468        remote_root: &str,
469        relative_paths: &[String],
470    ) -> HashMap<String, Result<(), InfraError>> {
471        if relative_paths.is_empty() {
472            return HashMap::new();
473        }
474
475        let remote_full = match self.remote_path(remote_root) {
476            Ok(r) => r,
477            Err(_) => {
478                return Self::all_batch_err(
479                    relative_paths,
480                    &format!("invalid remote_root for batch delete: {remote_root}"),
481                );
482            }
483        };
484
485        self.exec_batch_chunked(
486            relative_paths,
487            "delete",
488            |chunk, list_filename, sftp_flags, _chunk_timeout| {
489                let file_list = chunk.join("\n");
490                let dest = &remote_full;
491                format!(
492                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
493                     {file_list}\n\
494                     __VDSL_EOF__\n\
495                     rclone delete {dest} \
496                       --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
497                     _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
498                )
499            },
500        )
501        .await
502    }
503
504    fn supports_batch(&self) -> bool {
505        true
506    }
507
508    fn backend_type(&self) -> &str {
509        "rclone"
510    }
511
512    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
513        if let Ok(mut guard) = self.progress.lock() {
514            *guard = callback;
515        }
516    }
517
518    async fn ensure(&self) -> Result<(), InfraError> {
519        // Step 1: rclone バイナリの存在確認
520        let check = self.shell.exec(&["which", "rclone"], Some(10)).await;
521        let rclone_found = matches!(&check, Ok(out) if out.success);
522
523        if !rclone_found {
524            // Step 2: インストール試行(.deb直接ダウンロード — unzip依存なし)
525            tracing::info!("rclone not found, attempting install via .deb package");
526            let install_script = concat!(
527                "curl -sL https://downloads.rclone.org/rclone-current-linux-amd64.deb -o /tmp/rclone.deb",
528                " && dpkg -i /tmp/rclone.deb",
529                " && rm -f /tmp/rclone.deb",
530            );
531            let install_result = self.shell.exec_script(install_script, Some(120)).await;
532
533            match &install_result {
534                Ok(out) if out.success => {
535                    tracing::info!("rclone installed successfully via .deb");
536                }
537                Ok(out) => {
538                    // dpkg失敗 → install.sh にフォールバック
539                    tracing::debug!(
540                        exit_code = out.exit_code,
541                        stderr = out.stderr.trim(),
542                        "dpkg install failed, falling back to install.sh"
543                    );
544                    let fallback = self
545                        .shell
546                        .exec_script("curl -sL https://rclone.org/install.sh | bash", Some(120))
547                        .await;
548                    match &fallback {
549                        Ok(o) if o.success => {
550                            tracing::info!("rclone installed successfully via install.sh");
551                        }
552                        Ok(o) => {
553                            return Err(InfraError::Init(format!(
554                                "rclone install failed (exit {}): {}",
555                                o.exit_code.unwrap_or(-1),
556                                o.stderr.trim()
557                            )));
558                        }
559                        Err(e) => {
560                            return Err(InfraError::Init(format!(
561                                "rclone install.sh exec failed: {e}"
562                            )));
563                        }
564                    }
565                }
566                Err(e) => {
567                    return Err(InfraError::Init(format!(
568                        "rclone .deb install exec failed: {e}"
569                    )));
570                }
571            }
572
573            // Step 3: インストール後の再確認
574            let recheck = self.shell.exec(&["which", "rclone"], Some(10)).await;
575            match &recheck {
576                Ok(out) if out.success => {}
577                _ => {
578                    return Err(InfraError::Init(
579                        "rclone still not found after install attempt".to_string(),
580                    ));
581                }
582            }
583        }
584
585        // Step 4: 接続テスト(rclone lsf でバケットルートにアクセス)
586        let remote = self.remote.expose_secret();
587        self.exec_rclone_with_timeout(&["lsf", "--max-depth", "1", remote], 30)
588            .await
589            .map_err(|e| InfraError::Init(format!("rclone connectivity test failed: {e}")))?;
590
591        Ok(())
592    }
593}
594
595impl RcloneBackend {
596    /// SFTP optimization flags as a space-separated string for shell scripts.
597    ///
598    /// Returns `" --sftp-set-modtime=false --sftp-disable-hashcheck"` for SFTP,
599    /// empty string otherwise.
600    fn sftp_flags_for_script(&self) -> &'static str {
601        if self.is_sftp() {
602            " --sftp-set-modtime=false --sftp-disable-hashcheck"
603        } else {
604            ""
605        }
606    }
607
608    /// Chunk size for this backend. SFTP uses small chunks; others run all-at-once.
609    fn batch_chunk_size(&self) -> usize {
610        if self.is_sftp() {
611            SFTP_BATCH_CHUNK_SIZE
612        } else {
613            usize::MAX // no chunking for non-SFTP
614        }
615    }
616
617    /// Execute a batch operation in chunks with progress logging and retry.
618    ///
619    /// `build_script` receives (chunk_paths, list_filename, sftp_flags, chunk_timeout)
620    /// and returns the shell script to execute.
621    ///
622    /// For SFTP: splits into [`SFTP_BATCH_CHUNK_SIZE`] chunks, logs progress
623    /// per chunk, retries failed chunks once.
624    /// For non-SFTP: runs as a single batch (chunk_size = usize::MAX).
625    async fn exec_batch_chunked<F>(
626        &self,
627        relative_paths: &[String],
628        operation: &str,
629        build_script: F,
630    ) -> HashMap<String, Result<(), InfraError>>
631    where
632        F: Fn(&[String], &str, &str, u64) -> String,
633    {
634        let chunk_size = self.batch_chunk_size();
635        let sftp_flags = self.sftp_flags_for_script();
636        let total = relative_paths.len();
637        let chunks: Vec<&[String]> = relative_paths.chunks(chunk_size).collect();
638        let num_chunks = chunks.len();
639
640        if num_chunks > 1 {
641            tracing::info!(
642                operation,
643                total,
644                num_chunks,
645                chunk_size,
646                "batch_{operation}: chunked transfer start"
647            );
648        }
649
650        let mut all_results = HashMap::with_capacity(total);
651        let mut completed = 0usize;
652
653        for (i, chunk) in chunks.iter().enumerate() {
654            let chunk_num = i + 1;
655            let chunk_timeout =
656                self.timeout_secs + (chunk.len() as u64 * BATCH_PER_FILE_TIMEOUT_SECS);
657            let list_filename =
658                format!("vdsl-{operation}-{}.txt", uuid::Uuid::new_v4().as_simple());
659
660            if num_chunks > 1 {
661                tracing::info!(
662                    operation,
663                    chunk = chunk_num,
664                    num_chunks,
665                    files = chunk.len(),
666                    completed,
667                    total,
668                    "batch_{operation}: chunk start"
669                );
670            }
671
672            let script = build_script(chunk, &list_filename, sftp_flags, chunk_timeout);
673
674            let mut attempt = 0u32;
675            let chunk_result = loop {
676                let result = self.shell.exec_script(&script, Some(chunk_timeout)).await;
677
678                match &result {
679                    Ok(output) if output.success => break Ok(()),
680                    Ok(output) => {
681                        let err_msg = format!(
682                            "rclone failed (exit {}): {}",
683                            output
684                                .exit_code
685                                .map_or("signal".to_string(), |c| c.to_string()),
686                            output.stderr.trim()
687                        );
688                        if attempt < BATCH_CHUNK_MAX_RETRIES {
689                            attempt += 1;
690                            tracing::warn!(
691                                operation,
692                                chunk = chunk_num,
693                                attempt,
694                                error = %err_msg,
695                                "batch_{operation}: chunk failed, retrying"
696                            );
697                            continue;
698                        }
699                        break Err(format!("batch {operation} failed: {err_msg}"));
700                    }
701                    Err(e) => {
702                        if attempt < BATCH_CHUNK_MAX_RETRIES {
703                            attempt += 1;
704                            tracing::warn!(
705                                operation,
706                                chunk = chunk_num,
707                                attempt,
708                                error = %e,
709                                "batch_{operation}: chunk failed, retrying"
710                            );
711                            continue;
712                        }
713                        break Err(format!("batch {operation} failed: {e}"));
714                    }
715                }
716            };
717
718            match chunk_result {
719                Ok(()) => {
720                    for p in *chunk {
721                        all_results.insert(p.clone(), Ok(()));
722                    }
723                    completed += chunk.len();
724                }
725                Err(reason) => {
726                    for p in *chunk {
727                        all_results.insert(
728                            p.clone(),
729                            Err(InfraError::Transfer {
730                                reason: reason.clone(),
731                            }),
732                        );
733                    }
734                    // Continue with next chunks — don't abort the entire batch
735                    tracing::error!(
736                        operation,
737                        chunk = chunk_num,
738                        failed_files = chunk.len(),
739                        reason = %reason,
740                        "batch_{operation}: chunk failed after retries, continuing"
741                    );
742                }
743            }
744
745            // Report progress via callback (if set).
746            if let Ok(guard) = self.progress.lock() {
747                if let Some(cb) = guard.as_ref() {
748                    cb(&format!(
749                        "{operation}: chunk {chunk_num}/{num_chunks} ({completed}/{total})"
750                    ));
751                }
752            }
753
754            if num_chunks > 1 {
755                tracing::info!(
756                    operation,
757                    chunk = chunk_num,
758                    num_chunks,
759                    completed,
760                    total,
761                    "batch_{operation}: chunk done"
762                );
763            }
764        }
765
766        if num_chunks > 1 {
767            let failed = total - completed;
768            tracing::info!(
769                operation,
770                total,
771                completed,
772                failed,
773                "batch_{operation}: all chunks done"
774            );
775        }
776
777        all_results
778    }
779
780    /// Helper: build all-error result map for batch operations.
781    fn all_batch_err(
782        relative_paths: &[String],
783        reason: &str,
784    ) -> HashMap<String, Result<(), InfraError>> {
785        relative_paths
786            .iter()
787            .map(|p| {
788                (
789                    p.clone(),
790                    Err(InfraError::Transfer {
791                        reason: reason.to_string(),
792                    }),
793                )
794            })
795            .collect()
796    }
797}
798
799/// Parse rclone's timestamp format into `DateTime<Utc>`.
800///
801/// rclone lsf `%t` outputs ISO 8601 with nanoseconds:
802/// `"2024-01-15T10:30:00.000000000"` (no timezone — always UTC).
803fn parse_rclone_timestamp(s: &str) -> Option<DateTime<Utc>> {
804    let trimmed = s.trim();
805    // Try full nanosecond format first, then fall back to simpler formats
806    NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f")
807        .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S"))
808        .ok()
809        .map(|naive| naive.and_utc())
810}
811
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};

    use super::*;

    #[test]
    fn remote_path_construction() {
        let b = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert_eq!(
            b.remote_path("models/ckpt.safetensors").unwrap(),
            ":b2,account=kid,key=k:bucket/models/ckpt.safetensors"
        );
        assert_eq!(
            b.remote_path("/leading/slash").unwrap(),
            ":b2,account=kid,key=k:bucket/leading/slash"
        );
        assert_eq!(b.remote_path("").unwrap(), ":b2,account=kid,key=k:bucket");
    }

    #[test]
    fn remote_path_trims_slashes_on_both_ends() {
        let b = RcloneBackend::new("remote:bucket");
        assert_eq!(b.remote_path("/a/b/").unwrap(), "remote:bucket/a/b");
        // Only slashes collapses to the bare remote.
        assert_eq!(b.remote_path("///").unwrap(), "remote:bucket");
    }

    #[test]
    fn remote_path_rejects_flag_like_input() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("--config=/etc/rclone.conf").is_err());
        assert!(b.remote_path("-v").is_err());
    }

    #[test]
    fn remote_path_rejects_traversal() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("../../etc/passwd").is_err());
        assert!(b.remote_path("foo/../bar").is_err());
        assert!(b.remote_path("..").is_err());
        // Single dot is OK (current directory reference, harmless)
        assert!(b.remote_path("./valid").is_ok());
        // "..." is not "..", should be OK
        assert!(b.remote_path("a/.../b").is_ok());
    }

    #[test]
    fn backend_type() {
        let b = RcloneBackend::new("remote:bucket");
        assert_eq!(b.backend_type(), "rclone");
    }

    #[test]
    fn with_timeout_overrides_and_floors() {
        let b = RcloneBackend::new("remote:bucket").with_timeout(600);
        assert_eq!(b.timeout_secs, 600);
        let floored = RcloneBackend::new("remote:bucket").with_timeout(1);
        assert_eq!(floored.timeout_secs, MIN_RCLONE_TIMEOUT_SECS);
    }

    #[test]
    fn parse_rclone_timestamp_nanoseconds() {
        let dt = parse_rclone_timestamp("2024-01-15T10:30:00.123456789")
            .expect("nanosecond ISO timestamp should parse");
        assert_eq!(dt.year(), 2024);
        assert_eq!(dt.month(), 1);
        assert_eq!(dt.day(), 15);
        assert_eq!(dt.hour(), 10);
        assert_eq!(dt.minute(), 30);
    }

    #[test]
    fn parse_rclone_timestamp_no_fraction() {
        let dt = parse_rclone_timestamp("2024-01-15T10:30:00")
            .expect("fraction-less ISO timestamp should parse");
        assert_eq!(dt.second(), 0);
    }

    #[test]
    fn parse_rclone_timestamp_invalid() {
        assert!(parse_rclone_timestamp("not-a-date").is_none());
        assert!(parse_rclone_timestamp("").is_none());
    }

    #[test]
    fn is_sftp_detection() {
        let sftp = RcloneBackend::new(":sftp,host=1.2.3.4,port=22,user=root:");
        assert!(sftp.is_sftp());
        assert_eq!(
            sftp.sftp_flags_for_script(),
            " --sftp-set-modtime=false --sftp-disable-hashcheck"
        );

        let b2 = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert!(!b2.is_sftp());
        assert_eq!(b2.sftp_flags_for_script(), "");
    }

    #[test]
    fn batch_chunk_size_depends_on_backend() {
        let sftp = RcloneBackend::new(":sftp,host=1.2.3.4,port=22,user=root:");
        assert_eq!(sftp.batch_chunk_size(), SFTP_BATCH_CHUNK_SIZE);
        let b2 = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert_eq!(b2.batch_chunk_size(), usize::MAX);
    }

    #[test]
    fn all_batch_err_marks_every_path() {
        let paths = vec!["a.txt".to_string(), "dir/b.txt".to_string()];
        let results = RcloneBackend::all_batch_err(&paths, "boom");
        assert_eq!(results.len(), 2);
        for p in &paths {
            assert!(matches!(
                results.get(p),
                Some(Err(InfraError::Transfer { reason })) if reason == "boom"
            ));
        }
    }
}