Skip to main content

coding_agent_search/sources/
sync.rs

1//! Sync engine for pulling agent sessions from remote sources.
2//!
3//! This module provides the core sync functionality using rsync over SSH
4//! for efficient delta transfers, with progress reporting and error recovery.
5//!
6//! # Safety
7//!
8//! **IMPORTANT**: The sync engine uses rsync WITHOUT the `--delete` flag
9//! to ensure safe additive syncs. This prevents accidental data loss if
10//! a remote is misconfigured or temporarily empty.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use coding_agent_search::sources::sync::SyncEngine;
16//! use coding_agent_search::sources::config::SourcesConfig;
17//!
18//! let config = SourcesConfig::load()?;
19//! let engine = SyncEngine::new(&data_dir);
20//!
21//! for source in config.remote_sources() {
22//!     let report = engine.sync_source(source)?;
23//!     println!("Synced {}: {} files", source.name, report.total_files());
24//! }
25//! ```
26
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::OnceLock;
30use std::time::{Duration, Instant};
31
32use thiserror::Error;
33
34use super::{
35    config::{
36        SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
37        ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
38    },
39    host_key_verification_error, is_host_key_verification_failure, strict_ssh_cli_tokens,
40    strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
41};
42use ssh2::{FileStat, Session, Sftp};
43use std::io::{Read as IoRead, Write as IoWrite};
44use std::net::{Shutdown, TcpStream};
45
46/// Which variant of rsync's "pass args protected to the remote" flag the
47/// system `rsync` accepts. The flag was introduced in rsync 3.0.0 as
48/// `--protect-args`; rsync 3.4.0 renamed the primary form to
49/// `--secluded-args` (`-s`) and current Homebrew `rsync 3.4.1` prints only
50/// the new name in `--help`, so a simple substring probe for `--protect-args`
51/// mis-classifies it as unsupported and falls through to the quoted-path
52/// rsync branch — which breaks (#191). openrsync (macOS 15+) supports
53/// neither.
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55enum RsyncArgProtection {
56    /// Neither flag supported — callers must manually quote remote paths for
57    /// the remote login shell.
58    None,
59    /// rsync 3.0.0..3.4.0 — original flag name.
60    ProtectArgs,
61    /// rsync 3.4.0+ (incl. Homebrew 3.4.1) — renamed primary form.
62    SecludedArgs,
63}
64
65impl RsyncArgProtection {
66    fn is_supported(self) -> bool {
67        !matches!(self, Self::None)
68    }
69
70    /// CLI flag to pass to rsync, or `None` if no protection variant is
71    /// available.
72    fn flag(self) -> Option<&'static str> {
73        match self {
74            Self::ProtectArgs => Some("--protect-args"),
75            Self::SecludedArgs => Some("--secluded-args"),
76            Self::None => None,
77        }
78    }
79}
80
81fn detect_rsync_arg_protection() -> RsyncArgProtection {
82    static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
83    *CACHED.get_or_init(|| {
84        let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
85            return RsyncArgProtection::None;
86        };
87        // rsync prints to stdout on GNU/Linux and Homebrew macOS, but some
88        // forks / older builds print help on stderr — check both so we never
89        // misclassify a supported rsync as unsupported.
90        let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
91        combined.push_str(&String::from_utf8_lossy(&out.stderr));
92        // Prefer the newer name when both are listed (forward-compat with a
93        // hypothetical rsync that keeps both as aliases): `--secluded-args`
94        // is what current rsync actually prints in help output, and using
95        // the printed name is the one guaranteed to be accepted.
96        if combined.contains("--secluded-args") {
97            RsyncArgProtection::SecludedArgs
98        } else if combined.contains("--protect-args") {
99            RsyncArgProtection::ProtectArgs
100        } else {
101            RsyncArgProtection::None
102        }
103    })
104}
105
106fn quote_remote_shell_path(path: &str) -> String {
107    // POSIX shell single-quote escape:
108    // 1. Wrap the whole thing in single quotes.
109    // 2. Escape existing single quotes by closing the current quote,
110    //    inserting a backslash-escaped quote, and opening a new one.
111    // Result: 'foo'\''bar'
112    format!("'{}'", path.replace('\'', r#"'\''"#))
113}
114
115fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
116    // host itself might contain user@ or be an alias, but we should not quote it
117    // if it's already a single token. However, if it contains spaces or other
118    // weirdness it's already broken for SSH. We focus on the path part.
119    format!("{host}:{}", quote_remote_shell_path(remote_path))
120}
121
122fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
123    if protect_args_supported {
124        // With --protect-args, rsync handles its own escaping over the wire
125        format!("{host}:{remote_path}")
126    } else {
127        // Without it (e.g. openrsync), we must manually quote for the remote shell
128        remote_spec_for_shell_bound_copy(host, remote_path)
129    }
130}
131
132fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
133    // scp still executes a remote shell command for the source operand, so the
134    // path side must be quoted even though we pass it as one local argv token.
135    remote_spec_for_shell_bound_copy(host, remote_path)
136}
137
138fn remote_find_regular_files_command(remote_path: &str) -> String {
139    format!(
140        "find -P {} -type f -print0",
141        quote_remote_shell_path(remote_path)
142    )
143}
144
145fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
146    let output = String::from_utf8_lossy(stdout);
147    for line in output.lines() {
148        if let Some(home) = line.trim().strip_prefix("CASS_HOME_MARKER:")
149            && home.starts_with('/')
150            && !home.contains('\0')
151        {
152            return Some(home.to_string());
153        }
154    }
155    None
156}
157
158fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
159    bytes
160        .split(|byte| *byte == 0)
161        .filter(|part| !part.is_empty())
162        .filter_map(|part| std::str::from_utf8(part).ok())
163        .map(ToOwned::to_owned)
164        .collect()
165}
166
167fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
168    match source_path_entry_error(index, path) {
169        Some(message) => Err(SyncError::InvalidPath(message)),
170        None => Ok(()),
171    }
172}
173
174fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
175    PathSyncResult {
176        remote_path: remote_path.to_string(),
177        success: false,
178        error: Some(err.to_string()),
179        ..Default::default()
180    }
181}
182
183fn remote_file_to_safe_local_path(
184    remote_root: &Path,
185    remote_file: &Path,
186    local_container: &Path,
187    leaf_name: &str,
188) -> Option<PathBuf> {
189    let mut local_path = local_container.join(leaf_name);
190    if remote_file == remote_root {
191        return Some(local_path);
192    }
193
194    let relative = remote_file.strip_prefix(remote_root).ok()?;
195    for component in relative.components() {
196        match component {
197            std::path::Component::Normal(name) => local_path.push(name),
198            std::path::Component::CurDir => {}
199            _ => return None,
200        }
201    }
202
203    Some(local_path)
204}
205
206fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
207    let rel = path.strip_prefix(root).map_err(|_| {
208        format!(
209            "Local path {} is outside sync root {}",
210            path.display(),
211            root.display()
212        )
213    })?;
214
215    let mut current = root.to_path_buf();
216    if let Some(link) = existing_path_symlink(&current)? {
217        return Ok(Some(link));
218    }
219
220    for component in rel.components() {
221        match component {
222            std::path::Component::Normal(name) => current.push(name),
223            std::path::Component::CurDir => continue,
224            _ => {
225                return Err(format!(
226                    "Local path {} contains unsafe component below sync root {}",
227                    path.display(),
228                    root.display()
229                ));
230            }
231        }
232
233        if let Some(link) = existing_path_symlink(&current)? {
234            return Ok(Some(link));
235        }
236    }
237
238    Ok(None)
239}
240
241fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
242    match std::fs::symlink_metadata(path) {
243        Ok(metadata) if metadata.file_type().is_symlink() => Ok(Some(path.to_path_buf())),
244        Ok(_) => Ok(None),
245        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
246        Err(e) => Err(format!("Failed to inspect {}: {}", path.display(), e)),
247    }
248}
249
250fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
251    if let Some(link) = existing_local_symlink_below_root(root, path)? {
252        return Err(format!(
253            "Refusing to write {} through local symlink {}",
254            path.display(),
255            link.display()
256        ));
257    }
258
259    Ok(())
260}
261
262fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
263    reject_local_symlink_below_root(sync_root, local_path)?;
264    std::fs::create_dir_all(local_path)
265        .map_err(|e| format!("Failed to create directory: {}", e))?;
266    reject_local_symlink_below_root(sync_root, local_path)?;
267    Ok(())
268}
269
270fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
271    stat.file_type().is_symlink()
272}
273
274/// Errors that can occur during sync operations.
275#[derive(Error, Debug)]
276pub enum SyncError {
277    #[error("Source has no host configured")]
278    NoHost,
279
280    #[error("Source has no paths configured")]
281    NoPaths,
282
283    #[error("Invalid source path: {0}")]
284    InvalidPath(String),
285
286    #[error("Invalid source definition: {0}")]
287    InvalidSource(String),
288
289    #[error("rsync command failed: {0}")]
290    RsyncFailed(String),
291
292    #[error("Failed to create local directory: {0}")]
293    CreateDirFailed(#[from] std::io::Error),
294
295    #[error("SSH connection failed: {0}")]
296    SshFailed(String),
297
298    #[error("Connection timed out after {0} seconds")]
299    Timeout(u64),
300
301    #[error("Sync cancelled")]
302    Cancelled,
303}
304
305/// Method used for syncing files from remote.
306#[derive(Debug, Clone, Copy, PartialEq, Eq)]
307pub enum SyncMethod {
308    /// rsync over SSH - preferred for delta transfers
309    Rsync,
310    /// rsync invoked via WSL (`wsl rsync`) - used on Windows when native rsync is unavailable
311    /// but WSL is installed with rsync available inside it.
312    WslRsync,
313    /// SCP-based transfer using the system `scp` command.
314    ///
315    /// Used on Windows (and other platforms) when rsync is unavailable. Delegates all
316    /// authentication to the system `ssh`/`scp` binary so it inherits OpenSSH agent,
317    /// `~/.ssh/` keys, and `~/.ssh/config` correctly – avoiding the `ssh2` library
318    /// which does not integrate with the Windows OpenSSH agent.
319    Scp,
320    /// SFTP fallback using the `ssh2` crate – last resort only.
321    ///
322    /// Deprecated in favour of [`SyncMethod::Scp`] which uses the native system SSH
323    /// binary. Kept for backward compatibility with callers that pattern-match on this
324    /// variant.
325    Sftp,
326}
327
328impl SyncMethod {
329    pub fn as_str(self) -> &'static str {
330        match self {
331            Self::Rsync => "rsync",
332            Self::WslRsync => "wsl-rsync",
333            Self::Scp => "scp",
334            Self::Sftp => "sftp",
335        }
336    }
337}
338
339impl std::fmt::Display for SyncMethod {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        f.write_str(self.as_str())
342    }
343}
344
345/// Result of syncing a single path.
346#[derive(Debug, Clone, Default)]
347pub struct PathSyncResult {
348    /// Remote path that was synced.
349    pub remote_path: String,
350    /// Local destination path.
351    pub local_path: PathBuf,
352    /// Number of files transferred.
353    pub files_transferred: u64,
354    /// Total bytes transferred.
355    pub bytes_transferred: u64,
356    /// Whether the sync succeeded.
357    pub success: bool,
358    /// Error message if sync failed.
359    pub error: Option<String>,
360    /// Duration of the sync operation.
361    pub duration_ms: u64,
362}
363
364/// Report from syncing an entire source.
365#[derive(Debug, Clone)]
366pub struct SyncReport {
367    /// Name of the source that was synced.
368    pub source_name: String,
369    /// Method used for syncing.
370    pub method: SyncMethod,
371    /// Results for each path.
372    pub path_results: Vec<PathSyncResult>,
373    /// Total duration of the sync.
374    pub total_duration_ms: u64,
375    /// Whether all paths synced successfully.
376    pub all_succeeded: bool,
377}
378
379impl SyncReport {
380    /// Create a new report for a source.
381    pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
382        Self {
383            source_name: source_name.into(),
384            method,
385            path_results: Vec::new(),
386            total_duration_ms: 0,
387            all_succeeded: true,
388        }
389    }
390
391    /// Create a failed report when sync couldn't even start.
392    pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
393        Self {
394            source_name: source_name.into(),
395            method: SyncMethod::Rsync,
396            path_results: vec![PathSyncResult {
397                error: Some(error.to_string()),
398                success: false,
399                ..Default::default()
400            }],
401            total_duration_ms: 0,
402            all_succeeded: false,
403        }
404    }
405
406    /// Add a path result to the report.
407    pub fn add_path_result(&mut self, result: PathSyncResult) {
408        if !result.success {
409            self.all_succeeded = false;
410        }
411        self.path_results.push(result);
412    }
413
414    /// Get total files transferred across all paths.
415    pub fn total_files(&self) -> u64 {
416        self.path_results.iter().map(|r| r.files_transferred).sum()
417    }
418
419    /// Get total bytes transferred across all paths.
420    pub fn total_bytes(&self) -> u64 {
421        self.path_results.iter().map(|r| r.bytes_transferred).sum()
422    }
423
424    /// Get count of successful path syncs.
425    pub fn successful_paths(&self) -> usize {
426        self.path_results.iter().filter(|r| r.success).count()
427    }
428
429    /// Get count of failed path syncs.
430    pub fn failed_paths(&self) -> usize {
431        self.path_results.iter().filter(|r| !r.success).count()
432    }
433
434    /// Summarize the overall sync outcome.
435    pub fn sync_result(&self) -> SyncResult {
436        if self.all_succeeded {
437            SyncResult::Success
438        } else {
439            let errors: Vec<String> = self
440                .path_results
441                .iter()
442                .filter_map(|r| r.error.clone())
443                .collect();
444            if self.successful_paths() > 0 {
445                SyncResult::PartialFailure(errors.join("; "))
446            } else {
447                SyncResult::Failed(errors.join("; "))
448            }
449        }
450    }
451}
452
453/// Statistics parsed from rsync output.
454#[derive(Debug, Default)]
455struct RsyncStats {
456    files_transferred: u64,
457    bytes_transferred: u64,
458}
459
460/// Sync engine for pulling sessions from remote sources.
461pub struct SyncEngine {
462    /// Base directory for storing synced data.
463    /// Structure: `{local_store}/remotes/{source_name}/mirror/`
464    local_store: PathBuf,
465    /// Connection timeout in seconds.
466    connection_timeout: u64,
467    /// Transfer timeout in seconds (0 = no timeout).
468    transfer_timeout: u64,
469}
470
471impl SyncEngine {
472    /// Create a new sync engine.
473    ///
474    /// # Arguments
475    /// * `data_dir` - The cass data directory (e.g., ~/.local/share/coding-agent-search)
476    pub fn new(data_dir: &Path) -> Self {
477        Self {
478            local_store: data_dir.to_path_buf(),
479            connection_timeout: 10,
480            transfer_timeout: 300, // 5 minutes
481        }
482    }
483
484    /// Set the connection timeout.
485    pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
486        self.connection_timeout = seconds;
487        self
488    }
489
490    /// Set the transfer timeout.
491    pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
492        self.transfer_timeout = seconds;
493        self
494    }
495
496    /// Get the local mirror directory for a source.
497    pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
498        self.local_store
499            .join("remotes")
500            .join(source_name)
501            .join("mirror")
502    }
503
504    /// Get the remote home directory by SSH-ing to the host and printing `$HOME`.
505    ///
506    /// This is called once per source sync to avoid repeated SSH calls for each path.
507    fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
508        // Validate host doesn't contain shell metacharacters to prevent injection
509        if host.trim().is_empty()
510            || host.starts_with('-')
511            || !ssh_host_has_safe_token_chars(host)
512            || validate_optional_user_host_shape(host).is_err()
513        {
514            return Err(SyncError::SshFailed(format!(
515                "Invalid characters in host: {}",
516                host
517            )));
518        }
519
520        let timeout_secs = self.connection_timeout.max(1);
521        let mut cmd = Command::new("ssh");
522        cmd.args(strict_ssh_cli_tokens(timeout_secs))
523            .arg("--")
524            .arg(host)
525            .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
526            .stdout(Stdio::piped())
527            .stderr(Stdio::piped());
528
529        let child = cmd
530            .spawn()
531            .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
532        let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
533            .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
534            .ok_or(SyncError::Timeout(timeout_secs))?;
535
536        if !output.status.success() {
537            let stderr = String::from_utf8_lossy(&output.stderr);
538            if is_host_key_verification_failure(&stderr) {
539                return Err(SyncError::SshFailed(host_key_verification_error(host)));
540            }
541            return Err(SyncError::SshFailed(format!(
542                "Failed to get remote home directory: {}",
543                stderr.trim()
544            )));
545        }
546
547        let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
548            SyncError::SshFailed(
549                "Unable to parse remote home directory from SSH output".to_string(),
550            )
551        })?;
552
553        tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
554        Ok(remote_home)
555    }
556
557    /// Expand ~ in a remote path using the provided home directory.
558    ///
559    /// If `remote_home` is None, returns the path unchanged.
560    fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
561        if !path.starts_with('~') {
562            return path.to_string();
563        }
564
565        let Some(home) = remote_home else {
566            return path.to_string();
567        };
568
569        if path == "~" {
570            home.to_string()
571        } else if let Some(rest) = path.strip_prefix("~/") {
572            format!("{}/{}", home, rest)
573        } else {
574            // ~user/path case - not supported, return as-is
575            path.to_string()
576        }
577    }
578
579    /// Detect the available sync method.
580    ///
581    /// Detection order:
582    /// 1. Native `rsync` → [`SyncMethod::Rsync`]
583    /// 2. `wsl rsync` (Windows only) → [`SyncMethod::WslRsync`]
584    /// 3. System `scp` available → [`SyncMethod::Scp`]
585    /// 4. Last resort → [`SyncMethod::Sftp`] (ssh2-based, no native-agent integration)
586    ///
587    /// On Windows the `ssh2` SFTP path is intentionally avoided whenever possible
588    /// because it bypasses the Windows OpenSSH agent and `~/.ssh/config`, leading to
589    /// "No valid authentication method found" errors even when SSH keys are properly
590    /// configured. Using the system `scp` binary instead lets OpenSSH handle auth the
591    /// same way `ssh` and `cass sources doctor` do.
592    pub fn detect_sync_method() -> SyncMethod {
593        // 1. Native rsync
594        if Command::new("rsync")
595            .arg("--version")
596            .output()
597            .map(|o| o.status.success())
598            .unwrap_or(false)
599        {
600            return SyncMethod::Rsync;
601        }
602
603        // 2. WSL rsync (Windows-only: rsync inside WSL invoked via `wsl rsync`)
604        #[cfg(target_os = "windows")]
605        if Command::new("wsl")
606            .args(["rsync", "--version"])
607            .output()
608            .map(|o| o.status.success())
609            .unwrap_or(false)
610        {
611            return SyncMethod::WslRsync;
612        }
613
614        // 3. System scp – preferred over ssh2/SFTP because it inherits the native
615        //    OpenSSH agent and ~/.ssh/config on all platforms (especially Windows).
616        if Command::new("scp")
617            .arg("-S")
618            .arg("ssh")
619            .arg("--")
620            // pass a harmless flag; scp prints usage and exits non-zero, but if the
621            // binary exists the spawn itself succeeds which is all we need to check.
622            .output()
623            .is_ok()
624        {
625            // Confirm scp is a real binary by checking for the executable
626            if which_scp_exists() {
627                return SyncMethod::Scp;
628            }
629        }
630
631        // 4. Last resort: ssh2-based SFTP
632        SyncMethod::Sftp
633    }
634
635    /// Sync a single source.
636    ///
637    /// Syncs all configured paths from the source to the local mirror directory.
638    /// Individual path failures don't abort the entire sync.
639    pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
640        if !source.is_remote() {
641            return Err(SyncError::NoHost);
642        }
643
644        let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
645
646        if source.paths.is_empty() {
647            return Err(SyncError::NoPaths);
648        }
649
650        source
651            .validate_structure()
652            .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
653
654        let method = Self::detect_sync_method();
655        let mut report = SyncReport::new(&source.name, method);
656        let overall_start = Instant::now();
657
658        // Create the mirror directory
659        let mirror_dir = self.mirror_dir(&source.name);
660        std::fs::create_dir_all(&mirror_dir)?;
661
662        // Pre-fetch remote home directory if any paths use tilde (avoids multiple SSH calls)
663        let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
664            path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
665        }) {
666            match self.get_remote_home(host) {
667                Ok(home) => Some(home),
668                Err(e) => {
669                    tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
670                    None
671                }
672            }
673        } else {
674            None
675        };
676
677        for (index, remote_path) in source.paths.iter().enumerate() {
678            if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
679                report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
680                continue;
681            }
682
683            let result = match method {
684                SyncMethod::Rsync => {
685                    self.sync_path_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
686                }
687                SyncMethod::WslRsync => {
688                    self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
689                }
690                SyncMethod::Scp => {
691                    self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
692                }
693                SyncMethod::Sftp => {
694                    self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
695                }
696            };
697            report.add_path_result(result);
698        }
699
700        report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
701        Ok(report)
702    }
703
704    /// Sync all remote sources from a config.
705    ///
706    /// Continues even if individual sources fail.
707    pub fn sync_all(
708        &self,
709        sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
710    ) -> Vec<SyncReport> {
711        sources
712            .map(|source| {
713                let source = source.borrow();
714                self.sync_source(source)
715                    .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
716            })
717            .collect()
718    }
719
720    /// Sync a single path using rsync.
721    ///
722    /// **IMPORTANT**: Uses rsync WITHOUT --delete for safe additive syncs.
723    ///
724    /// The `remote_home` parameter should be pre-fetched via `get_remote_home()` to avoid
725    /// repeated SSH calls for each path.
726    fn sync_path_rsync(
727        &self,
728        host: &str,
729        remote_path: &str,
730        dest_dir: &Path,
731        remote_home: Option<&str>,
732    ) -> PathSyncResult {
733        let start = Instant::now();
734        if remote_path.starts_with('~') && remote_home.is_none() {
735            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
736            return PathSyncResult {
737                remote_path: remote_path.to_string(),
738                local_path,
739                success: false,
740                error: Some(
741                    "Cannot expand '~' in remote path; failed to determine remote home directory"
742                        .to_string(),
743                ),
744                duration_ms: start.elapsed().as_millis() as u64,
745                ..Default::default()
746            };
747        }
748
749        // Expand ~ using pre-fetched home directory (no SSH call here)
750        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
751
752        // If tilde expansion failed (no remote_home provided), log a warning
753        if remote_path.starts_with('~') && expanded_path == remote_path {
754            tracing::warn!(
755                remote_path = %remote_path,
756                "Could not expand tilde in path (remote home directory not available)"
757            );
758        }
759
760        // Convert remote path to safe local directory name
761        // Use raw remote_path for stability (independent of home expansion success)
762        let safe_name = path_to_safe_dirname(remote_path);
763        let local_path = dest_dir.join(&safe_name);
764
765        // Create local directory without following any pre-existing mirror symlink.
766        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
767            return PathSyncResult {
768                remote_path: remote_path.to_string(),
769                local_path: local_path.clone(),
770                success: false,
771                error: Some(e),
772                duration_ms: start.elapsed().as_millis() as u64,
773                ..Default::default()
774            };
775        }
776
777        // Build rsync command
778        // NOTE: NO --delete flag! Safe additive sync only.
779        let arg_protection = detect_rsync_arg_protection();
780        let protect_args_supported = arg_protection.is_supported();
781        let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
782        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
783
784        let local_path_str = match local_path.to_str() {
785            Some(s) => s,
786            None => {
787                return PathSyncResult {
788                    remote_path: remote_path.to_string(),
789                    local_path,
790                    success: false,
791                    error: Some("Local path contains invalid UTF-8".to_string()),
792                    duration_ms: start.elapsed().as_millis() as u64,
793                    ..Default::default()
794                };
795            }
796        };
797
798        let timeout_str = self.transfer_timeout.to_string();
799        let mut cmd = Command::new("rsync");
800        cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
801        if let Some(flag) = arg_protection.flag() {
802            cmd.arg(flag);
803        }
804        cmd.args([
805            "--timeout",
806            &timeout_str,
807            "-e",
808            &ssh_opts,
809            "--",
810            &remote_spec,
811            local_path_str,
812        ]);
813
814        tracing::debug!(
815            host = %host,
816            remote_path = %expanded_path,
817            local_path = %local_path.display(),
818            "starting rsync"
819        );
820
821        let output = match cmd.output() {
822            Ok(o) => o,
823            Err(e) => {
824                return PathSyncResult {
825                    remote_path: remote_path.to_string(),
826                    local_path,
827                    success: false,
828                    error: Some(format!("Failed to execute rsync: {}", e)),
829                    duration_ms: start.elapsed().as_millis() as u64,
830                    ..Default::default()
831                };
832            }
833        };
834
835        let duration_ms = start.elapsed().as_millis() as u64;
836        let stdout = String::from_utf8_lossy(&output.stdout);
837        let stderr = String::from_utf8_lossy(&output.stderr);
838
839        if !output.status.success() {
840            // Check for specific error types
841            let error_msg = if stderr.contains("Connection refused")
842                || stderr.contains("Connection timed out")
843            {
844                format!("SSH connection failed: {}", stderr.trim())
845            } else if is_host_key_verification_failure(&stderr) {
846                host_key_verification_error(host)
847            } else if stderr.contains("No such file or directory") {
848                format!("Remote path not found: {}", expanded_path)
849            } else if stderr.contains("Permission denied") {
850                format!("Permission denied: {}", stderr.trim())
851            } else {
852                format!("rsync failed: {}", stderr.trim())
853            };
854
855            tracing::warn!(
856                host = %host,
857                remote_path = %expanded_path,
858                error = %error_msg,
859                "rsync failed"
860            );
861
862            return PathSyncResult {
863                remote_path: remote_path.to_string(),
864                local_path,
865                success: false,
866                error: Some(error_msg),
867                duration_ms,
868                ..Default::default()
869            };
870        }
871
872        // Parse stats from rsync output
873        let stats = parse_rsync_stats(&stdout);
874
875        tracing::info!(
876            host = %host,
877            remote_path = %expanded_path,
878            files = stats.files_transferred,
879            bytes = stats.bytes_transferred,
880            duration_ms,
881            "rsync completed"
882        );
883
884        PathSyncResult {
885            remote_path: remote_path.to_string(),
886            local_path,
887            files_transferred: stats.files_transferred,
888            bytes_transferred: stats.bytes_transferred,
889            success: true,
890            error: None,
891            duration_ms,
892        }
893    }
894
895    /// Sync a single path using rsync invoked through WSL (`wsl rsync …`).
896    ///
897    /// Used on Windows when native rsync is absent but WSL with rsync is available.
898    /// WSL paths use the `\\wsl$\…` UNC convention for the local destination.
899    fn sync_path_wsl_rsync(
900        &self,
901        host: &str,
902        remote_path: &str,
903        dest_dir: &Path,
904        remote_home: Option<&str>,
905    ) -> PathSyncResult {
906        let start = Instant::now();
907
908        if remote_path.starts_with('~') && remote_home.is_none() {
909            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
910            return PathSyncResult {
911                remote_path: remote_path.to_string(),
912                local_path,
913                success: false,
914                error: Some(
915                    "Cannot expand '~' in remote path; failed to determine remote home directory"
916                        .to_string(),
917                ),
918                duration_ms: start.elapsed().as_millis() as u64,
919                ..Default::default()
920            };
921        }
922
923        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
924        let safe_name = path_to_safe_dirname(remote_path);
925        let local_path = dest_dir.join(&safe_name);
926
927        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
928            return PathSyncResult {
929                remote_path: remote_path.to_string(),
930                local_path,
931                success: false,
932                error: Some(e),
933                duration_ms: start.elapsed().as_millis() as u64,
934                ..Default::default()
935            };
936        }
937
938        let local_path_str = match local_path.to_str() {
939            Some(s) => s,
940            None => {
941                return PathSyncResult {
942                    remote_path: remote_path.to_string(),
943                    local_path,
944                    success: false,
945                    error: Some("Local path contains invalid UTF-8".to_string()),
946                    duration_ms: start.elapsed().as_millis() as u64,
947                    ..Default::default()
948                };
949            }
950        };
951
952        // Convert Windows path to a WSL-accessible path.
953        // WSL can access Windows paths via /mnt/<drive>/... conventions.
954        // E.g. C:\Users\george\AppData\... → /mnt/c/Users/george/AppData/...
955        let wsl_dest = windows_path_to_wsl(local_path_str);
956
957        let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
958        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
959        let timeout_str = self.transfer_timeout.to_string();
960
961        let mut cmd = Command::new("wsl");
962        cmd.args([
963            "rsync",
964            "-avz",
965            "--links",
966            "--safe-links",
967            "--stats",
968            "--partial",
969        ]);
970        // WSL rsync is the real rsync (not openrsync), so --protect-args is safe.
971        cmd.arg("--protect-args");
972        cmd.args([
973            "--timeout",
974            &timeout_str,
975            "-e",
976            &ssh_opts,
977            "--",
978            &remote_spec,
979            &wsl_dest,
980        ]);
981
982        tracing::debug!(
983            host = %host,
984            remote_path = %expanded_path,
985            local_path = %local_path.display(),
986            wsl_dest = %wsl_dest,
987            "starting wsl rsync"
988        );
989
990        let output = match cmd.output() {
991            Ok(o) => o,
992            Err(e) => {
993                return PathSyncResult {
994                    remote_path: remote_path.to_string(),
995                    local_path,
996                    success: false,
997                    error: Some(format!("Failed to execute wsl rsync: {}", e)),
998                    duration_ms: start.elapsed().as_millis() as u64,
999                    ..Default::default()
1000                };
1001            }
1002        };
1003
1004        let duration_ms = start.elapsed().as_millis() as u64;
1005        let stdout = String::from_utf8_lossy(&output.stdout);
1006        let stderr = String::from_utf8_lossy(&output.stderr);
1007
1008        if !output.status.success() {
1009            let error_msg = if stderr.contains("Connection refused")
1010                || stderr.contains("Connection timed out")
1011            {
1012                format!("SSH connection failed: {}", stderr.trim())
1013            } else if is_host_key_verification_failure(&stderr) {
1014                host_key_verification_error(host)
1015            } else if stderr.contains("No such file or directory") {
1016                format!("Remote path not found: {}", expanded_path)
1017            } else if stderr.contains("Permission denied") {
1018                format!("Permission denied: {}", stderr.trim())
1019            } else {
1020                format!("wsl rsync failed: {}", stderr.trim())
1021            };
1022
1023            tracing::warn!(
1024                host = %host,
1025                remote_path = %expanded_path,
1026                error = %error_msg,
1027                "wsl rsync failed"
1028            );
1029
1030            return PathSyncResult {
1031                remote_path: remote_path.to_string(),
1032                local_path,
1033                success: false,
1034                error: Some(error_msg),
1035                duration_ms,
1036                ..Default::default()
1037            };
1038        }
1039
1040        let stats = parse_rsync_stats(&stdout);
1041
1042        tracing::info!(
1043            host = %host,
1044            remote_path = %expanded_path,
1045            files = stats.files_transferred,
1046            bytes = stats.bytes_transferred,
1047            duration_ms,
1048            "wsl rsync completed"
1049        );
1050
1051        PathSyncResult {
1052            remote_path: remote_path.to_string(),
1053            local_path,
1054            files_transferred: stats.files_transferred,
1055            bytes_transferred: stats.bytes_transferred,
1056            success: true,
1057            error: None,
1058            duration_ms,
1059        }
1060    }
1061
1062    /// Sync a single path using SCP after a physical `find -P` regular-file listing.
1063    ///
1064    /// This method delegates all authentication to the native system `scp`/`ssh`
1065    /// binary, which correctly reads `~/.ssh/config`, the OpenSSH agent (including
1066    /// the Windows OpenSSH agent on Windows), and all standard key locations.
1067    ///
1068    /// This avoids the "No valid authentication method found" failure that occurs
1069    /// in the `ssh2`-based SFTP path on Windows, where the library does not
1070    /// integrate with the Windows OpenSSH agent (`ssh-agent.exe`).
1071    fn sync_path_scp(
1072        &self,
1073        host: &str,
1074        remote_path: &str,
1075        dest_dir: &Path,
1076        remote_home: Option<&str>,
1077    ) -> PathSyncResult {
1078        let start = Instant::now();
1079
1080        if remote_path.starts_with('~') && remote_home.is_none() {
1081            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1082            return PathSyncResult {
1083                remote_path: remote_path.to_string(),
1084                local_path,
1085                success: false,
1086                error: Some(
1087                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1088                        .to_string(),
1089                ),
1090                duration_ms: start.elapsed().as_millis() as u64,
1091                ..Default::default()
1092            };
1093        }
1094
1095        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1096        let safe_name = path_to_safe_dirname(remote_path);
1097        let local_path = dest_dir.join(&safe_name);
1098
1099        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1100            return PathSyncResult {
1101                remote_path: remote_path.to_string(),
1102                local_path,
1103                success: false,
1104                error: Some(e),
1105                duration_ms: start.elapsed().as_millis() as u64,
1106                ..Default::default()
1107            };
1108        }
1109
1110        // `scp -r` follows symlinks on some OpenSSH paths. Enumerate only regular
1111        // files with physical traversal first, then copy those files individually.
1112        let connect_timeout = self.connection_timeout.to_string();
1113        let find_command = remote_find_regular_files_command(&expanded_path);
1114
1115        tracing::debug!(
1116            host = %host,
1117            remote_path = %expanded_path,
1118            local_path = %local_path.display(),
1119            "listing regular files for scp sync"
1120        );
1121
1122        let timeout_secs = self.connection_timeout.max(1);
1123        let mut cmd = Command::new("ssh");
1124        cmd.args(strict_ssh_cli_tokens(timeout_secs))
1125            .arg("--")
1126            .arg(host)
1127            .arg(&find_command)
1128            .stdout(Stdio::piped())
1129            .stderr(Stdio::piped());
1130
1131        let output = match cmd.spawn().and_then(|child| {
1132            wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1133        }) {
1134            Ok(Some(o)) => o,
1135            Ok(None) => {
1136                return PathSyncResult {
1137                    remote_path: remote_path.to_string(),
1138                    local_path,
1139                    success: false,
1140                    error: Some(format!(
1141                        "SSH file listing timed out after {timeout_secs} seconds"
1142                    )),
1143                    duration_ms: start.elapsed().as_millis() as u64,
1144                    ..Default::default()
1145                };
1146            }
1147            Err(e) => {
1148                return PathSyncResult {
1149                    remote_path: remote_path.to_string(),
1150                    local_path,
1151                    success: false,
1152                    error: Some(format!("Failed to execute ssh file listing: {}", e)),
1153                    duration_ms: start.elapsed().as_millis() as u64,
1154                    ..Default::default()
1155                };
1156            }
1157        };
1158
1159        let stderr = String::from_utf8_lossy(&output.stderr);
1160        if !output.status.success() {
1161            let error_msg = if stderr.contains("Connection refused")
1162                || stderr.contains("Connection timed out")
1163            {
1164                format!("SSH connection failed: {}", stderr.trim())
1165            } else if is_host_key_verification_failure(&stderr) {
1166                host_key_verification_error(host)
1167            } else if stderr.contains("No such file or directory") {
1168                format!("Remote path not found: {}", expanded_path)
1169            } else if stderr.contains("Permission denied") {
1170                format!("Permission denied: {}", stderr.trim())
1171            } else {
1172                format!("Remote file listing failed: {}", stderr.trim())
1173            };
1174
1175            tracing::warn!(
1176                host = %host,
1177                remote_path = %expanded_path,
1178                error = %error_msg,
1179                "scp file listing failed"
1180            );
1181
1182            return PathSyncResult {
1183                remote_path: remote_path.to_string(),
1184                local_path,
1185                success: false,
1186                error: Some(error_msg),
1187                duration_ms: start.elapsed().as_millis() as u64,
1188                ..Default::default()
1189            };
1190        }
1191
1192        let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1193        let remote_root = Path::new(&expanded_path);
1194        let leaf_name = Path::new(remote_path)
1195            .file_name()
1196            .and_then(|n| n.to_str())
1197            .unwrap_or("remote");
1198        let mut files_transferred = 0u64;
1199        let mut bytes_transferred = 0u64;
1200
1201        for remote_file in remote_files {
1202            let remote_file_path = Path::new(&remote_file);
1203            let Some(local_file) = remote_file_to_safe_local_path(
1204                remote_root,
1205                remote_file_path,
1206                &local_path,
1207                leaf_name,
1208            ) else {
1209                tracing::warn!(
1210                    remote_path = %remote_file,
1211                    root = %expanded_path,
1212                    "skipping scp file outside listed root"
1213                );
1214                continue;
1215            };
1216
1217            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1218                return PathSyncResult {
1219                    remote_path: remote_path.to_string(),
1220                    local_path,
1221                    success: false,
1222                    error: Some(e),
1223                    duration_ms: start.elapsed().as_millis() as u64,
1224                    ..Default::default()
1225                };
1226            }
1227
1228            if let Some(parent) = local_file.parent() {
1229                if let Err(e) = std::fs::create_dir_all(parent) {
1230                    return PathSyncResult {
1231                        remote_path: remote_path.to_string(),
1232                        local_path,
1233                        success: false,
1234                        error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1235                        duration_ms: start.elapsed().as_millis() as u64,
1236                        ..Default::default()
1237                    };
1238                }
1239
1240                if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1241                    return PathSyncResult {
1242                        remote_path: remote_path.to_string(),
1243                        local_path,
1244                        success: false,
1245                        error: Some(e),
1246                        duration_ms: start.elapsed().as_millis() as u64,
1247                        ..Default::default()
1248                    };
1249                }
1250            }
1251
1252            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1253                return PathSyncResult {
1254                    remote_path: remote_path.to_string(),
1255                    local_path,
1256                    success: false,
1257                    error: Some(e),
1258                    duration_ms: start.elapsed().as_millis() as u64,
1259                    ..Default::default()
1260                };
1261            }
1262
1263            let temp_path =
1264                unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1265            let Some(temp_path_str) = temp_path.to_str() else {
1266                return PathSyncResult {
1267                    remote_path: remote_path.to_string(),
1268                    local_path,
1269                    success: false,
1270                    error: Some("Local path contains invalid UTF-8".to_string()),
1271                    duration_ms: start.elapsed().as_millis() as u64,
1272                    ..Default::default()
1273                };
1274            };
1275            if let Err(e) = std::fs::OpenOptions::new()
1276                .write(true)
1277                .create_new(true)
1278                .open(&temp_path)
1279                .and_then(|file| file.sync_all())
1280            {
1281                return PathSyncResult {
1282                    remote_path: remote_path.to_string(),
1283                    local_path,
1284                    success: false,
1285                    error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1286                    duration_ms: start.elapsed().as_millis() as u64,
1287                    ..Default::default()
1288                };
1289            }
1290
1291            let remote_spec = remote_spec_for_scp(host, &remote_file);
1292            let mut cmd = Command::new("scp");
1293            cmd.args([
1294                "-B",
1295                "-o",
1296                &format!("ConnectTimeout={}", connect_timeout),
1297                "-o",
1298                "ServerAliveInterval=15",
1299                "-o",
1300                "ServerAliveCountMax=3",
1301                "-o",
1302                "StrictHostKeyChecking=yes",
1303                "--",
1304                &remote_spec,
1305                temp_path_str,
1306            ]);
1307
1308            let output = match cmd.output() {
1309                Ok(o) => o,
1310                Err(e) => {
1311                    return PathSyncResult {
1312                        remote_path: remote_path.to_string(),
1313                        local_path,
1314                        success: false,
1315                        error: Some(format!("Failed to execute scp: {}", e)),
1316                        duration_ms: start.elapsed().as_millis() as u64,
1317                        ..Default::default()
1318                    };
1319                }
1320            };
1321
1322            if !output.status.success() {
1323                let _ = std::fs::remove_file(&temp_path);
1324                let stderr = String::from_utf8_lossy(&output.stderr);
1325                let error_msg = if is_host_key_verification_failure(&stderr) {
1326                    host_key_verification_error(host)
1327                } else if stderr.contains("Permission denied") {
1328                    format!("Permission denied: {}", stderr.trim())
1329                } else {
1330                    format!("scp failed: {}", stderr.trim())
1331                };
1332
1333                tracing::warn!(
1334                    host = %host,
1335                    remote_path = %remote_file,
1336                    error = %error_msg,
1337                    "scp file transfer failed"
1338                );
1339
1340                return PathSyncResult {
1341                    remote_path: remote_path.to_string(),
1342                    local_path,
1343                    success: false,
1344                    error: Some(error_msg),
1345                    duration_ms: start.elapsed().as_millis() as u64,
1346                    ..Default::default()
1347                };
1348            }
1349
1350            files_transferred += 1;
1351            if let Err(e) = sync_file_path(&temp_path) {
1352                return PathSyncResult {
1353                    remote_path: remote_path.to_string(),
1354                    local_path,
1355                    success: false,
1356                    error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1357                    duration_ms: start.elapsed().as_millis() as u64,
1358                    ..Default::default()
1359                };
1360            }
1361            if let Ok(metadata) = std::fs::metadata(&temp_path) {
1362                bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1363            }
1364            if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1365                return PathSyncResult {
1366                    remote_path: remote_path.to_string(),
1367                    local_path,
1368                    success: false,
1369                    error: Some(format!(
1370                        "Failed to publish {} to {}: {}",
1371                        temp_path.display(),
1372                        local_file.display(),
1373                        e
1374                    )),
1375                    duration_ms: start.elapsed().as_millis() as u64,
1376                    ..Default::default()
1377                };
1378            }
1379        }
1380
1381        let duration_ms = start.elapsed().as_millis() as u64;
1382
1383        tracing::info!(
1384            host = %host,
1385            remote_path = %expanded_path,
1386            files = files_transferred,
1387            bytes = bytes_transferred,
1388            duration_ms,
1389            "scp sync completed"
1390        );
1391
1392        PathSyncResult {
1393            remote_path: remote_path.to_string(),
1394            local_path,
1395            files_transferred,
1396            bytes_transferred,
1397            success: true,
1398            error: None,
1399            duration_ms,
1400        }
1401    }
1402
1403    /// Sync a single path using SFTP (fallback when rsync unavailable).
1404    ///
1405    /// Uses the ssh2 crate for SFTP transfers. Authenticates via SSH agent
1406    /// or key file from SSH config.
1407    fn sync_path_sftp(
1408        &self,
1409        host: &str,
1410        remote_path: &str,
1411        dest_dir: &Path,
1412        remote_home: Option<&str>,
1413    ) -> PathSyncResult {
1414        let start = Instant::now();
1415        if remote_path.starts_with('~') && remote_home.is_none() {
1416            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1417            return PathSyncResult {
1418                remote_path: remote_path.to_string(),
1419                local_path,
1420                success: false,
1421                error: Some(
1422                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1423                        .to_string(),
1424                ),
1425                duration_ms: start.elapsed().as_millis() as u64,
1426                ..Default::default()
1427            };
1428        }
1429        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1430        // Use raw remote_path for stability (independent of home expansion success)
1431        let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1432
1433        // Create local directory without following any pre-existing mirror symlink.
1434        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1435            return PathSyncResult {
1436                remote_path: remote_path.to_string(),
1437                local_path,
1438                success: false,
1439                error: Some(e),
1440                duration_ms: start.elapsed().as_millis() as u64,
1441                ..Default::default()
1442            };
1443        }
1444
1445        // Parse host to extract user if present (user@host format)
1446        let (ssh_user, ssh_host) = parse_ssh_host(host);
1447
1448        // Look up host in SSH config for connection details
1449        // First try matching by SSH config alias (Host line), then by actual hostname
1450        let ssh_config = discover_ssh_hosts();
1451        let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
1452            ssh_config
1453                .iter()
1454                .find(|h| h.hostname.as_deref() == Some(ssh_host))
1455        });
1456
1457        // Determine connection parameters
1458        let hostname = host_config
1459            .and_then(|h| h.hostname.as_deref())
1460            .unwrap_or(ssh_host);
1461        let port = host_config.and_then(|h| h.port).unwrap_or(22);
1462        // Resolve username deterministically; never guess with a sentinel value.
1463        let username = match first_nonblank_username([
1464            ssh_user,
1465            host_config.and_then(|h| h.user.as_deref()),
1466        ])
1467        .or_else(|| env_username("USER"))
1468        .or_else(|| env_username("LOGNAME"))
1469        {
1470            Some(user) => user,
1471            None => {
1472                return PathSyncResult {
1473                    remote_path: remote_path.to_string(),
1474                    local_path,
1475                    success: false,
1476                    error: Some(format!(
1477                        "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
1478                        host
1479                    )),
1480                    duration_ms: start.elapsed().as_millis() as u64,
1481                    ..Default::default()
1482                };
1483            }
1484        };
1485        let identity_file = host_config.and_then(|h| h.identity_file.as_deref());
1486
1487        tracing::debug!(
1488            hostname = %hostname,
1489            port,
1490            username = %username,
1491            identity_file = ?identity_file,
1492            remote_path = %expanded_path,
1493            "SFTP connection parameters"
1494        );
1495
1496        // Connect via TCP with connection timeout
1497        let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
1498        let addr = format!("{}:{}", hostname, port);
1499        let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
1500            // Resolve hostname to socket address
1501            use std::net::ToSocketAddrs;
1502            (hostname, port)
1503                .to_socket_addrs()
1504                .ok()
1505                .and_then(|mut addrs| addrs.next())
1506                .ok_or(std::io::Error::new(
1507                    std::io::ErrorKind::InvalidInput,
1508                    "cannot resolve hostname",
1509                ))
1510        }) {
1511            Ok(a) => a,
1512            Err(e) => {
1513                return PathSyncResult {
1514                    remote_path: remote_path.to_string(),
1515                    local_path,
1516                    success: false,
1517                    error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
1518                    duration_ms: start.elapsed().as_millis() as u64,
1519                    ..Default::default()
1520                };
1521            }
1522        };
1523        let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
1524            Ok(t) => t,
1525            Err(e) => {
1526                return PathSyncResult {
1527                    remote_path: remote_path.to_string(),
1528                    local_path,
1529                    success: false,
1530                    error: Some(format!(
1531                        "TCP connection failed to {}:{}: {}",
1532                        hostname, port, e
1533                    )),
1534                    duration_ms: start.elapsed().as_millis() as u64,
1535                    ..Default::default()
1536                };
1537            }
1538        };
1539
1540        // Set TCP read/write timeout (use transfer_timeout, not connection_timeout)
1541        let timeout = std::time::Duration::from_secs(self.transfer_timeout);
1542        if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
1543            tracing::warn!("Failed to set TCP read timeout: {}", e);
1544        }
1545        if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
1546            tracing::warn!("Failed to set TCP write timeout: {}", e);
1547        }
1548        let tcp_shutdown = tcp.try_clone().ok();
1549
1550        // Create SSH session
1551        let mut session = match Session::new() {
1552            Ok(s) => s,
1553            Err(e) => {
1554                let _ = tcp.shutdown(Shutdown::Both);
1555                return PathSyncResult {
1556                    remote_path: remote_path.to_string(),
1557                    local_path,
1558                    success: false,
1559                    error: Some(format!("Failed to create SSH session: {}", e)),
1560                    duration_ms: start.elapsed().as_millis() as u64,
1561                    ..Default::default()
1562                };
1563            }
1564        };
1565
1566        session.set_tcp_stream(tcp);
1567        let close_connections = |session: &mut Session, reason: &str| {
1568            let _ = session.disconnect(None, reason, None);
1569            if let Some(stream) = tcp_shutdown.as_ref() {
1570                let _ = stream.shutdown(Shutdown::Both);
1571            }
1572        };
1573
1574        if let Err(e) = session.handshake() {
1575            close_connections(&mut session, "handshake failed");
1576            return PathSyncResult {
1577                remote_path: remote_path.to_string(),
1578                local_path,
1579                success: false,
1580                error: Some(format!("SSH handshake failed: {}", e)),
1581                duration_ms: start.elapsed().as_millis() as u64,
1582                ..Default::default()
1583            };
1584        }
1585
1586        // Authenticate - try agent first, then key file
1587        if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
1588            close_connections(&mut session, "authentication failed");
1589            return PathSyncResult {
1590                remote_path: remote_path.to_string(),
1591                local_path,
1592                success: false,
1593                error: Some(format!("SSH authentication failed: {}", e)),
1594                duration_ms: start.elapsed().as_millis() as u64,
1595                ..Default::default()
1596            };
1597        }
1598
1599        // Open SFTP session
1600        let sftp = match session.sftp() {
1601            Ok(s) => s,
1602            Err(e) => {
1603                close_connections(&mut session, "sftp open failed");
1604                return PathSyncResult {
1605                    remote_path: remote_path.to_string(),
1606                    local_path,
1607                    success: false,
1608                    error: Some(format!("Failed to open SFTP session: {}", e)),
1609                    duration_ms: start.elapsed().as_millis() as u64,
1610                    ..Default::default()
1611                };
1612            }
1613        };
1614
1615        tracing::info!(
1616            host = %host,
1617            remote_path = %expanded_path,
1618            local_path = %local_path.display(),
1619            "starting SFTP sync"
1620        );
1621
1622        // Recursively download the remote path
1623        let mut files_transferred = 0u64;
1624        let mut bytes_transferred = 0u64;
1625
1626        // For consistency with rsync and scp, we should create a subdirectory
1627        // with the remote path's leaf name inside the container directory.
1628        let leaf_name = Path::new(remote_path)
1629            .file_name()
1630            .and_then(|n| n.to_str())
1631            .unwrap_or("remote");
1632        let target_local_path = local_path.join(leaf_name);
1633
1634        if let Err(e) = self.sftp_download_recursive(
1635            &sftp,
1636            Path::new(&expanded_path),
1637            &target_local_path,
1638            &local_path,
1639            &mut files_transferred,
1640            &mut bytes_transferred,
1641        ) {
1642            close_connections(&mut session, "sftp download failed");
1643            return PathSyncResult {
1644                remote_path: remote_path.to_string(),
1645                local_path,
1646                files_transferred,
1647                bytes_transferred,
1648                success: false,
1649                error: Some(format!("SFTP download failed: {}", e)),
1650                duration_ms: start.elapsed().as_millis() as u64,
1651            };
1652        }
1653
1654        let duration_ms = start.elapsed().as_millis() as u64;
1655
1656        tracing::info!(
1657            host = %host,
1658            remote_path = %expanded_path,
1659            files = files_transferred,
1660            bytes = bytes_transferred,
1661            duration_ms,
1662            "SFTP sync completed"
1663        );
1664
1665        close_connections(&mut session, "sync complete");
1666        PathSyncResult {
1667            remote_path: remote_path.to_string(),
1668            local_path,
1669            files_transferred,
1670            bytes_transferred,
1671            success: true,
1672            error: None,
1673            duration_ms,
1674        }
1675    }
1676
1677    /// Authenticate SSH session using agent or key file.
1678    fn authenticate_ssh(
1679        &self,
1680        session: &Session,
1681        username: &str,
1682        identity_file: Option<&str>,
1683    ) -> Result<(), String> {
1684        // Try SSH agent first
1685        if let Ok(mut agent) = session.agent()
1686            && agent.connect().is_ok()
1687            && agent.list_identities().is_ok()
1688        {
1689            for identity in agent.identities().unwrap_or_default() {
1690                if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1691                    tracing::debug!("Authenticated via SSH agent");
1692                    return Ok(());
1693                }
1694            }
1695        }
1696
1697        // Try key file if specified
1698        if let Some(key_path) = identity_file {
1699            let key_path_expanded = expand_tilde_local(key_path);
1700            let key_path_buf = Path::new(&key_path_expanded);
1701
1702            if key_path_buf.exists()
1703                && session
1704                    .userauth_pubkey_file(username, None, key_path_buf, None)
1705                    .is_ok()
1706                && session.authenticated()
1707            {
1708                tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1709                return Ok(());
1710            }
1711        }
1712
1713        // Try default key locations
1714        if let Some(home) = dirs::home_dir() {
1715            for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1716                let key_path = home.join(".ssh").join(key_name);
1717                if key_path.exists()
1718                    && session
1719                        .userauth_pubkey_file(username, None, &key_path, None)
1720                        .is_ok()
1721                    && session.authenticated()
1722                {
1723                    tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1724                    return Ok(());
1725                }
1726            }
1727        }
1728
1729        Err(format!(
1730            "No valid authentication method found for user '{}'",
1731            username
1732        ))
1733    }
1734
1735    /// Recursively download a remote path via SFTP.
1736    fn sftp_download_recursive(
1737        &self,
1738        sftp: &Sftp,
1739        remote_path: &Path,
1740        local_path: &Path,
1741        local_root: &Path,
1742        files_transferred: &mut u64,
1743        bytes_transferred: &mut u64,
1744    ) -> Result<(), String> {
1745        // Use lstat so a remote symlink is classified as a symlink rather than
1746        // followed to a file or directory outside the configured source root.
1747        let stat = sftp
1748            .lstat(remote_path)
1749            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1750
1751        if sftp_file_stat_is_symlink(&stat) {
1752            tracing::warn!(
1753                path = %remote_path.display(),
1754                "Skipping remote symlink during SFTP sync"
1755            );
1756            return Ok(());
1757        }
1758
1759        if stat.is_dir() {
1760            // Create local directory for this directory item
1761            reject_local_symlink_below_root(local_root, local_path)?;
1762            std::fs::create_dir_all(local_path)
1763                .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1764            reject_local_symlink_below_root(local_root, local_path)?;
1765
1766            // List directory contents
1767            let entries = sftp
1768                .readdir(remote_path)
1769                .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1770
1771            for (entry_path, _entry_stat) in entries {
1772                let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1773                    continue;
1774                };
1775
1776                let entry_stat = sftp
1777                    .lstat(&entry_path)
1778                    .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1779                if sftp_file_stat_is_symlink(&entry_stat) {
1780                    tracing::warn!(
1781                        path = %entry_path.display(),
1782                        "Skipping remote symlink during SFTP sync"
1783                    );
1784                    continue;
1785                }
1786
1787                let local_entry = local_path.join(file_name);
1788
1789                if entry_stat.is_dir() {
1790                    // Recurse into subdirectory
1791                    self.sftp_download_recursive(
1792                        sftp,
1793                        &entry_path,
1794                        &local_entry,
1795                        local_root,
1796                        files_transferred,
1797                        bytes_transferred,
1798                    )?;
1799                } else if entry_stat.is_file() {
1800                    // Download file
1801                    if self.sftp_download_file(
1802                        sftp,
1803                        &entry_path,
1804                        &local_entry,
1805                        local_root,
1806                        bytes_transferred,
1807                    )? {
1808                        *files_transferred += 1;
1809                    }
1810                }
1811                // Skip symlinks and other types for safety
1812            }
1813        } else if stat.is_file() {
1814            // Ensure the parent directory exists
1815            if let Some(parent) = local_path.parent() {
1816                reject_local_symlink_below_root(local_root, parent)?;
1817                std::fs::create_dir_all(parent).map_err(|e| {
1818                    format!("Failed to create local dir {}: {}", parent.display(), e)
1819                })?;
1820                reject_local_symlink_below_root(local_root, parent)?;
1821            }
1822
1823            if self.sftp_download_file(
1824                sftp,
1825                remote_path,
1826                local_path,
1827                local_root,
1828                bytes_transferred,
1829            )? {
1830                *files_transferred += 1;
1831            }
1832        } else {
1833            // Not a regular file or directory (symlink, socket, etc.) - skip with warning
1834            tracing::warn!(
1835                path = %remote_path.display(),
1836                "Skipping remote path: not a regular file or directory"
1837            );
1838        }
1839
1840        Ok(())
1841    }
1842
1843    /// Download a single file via SFTP.
1844    fn sftp_download_file(
1845        &self,
1846        sftp: &Sftp,
1847        remote_path: &Path,
1848        local_path: &Path,
1849        local_root: &Path,
1850        bytes_transferred: &mut u64,
1851    ) -> Result<bool, String> {
1852        let stat = sftp
1853            .lstat(remote_path)
1854            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1855        if sftp_file_stat_is_symlink(&stat) {
1856            tracing::warn!(
1857                path = %remote_path.display(),
1858                "Skipping remote symlink during SFTP sync"
1859            );
1860            return Ok(false);
1861        }
1862        if !stat.is_file() {
1863            tracing::warn!(
1864                path = %remote_path.display(),
1865                "Skipping remote path: not a regular file"
1866            );
1867            return Ok(false);
1868        }
1869
1870        let mut remote_file = sftp
1871            .open(remote_path)
1872            .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
1873
1874        reject_local_symlink_below_root(local_root, local_path)?;
1875
1876        let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
1877        let mut local_file = std::fs::OpenOptions::new()
1878            .write(true)
1879            .create_new(true)
1880            .open(&temp_path)
1881            .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
1882
1883        // Transfer in chunks
1884        let mut buffer = [0u8; 32768]; // 32KB chunks
1885        loop {
1886            let bytes_read = remote_file
1887                .read(&mut buffer)
1888                .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
1889
1890            if bytes_read == 0 {
1891                break;
1892            }
1893
1894            local_file
1895                .write_all(&buffer[..bytes_read])
1896                .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
1897
1898            *bytes_transferred += bytes_read as u64;
1899        }
1900
1901        tracing::trace!(
1902            remote = %remote_path.display(),
1903            local = %local_path.display(),
1904            "downloaded file"
1905        );
1906
1907        local_file
1908            .sync_all()
1909            .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
1910        drop(local_file);
1911        replace_file_from_temp(&temp_path, local_path).map_err(|e| {
1912            format!(
1913                "Failed to publish {} to {}: {}",
1914                temp_path.display(),
1915                local_path.display(),
1916                e
1917            )
1918        })?;
1919
1920        Ok(true)
1921    }
1922}
1923
1924/// Resolve an SFTP entry's basename for local mirroring.
1925fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
1926    let Some(file_name) = entry_path.file_name() else {
1927        tracing::warn!(
1928            parent = %parent_path.display(),
1929            entry = ?entry_path,
1930            "Skipping SFTP entry without a file name"
1931        );
1932        return None;
1933    };
1934
1935    let Some(file_name) = file_name.to_str() else {
1936        tracing::warn!(
1937            parent = %parent_path.display(),
1938            entry = ?entry_path,
1939            "Skipping SFTP entry with non-UTF-8 file name"
1940        );
1941        return None;
1942    };
1943
1944    if file_name.is_empty() {
1945        tracing::warn!(
1946            parent = %parent_path.display(),
1947            entry = ?entry_path,
1948            "Skipping SFTP entry with empty file name"
1949        );
1950        return None;
1951    }
1952
1953    if file_name == "." || file_name == ".." {
1954        return None;
1955    }
1956
1957    Some(file_name)
1958}
1959
/// Report whether an `scp` executable is present in any `PATH` directory.
///
/// Only the filesystem is searched (`scp.exe` on Windows, `scp` elsewhere).
/// The binary is never executed, because running it without arguments exits
/// non-zero on many platforms. An unset `PATH` yields `false`.
fn which_scp_exists() -> bool {
    let binary = if cfg!(target_os = "windows") {
        "scp.exe"
    } else {
        "scp"
    };
    std::env::var_os("PATH").is_some_and(|search_path| {
        std::env::split_paths(&search_path).any(|dir| dir.join(binary).is_file())
    })
}
1978
/// Convert a Windows absolute path to a WSL-accessible `/mnt/<drive>/…` path.
///
/// E.g. `C:\Users\george\AppData\Roaming\cass` →
///      `/mnt/c/Users/george/AppData/Roaming/cass`
///
/// A path is treated as a drive path only when it starts with an ASCII letter,
/// then `:`, then `\` or `/`. The drive letter is lowercased and the remaining
/// backslashes become forward slashes. Anything else, including `1:/x` or a
/// drive-relative `C:foo`, is returned unchanged.
fn windows_path_to_wsl(path: &str) -> String {
    match path.as_bytes() {
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic() => {
            // The first three bytes are ASCII, so index 3 is a char boundary.
            let rest = path[3..].replace('\\', "/");
            format!("/mnt/{}/{}", drive.to_ascii_lowercase() as char, rest)
        }
        _ => path.to_string(),
    }
}
1997
/// Split an SSH host string into `(optional_user, host)` at the first `@`.
///
/// Examples:
/// - "myserver" -> (None, "myserver")
/// - "user@myserver" -> (Some("user"), "myserver")
///
/// No validation is done: `"@host"` yields `Some("")` as the user.
fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
    match host.split_once('@') {
        Some((user, hostname)) => (Some(user), hostname),
        None => (None, host),
    }
}
2012
/// Return the first candidate that is non-empty after trimming, trimmed and owned.
///
/// `None` candidates are skipped. Evaluation stops at the first match.
fn first_nonblank_username<'a>(
    candidates: impl IntoIterator<Item = Option<&'a str>>,
) -> Option<String> {
    candidates
        .into_iter()
        .flatten()
        .map(str::trim)
        .find(|name| !name.is_empty())
        .map(String::from)
}
2025
2026fn env_username(key: &str) -> Option<String> {
2027    dotenvy::var(key)
2028        .ok()
2029        .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2030}
2031
2032/// Expand tilde in local paths.
2033fn expand_tilde_local(path: &str) -> String {
2034    if let Some(stripped) = path.strip_prefix("~/")
2035        && let Some(home) = dirs::home_dir()
2036    {
2037        return format!("{}/{}", home.display(), stripped);
2038    } else if path == "~"
2039        && let Some(home) = dirs::home_dir()
2040    {
2041        return home.display().to_string();
2042    }
2043    path.to_string()
2044}
2045
/// Convert a remote path to a safe directory name.
///
/// Sanitizes path by:
/// - Keeping only named components: root, drive prefixes, `.` and `..` are
///   dropped, so the result cannot traverse out of the directory it is joined
///   onto. A `~` component is dropped as well.
/// - Joining the surviving components with `_`, then replacing spaces and
///   backslashes with `_`.
/// - Appending the lowercase hex FNV-1a hash of the *original* string, so
///   inputs that sanitize alike (e.g. "foo/bar" vs "foo_bar") stay distinct.
///
/// An input with no surviving components becomes `root_<hash>`.
pub fn path_to_safe_dirname(path: &str) -> String {
    use std::path::{Component, Path};

    let kept: Vec<&str> = Path::new(path)
        .components()
        .filter_map(|component| match component {
            Component::Normal(name) => name.to_str(),
            Component::ParentDir
            | Component::CurDir
            | Component::RootDir
            | Component::Prefix(_) => None,
        })
        .filter(|segment| !segment.is_empty() && *segment != "." && *segment != "~")
        .collect();

    let cleaned = kept.join("_").replace([' ', '\\'], "_");
    let suffix = format!("{:08x}", fnv1a_hash(path));
    let stem = if cleaned.is_empty() {
        "root"
    } else {
        cleaned.as_str()
    };
    format!("{stem}_{suffix}")
}

/// 64-bit FNV-1a hash of the UTF-8 bytes of `text`.
fn fnv1a_hash(text: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    text.bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
2099
2100/// Parse transfer statistics from rsync --stats output.
2101fn parse_rsync_stats(output: &str) -> RsyncStats {
2102    let mut stats = RsyncStats::default();
2103
2104    for line in output.lines() {
2105        let line = line.trim();
2106
2107        // Parse "Number of regular files transferred: N"
2108        if line.starts_with("Number of regular files transferred:")
2109            && let Some(num_str) = line.split(':').nth(1)
2110        {
2111            stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2112        }
2113
2114        // Parse "Total transferred file size: N bytes"
2115        if line.starts_with("Total transferred file size:")
2116            && let Some(size_part) = line.split(':').nth(1)
2117        {
2118            // Handle formats like "1,234 bytes" or "1234"
2119            let size_str = size_part
2120                .split_whitespace()
2121                .next()
2122                .unwrap_or("0")
2123                .replace(',', "");
2124            stats.bytes_transferred = size_str.parse().unwrap_or(0);
2125        }
2126    }
2127
2128    stats
2129}
2130
2131// =============================================================================
2132// Sync Status Persistence
2133// =============================================================================
2134
2135/// Result of a sync operation for a source.
2136#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2137#[serde(rename_all = "snake_case")]
2138pub enum SyncResult {
2139    /// Sync completed successfully.
2140    Success,
2141    /// Some paths synced, some failed.
2142    PartialFailure(String),
2143    /// Sync failed completely.
2144    Failed(String),
2145    /// Sync was skipped (e.g., dry run).
2146    #[default]
2147    Skipped,
2148}
2149
2150impl SyncResult {
2151    /// Short display label for the result.
2152    pub fn label(&self) -> &'static str {
2153        match self {
2154            Self::Success => "success",
2155            Self::PartialFailure(_) => "partial",
2156            Self::Failed(_) => "failed",
2157            Self::Skipped => "never",
2158        }
2159    }
2160
2161    /// Error text for partial/full failures.
2162    pub fn error_message(&self) -> Option<&str> {
2163        match self {
2164            Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2165            Self::Success | Self::Skipped => None,
2166        }
2167    }
2168}
2169
2170/// Scheduler action for a remote source.
2171#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2172#[serde(rename_all = "snake_case")]
2173pub enum SourceSyncAction {
2174    /// The source is eligible to sync now.
2175    Sync,
2176    /// The source is healthy enough but not due under its configured schedule.
2177    Skip,
2178    /// The source is temporarily or operationally unsafe to sync automatically.
2179    Defer,
2180}
2181
2182impl SourceSyncAction {
2183    pub fn as_str(self) -> &'static str {
2184        match self {
2185            Self::Sync => "sync",
2186            Self::Skip => "skip",
2187            Self::Defer => "defer",
2188        }
2189    }
2190}
2191
2192/// Health class used by the adaptive source scheduler.
2193#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2194#[serde(rename_all = "snake_case")]
2195pub enum SourceHealthKind {
2196    NeverSynced,
2197    Healthy,
2198    Stale,
2199    HighLatency,
2200    Flapping,
2201    AuthFailed,
2202    BackingOff,
2203}
2204
2205impl SourceHealthKind {
2206    pub fn as_str(self) -> &'static str {
2207        match self {
2208            Self::NeverSynced => "never_synced",
2209            Self::Healthy => "healthy",
2210            Self::Stale => "stale",
2211            Self::HighLatency => "high_latency",
2212            Self::Flapping => "flapping",
2213            Self::AuthFailed => "auth_failed",
2214            Self::BackingOff => "backing_off",
2215        }
2216    }
2217}
2218
/// Evidence-backed scheduling decision for one source.
///
/// Produced by `SourceSyncDecision::evaluate`, usually via
/// `SyncStatus::decision_for_source_at`. The inputs are the source's configured
/// schedule and its durable `SourceSyncInfo`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncDecision {
    /// Decision action the scheduler would take.
    pub action: SourceSyncAction,
    /// Current health class inferred from durable sync state.
    pub health: SourceHealthKind,
    /// Coarse 0..=100 health score for sorting/explanations.
    pub health_score: u8,
    /// Milliseconds since the last sync attempt, clamped at zero when the clock
    /// moves backward; `None` when no attempt has been recorded.
    pub staleness_ms: Option<i64>,
    /// Coarse 0..=100 estimate of value from refreshing stale remote data.
    pub stale_value_score: u8,
    /// Whether an explicit operator request is overriding automatic scheduling.
    pub manual_override: bool,
    /// Whether the decision is using the conservative fallback path (set for the
    /// degraded classes AuthFailed, BackingOff, Flapping and HighLatency).
    pub fallback_active: bool,
    /// Next time (unix ms) this source is eligible under its configured schedule:
    /// last sync plus the schedule period, when both are known.
    pub next_eligible_sync_ms: Option<i64>,
    /// End of transient failure backoff (unix ms) when applicable.
    pub backoff_until_ms: Option<i64>,
    /// Human-readable evidence terms, stable enough for robot consumers.
    pub reasons: Vec<String>,
}
2243
impl SourceSyncDecision {
    /// Classify one source's health and choose a scheduler action at `now_ms`.
    ///
    /// * `source` — the configured source; only its `sync_schedule` is read here.
    /// * `info` — durable status from the last attempt, or `None` when no status
    ///   has ever been recorded for this source.
    /// * `now_ms` — evaluation time in unix milliseconds. It is supplied by the
    ///   caller rather than read from the clock, so results are deterministic.
    /// * `manual_override` — `true` for an explicit sync command. It forces
    ///   `SourceSyncAction::Sync` and bypasses the auth-failure `Defer`.
    ///
    /// Health is assigned by the first matching rule, in this order:
    /// 1. no status, or no `last_sync` → `NeverSynced`;
    /// 2. auth or host-key failure → `AuthFailed`;
    /// 3. partial failure → `Flapping`;
    /// 4. consecutive failures still inside backoff → `BackingOff`;
    /// 5. complete failure outside backoff → `Flapping`;
    /// 6. last duration >= `SOURCE_HIGH_LATENCY_MS` → `HighLatency`;
    /// 7. schedule due → `Stale`;
    /// 8. otherwise → `Healthy`.
    ///
    /// Each rule that fires appends a human-readable reason to `reasons`.
    fn evaluate(
        source: &SourceDefinition,
        info: Option<&SourceSyncInfo>,
        now_ms: i64,
        manual_override: bool,
    ) -> Self {
        // Schedule period; presumably `None` for schedules without a fixed
        // cadence (e.g. manual). NOTE(review): confirm in sync_schedule_period_ms.
        let period_ms = sync_schedule_period_ms(source.sync_schedule);
        // Earliest next sync under the schedule: last attempt + period.
        let next_eligible_sync_ms = info
            .and_then(|info| info.last_sync)
            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
        // Clamp at zero so a clock that moved backwards never yields negative staleness.
        let staleness_ms = info.and_then(|info| {
            info.last_sync
                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
        });
        let stale_value_score =
            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
        let mut reasons = Vec::new();

        // Arm order is significant: the first matching guard wins.
        let health = match info {
            None => {
                reasons.push("no durable sync status exists for this source".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if info.last_sync.is_none() => {
                reasons.push("source has never completed or attempted a sync".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if sync_result_auth_failure(&info.last_result) => {
                reasons
                    .push("last sync failed with an authentication or host-key error".to_string());
                SourceHealthKind::AuthFailed
            }
            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
                reasons.push("last sync partially succeeded and partially failed".to_string());
                SourceHealthKind::Flapping
            }
            Some(info)
                if info.consecutive_failures > 0
                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
            {
                reasons.push(format!(
                    "{} consecutive failure(s) are inside retry backoff",
                    info.consecutive_failures
                ));
                SourceHealthKind::BackingOff
            }
            // Reached only once any backoff window has expired.
            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
                let error = info.last_result.error_message().unwrap_or("unknown error");
                reasons.push(format!(
                    "last sync failed completely ({error}); local fallback remains active"
                ));
                SourceHealthKind::Flapping
            }
            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
                reasons.push(format!(
                    "last sync took {}ms, above {}ms high-latency guard",
                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
                ));
                SourceHealthKind::HighLatency
            }
            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
                reasons.push("configured sync schedule is due".to_string());
                SourceHealthKind::Stale
            }
            Some(_) => SourceHealthKind::Healthy,
        };

        // Degraded health classes are reported as running on the fallback path.
        let fallback_active = matches!(
            health,
            SourceHealthKind::AuthFailed
                | SourceHealthKind::BackingOff
                | SourceHealthKind::Flapping
                | SourceHealthKind::HighLatency
        );

        let mut action = if manual_override {
            reasons.push("explicit sync command overrides automatic scheduling".to_string());
            SourceSyncAction::Sync
        } else {
            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
        };

        // After an auth/host-key failure, automatic syncs are suppressed; only an
        // explicit (manual_override) sync is allowed to retry.
        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
            action = SourceSyncAction::Defer;
        }

        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
        }

        // When skipping, tell the operator when the schedule next allows a sync.
        if !manual_override
            && matches!(action, SourceSyncAction::Skip)
            && let Some(next_ms) = next_eligible_sync_ms
        {
            reasons.push(format!(
                "next scheduled sync is eligible at unix_ms={next_ms}"
            ));
        }

        // Always give consumers at least one reason.
        if reasons.is_empty() {
            reasons.push("source is healthy and within schedule".to_string());
        }

        Self {
            action,
            health,
            health_score: health_score_for_source(health),
            staleness_ms,
            stale_value_score,
            manual_override,
            fallback_active,
            next_eligible_sync_ms,
            backoff_until_ms,
            reasons,
        }
    }
}
2363
2364/// Sync information for a single source.
2365#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2366pub struct SourceSyncInfo {
2367    /// Timestamp of last sync attempt.
2368    pub last_sync: Option<i64>,
2369    /// Result of last sync.
2370    pub last_result: SyncResult,
2371    /// Number of files synced in last sync.
2372    pub files_synced: u64,
2373    /// Number of bytes transferred in last sync.
2374    pub bytes_transferred: u64,
2375    /// Duration of last sync in milliseconds.
2376    pub duration_ms: u64,
2377    /// Consecutive failed sync attempts, reset to zero by a fully successful sync.
2378    #[serde(default)]
2379    pub consecutive_failures: u32,
2380}
2381
2382impl SourceSyncInfo {
2383    /// Build sync info from a sync report using the current wall clock time.
2384    pub fn from_report(report: &SyncReport) -> Self {
2385        let last_result = report.sync_result();
2386        Self {
2387            last_sync: Some(current_unix_ms()),
2388            consecutive_failures: u32::from(!report.all_succeeded),
2389            last_result,
2390            files_synced: report.total_files(),
2391            bytes_transferred: report.total_bytes(),
2392            duration_ms: report.total_duration_ms,
2393        }
2394    }
2395}
2396
/// Persistent sync status for all sources, stored as JSON in `sync_status.json`
/// under the data directory.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SyncStatus {
    /// Sync info per source (keyed by source name).
    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
}
2403
2404impl SyncStatus {
2405    /// Load sync status from disk.
2406    pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2407        let path = Self::status_path(data_dir);
2408        match std::fs::read_to_string(&path) {
2409            Ok(content) => serde_json::from_str(&content)
2410                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2411            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2412            Err(e) => Err(e),
2413        }
2414    }
2415
2416    /// Save sync status to disk.
2417    ///
2418    /// Uses an atomic rename on Unix. On Windows, falls back to remove-then-rename
2419    /// because replacing an existing destination with `std::fs::rename` fails.
2420    pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2421        let path = Self::status_path(data_dir);
2422        if let Some(parent) = path.parent() {
2423            std::fs::create_dir_all(parent)?;
2424        }
2425        let content = serde_json::to_string_pretty(self)?;
2426        let tmp_path = unique_atomic_temp_path(&path);
2427        std::fs::write(&tmp_path, content)?;
2428        sync_file_path(&tmp_path)?;
2429        replace_file_from_temp(&tmp_path, &path)
2430    }
2431
2432    /// Update status for a source from a sync report.
2433    pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2434        let previous_failures = self
2435            .get(source_name)
2436            .map(|info| info.consecutive_failures)
2437            .unwrap_or_default();
2438        let mut info = SourceSyncInfo::from_report(report);
2439        if report.all_succeeded {
2440            info.consecutive_failures = 0;
2441        } else {
2442            info.consecutive_failures = previous_failures.saturating_add(1);
2443        }
2444        self.set_info(source_name, info);
2445    }
2446
2447    /// Set status for a source from precomputed sync info.
2448    pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2449        self.sources.insert(source_name.to_string(), info);
2450    }
2451
2452    /// Drop sync status entries for sources that no longer exist.
2453    ///
2454    /// Returns `true` when at least one stale entry was removed.
2455    pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2456        let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2457        let previous_len = self.sources.len();
2458        self.sources
2459            .retain(|source_name, _| allowed.contains(source_name.as_str()));
2460        self.sources.len() != previous_len
2461    }
2462
2463    /// Get sync info for a source.
2464    pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2465        self.sources.get(source_name)
2466    }
2467
2468    /// Evaluate automatic scheduling for one source at a deterministic timestamp.
2469    pub fn decision_for_source_at(
2470        &self,
2471        source: &SourceDefinition,
2472        now_ms: i64,
2473        manual_override: bool,
2474    ) -> SourceSyncDecision {
2475        SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2476    }
2477
2478    /// Get the path to the status file.
2479    fn status_path(data_dir: &Path) -> PathBuf {
2480        data_dir.join("sync_status.json")
2481    }
2482}
2483
/// Latency threshold, in milliseconds (one minute).
/// NOTE(review): presumably the cutoff above which a source is classified as
/// `HighLatency`; the consumer is outside this view — confirm.
const SOURCE_HIGH_LATENCY_MS: u64 = 60 * 1_000;
/// Backoff after the first consecutive failure: five minutes, doubled per further failure.
const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 300_000;
/// Upper bound on failure backoff: one hour.
const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 3_600_000;
2487
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to 0 if the clock reads earlier than the epoch, and saturates at
/// `i64::MAX` if the millisecond count does not fit in an `i64`.
pub(crate) fn current_unix_ms() -> i64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
2495
2496fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2497    match schedule {
2498        SyncSchedule::Manual => None,
2499        SyncSchedule::Hourly => Some(60 * 60 * 1000),
2500        SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2501    }
2502}
2503
/// Whether a scheduled sync is due at `now_ms`.
///
/// A source that has never synced is always due. Otherwise it is due only when
/// it has a period and at least one full period has elapsed since `last_sync`.
/// The addition saturates, so huge timestamps never overflow.
fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
    let Some(previous_ms) = last_sync else {
        return true;
    };
    period_ms.is_some_and(|period| previous_ms.saturating_add(period) <= now_ms)
}
2511
2512fn automatic_source_sync_action(
2513    schedule: SyncSchedule,
2514    health: SourceHealthKind,
2515    info: Option<&SourceSyncInfo>,
2516    now_ms: i64,
2517) -> SourceSyncAction {
2518    match health {
2519        SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2520        _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2521        SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2522        SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2523            if sync_schedule_due(
2524                info.and_then(|info| info.last_sync),
2525                sync_schedule_period_ms(schedule),
2526                now_ms,
2527            ) {
2528                SourceSyncAction::Sync
2529            } else {
2530                SourceSyncAction::Skip
2531            }
2532        }
2533        SourceHealthKind::Healthy => {
2534            if sync_schedule_due(
2535                info.and_then(|info| info.last_sync),
2536                sync_schedule_period_ms(schedule),
2537                now_ms,
2538            ) {
2539                SourceSyncAction::Sync
2540            } else {
2541                SourceSyncAction::Skip
2542            }
2543        }
2544    }
2545}
2546
2547fn health_score_for_source(health: SourceHealthKind) -> u8 {
2548    match health {
2549        SourceHealthKind::Healthy => 100,
2550        SourceHealthKind::Stale => 75,
2551        SourceHealthKind::NeverSynced => 65,
2552        SourceHealthKind::HighLatency => 55,
2553        SourceHealthKind::Flapping => 40,
2554        SourceHealthKind::BackingOff => 25,
2555        SourceHealthKind::AuthFailed => 10,
2556    }
2557}
2558
2559fn stale_value_score_for_source(
2560    schedule: SyncSchedule,
2561    staleness_ms: Option<i64>,
2562    info: Option<&SourceSyncInfo>,
2563) -> u8 {
2564    let Some(info) = info else {
2565        return 100;
2566    };
2567    if info.last_sync.is_none() {
2568        return 100;
2569    }
2570
2571    let Some(staleness_ms) = staleness_ms else {
2572        return 100;
2573    };
2574
2575    let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2576        return 0;
2577    };
2578
2579    let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2580    u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2581}
2582
2583fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2584    if info.consecutive_failures == 0 {
2585        return None;
2586    }
2587    let last_sync = info.last_sync?;
2588    let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2589    let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2590    let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2591        .saturating_mul(multiplier)
2592        .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2593    Some(last_sync.saturating_add(backoff_ms))
2594}
2595
2596fn sync_result_auth_failure(result: &SyncResult) -> bool {
2597    let Some(error) = result.error_message() else {
2598        return false;
2599    };
2600    let error = error.to_ascii_lowercase();
2601    error.contains("permission denied")
2602        || error.contains("authentication")
2603        || error.contains("host key verification failed")
2604        || error.contains("known_hosts")
2605        || error.contains("no valid authentication")
2606}
2607
2608fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2609    unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2610}
2611
2612fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2613    #[cfg(windows)]
2614    {
2615        match std::fs::rename(temp_path, final_path) {
2616            Ok(()) => sync_parent_directory(final_path),
2617            Err(first_err)
2618                if final_path.exists()
2619                    && matches!(
2620                        first_err.kind(),
2621                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2622                    ) =>
2623            {
2624                let backup_path = unique_replace_backup_path(final_path);
2625                std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2626                    let _ = std::fs::remove_file(temp_path);
2627                    std::io::Error::other(format!(
2628                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2629                        backup_path.display(),
2630                        final_path.display(),
2631                        first_err,
2632                        backup_err
2633                    ))
2634                })?;
2635                match std::fs::rename(temp_path, final_path) {
2636                    Ok(()) => {
2637                        let _ = std::fs::remove_file(&backup_path);
2638                        sync_parent_directory(final_path)
2639                    }
2640                    Err(second_err) => {
2641                        let restore_result = std::fs::rename(&backup_path, final_path);
2642                        match restore_result {
2643                            Ok(()) => {
2644                                let _ = std::fs::remove_file(temp_path);
2645                                sync_parent_directory(final_path).map_err(|sync_err| {
2646                                    std::io::Error::other(format!(
2647                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2648                                        final_path.display(),
2649                                        temp_path.display(),
2650                                        first_err,
2651                                        second_err,
2652                                        sync_err
2653                                    ))
2654                                })?;
2655                                Err(std::io::Error::new(
2656                                    second_err.kind(),
2657                                    format!(
2658                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2659                                        final_path.display(),
2660                                        temp_path.display(),
2661                                        first_err,
2662                                        second_err
2663                                    ),
2664                                ))
2665                            }
2666                            Err(restore_err) => Err(std::io::Error::other(format!(
2667                                "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2668                                final_path.display(),
2669                                temp_path.display(),
2670                                first_err,
2671                                second_err,
2672                                restore_err,
2673                                temp_path.display()
2674                            ))),
2675                        }
2676                    }
2677                }
2678            }
2679            Err(rename_err) => Err(rename_err),
2680        }
2681    }
2682
2683    #[cfg(not(windows))]
2684    {
2685        std::fs::rename(temp_path, final_path)?;
2686        sync_parent_directory(final_path)
2687    }
2688}
2689
/// Flush a file's data and metadata to stable storage.
///
/// On Windows, `File::sync_all` is implemented with `FlushFileBuffers`, which
/// requires a handle with write access. The read-only handle returned by
/// `File::open` fails there with "Access is denied", so on Windows the file is
/// opened for writing instead; it is neither created nor truncated. Other
/// platforms keep the read-only open, since `fsync` works on any descriptor.
///
/// # Errors
/// Returns the error from opening the file (e.g. `NotFound`) or from the sync.
fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
    #[cfg(windows)]
    let file = std::fs::OpenOptions::new().write(true).open(path)?;
    #[cfg(not(windows))]
    let file = std::fs::File::open(path)?;
    file.sync_all()
}
2693
#[cfg(not(windows))]
/// Fsync the directory containing `path` so that a preceding rename is durable.
///
/// Paths with no parent (e.g. `/`) are a no-op. A bare relative file name such
/// as `sync_status.json` has an *empty* parent according to `Path::parent`.
/// Opening `""` fails with `NotFound`, so that case syncs the current
/// directory (`.`) instead.
fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    std::fs::File::open(parent)?.sync_all()
}
2701
#[cfg(windows)]
/// Windows variant: always succeeds without flushing the parent directory.
// NOTE(review): presumably a no-op because `std::fs::File::open` cannot open a
// directory handle on Windows without extra flags — confirm before relying on
// post-rename directory durability there.
fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
    Ok(())
}
2706
#[cfg(windows)]
/// Unique hidden `.bak` sidecar next to `path`, used to park the old file during a replace.
fn unique_replace_backup_path(path: &Path) -> PathBuf {
    const SUFFIX: &str = "bak";
    const FALLBACK_NAME: &str = "sync_status.json";
    unique_atomic_sidecar_path(path, SUFFIX, FALLBACK_NAME)
}
2711
/// Build a unique hidden sibling path of `path`.
///
/// The name has the form `.<file_name>.<suffix>.<pid>.<unix_nanos>.<counter>`.
/// `fallback_name` substitutes for the file name when `path` has none or it is
/// not valid UTF-8. The per-process atomic counter keeps names distinct even
/// when two calls observe the same timestamp.
fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SIDECAR_COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let sequence = SIDECAR_COUNTER.fetch_add(1, Ordering::Relaxed);
    let base_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => fallback_name,
    };
    let pid = std::process::id();

    path.with_file_name(format!(".{base_name}.{suffix}.{pid}.{nanos}.{sequence}"))
}
2732
2733#[cfg(test)]
2734mod tests {
2735    use super::*;
2736    use tempfile::TempDir;
2737
2738    #[test]
2739    fn test_path_to_safe_dirname() {
2740        let res = path_to_safe_dirname("~/.claude/projects");
2741        assert!(res.starts_with(".claude_projects_"));
2742
2743        let res = path_to_safe_dirname("/home/user/data");
2744        assert!(res.starts_with("home_user_data_"));
2745
2746        let res = path_to_safe_dirname("~/");
2747        assert!(res.starts_with("root_"));
2748
2749        let res = path_to_safe_dirname("");
2750        assert!(res.starts_with("root_"));
2751    }
2752
2753    #[test]
2754    fn test_path_to_safe_dirname_empty() {
2755        let res = path_to_safe_dirname("~");
2756        assert!(res.starts_with("root_"));
2757
2758        let res = path_to_safe_dirname("/");
2759        assert!(res.starts_with("root_"));
2760    }
2761
2762    #[test]
2763    fn test_path_to_safe_dirname_strips_traversal_components() {
2764        let res = path_to_safe_dirname("../../etc/passwd");
2765
2766        assert!(res.starts_with("etc_passwd_"));
2767        assert!(!res.contains(".."));
2768        assert!(!res.contains('/'));
2769        assert!(!res.contains('\\'));
2770    }
2771
2772    #[test]
2773    fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
2774        let temp = TempDir::new().unwrap();
2775        let engine = SyncEngine::new(temp.path());
2776
2777        for host in [
2778            "work-mac;touch /tmp/cass-owned",
2779            "work mac",
2780            "work-mac\nhostname",
2781            "work-mac`hostname`",
2782            "work-mac/../../secret",
2783            "-oProxyCommand=evil",
2784            "",
2785            "@host",
2786            "user@",
2787            "user@host@extra",
2788        ] {
2789            let err = engine.get_remote_home(host).unwrap_err();
2790            assert!(
2791                matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
2792                "expected invalid-host rejection for {host:?}, got {err}"
2793            );
2794        }
2795    }
2796
2797    #[test]
2798    fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
2799        let temp = TempDir::new().unwrap();
2800        let engine = SyncEngine::new(temp.path());
2801        let mut source = SourceDefinition::ssh("../escape", "user@host");
2802        source.paths = vec!["/tmp/sessions".to_string()];
2803
2804        let err = engine
2805            .sync_source(&source)
2806            .expect_err("invalid source name should fail before local writes");
2807
2808        assert!(
2809            matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
2810            "expected invalid source-name rejection, got {err}"
2811        );
2812        assert!(
2813            !temp.path().join("escape").exists(),
2814            "invalid source name must not escape the remotes mirror layout"
2815        );
2816        assert!(
2817            !temp.path().join("remotes").exists(),
2818            "invalid source name must be rejected before creating mirror roots"
2819        );
2820    }
2821
2822    #[test]
2823    fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
2824        let temp = TempDir::new().unwrap();
2825        let engine = SyncEngine::new(temp.path());
2826        let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
2827        source.paths = vec!["/tmp/sessions".to_string()];
2828
2829        let err = engine
2830            .sync_source(&source)
2831            .expect_err("invalid host should fail before local writes");
2832
2833        assert!(
2834            matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
2835            "expected invalid host rejection, got {err}"
2836        );
2837        assert!(
2838            !temp.path().join("remotes").exists(),
2839            "invalid host must be rejected before creating mirror roots"
2840        );
2841    }
2842
2843    #[test]
2844    fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
2845        let temp = TempDir::new().unwrap();
2846        let engine = SyncEngine::new(temp.path());
2847
2848        for (path, expected) in [
2849            ("", "paths[0] cannot be empty"),
2850            ("   ", "paths[0] cannot be empty"),
2851            (" ~/.claude/projects", "paths[0] cannot have leading"),
2852            ("~/.claude/projects ", "paths[0] cannot have leading"),
2853            ("~/.claude\nprojects", "paths[0] cannot contain control"),
2854        ] {
2855            let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
2856            source.paths = vec![path.to_string()];
2857
2858            let report = engine.sync_source(&source).unwrap();
2859            assert_eq!(report.path_results.len(), 1);
2860            let result = &report.path_results[0];
2861            assert!(!result.success);
2862            assert_eq!(result.remote_path, path);
2863            assert!(
2864                result
2865                    .error
2866                    .as_deref()
2867                    .is_some_and(|message| message.contains(expected)),
2868                "expected invalid path rejection for {path:?}, got {result:?}"
2869            );
2870        }
2871    }
2872
2873    #[test]
2874    fn test_remote_sync_path_validation_allows_internal_spaces() {
2875        assert!(
2876            validate_remote_sync_path_entry(
2877                0,
2878                "~/Library/Application Support/Cursor/User/globalStorage"
2879            )
2880            .is_ok()
2881        );
2882    }
2883
2884    #[test]
2885    fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
2886        let temp = TempDir::new().unwrap();
2887        let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
2888        // Use a validation-safe TEST-NET host so source structure checks pass,
2889        // but remote-home lookup still fails quickly before path result ordering.
2890        let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
2891        source.paths = vec![
2892            "~/.codex/sessions".to_string(),
2893            " ~/.claude/projects".to_string(),
2894            "~/.gemini/tmp".to_string(),
2895        ];
2896
2897        let report = engine.sync_source(&source).unwrap();
2898        let remote_paths = report
2899            .path_results
2900            .iter()
2901            .map(|result| result.remote_path.as_str())
2902            .collect::<Vec<_>>();
2903
2904        assert_eq!(
2905            remote_paths,
2906            vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
2907        );
2908        assert!(
2909            report.path_results[1]
2910                .error
2911                .as_deref()
2912                .is_some_and(|message| message.contains("paths[1] cannot have leading")),
2913            "expected invalid path error in original slot: {:?}",
2914            report.path_results
2915        );
2916    }
2917
2918    #[test]
2919    fn test_remote_find_regular_files_command_uses_physical_traversal() {
2920        assert_eq!(
2921            remote_find_regular_files_command("/tmp/has space"),
2922            "find -P '/tmp/has space' -type f -print0"
2923        );
2924        assert_eq!(
2925            remote_find_regular_files_command("/tmp/that's all"),
2926            "find -P '/tmp/that'\\''s all' -type f -print0"
2927        );
2928    }
2929
2930    #[test]
2931    fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
2932        assert_eq!(
2933            parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
2934            Some("/home/user".to_string())
2935        );
2936        assert_eq!(
2937            parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
2938            Some("/Users/test user".to_string())
2939        );
2940    }
2941
2942    #[test]
2943    fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
2944        assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
2945        assert_eq!(
2946            parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
2947            None
2948        );
2949    }
2950
2951    #[test]
2952    fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
2953        let paths = parse_null_terminated_utf8_paths(
2954            b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
2955        );
2956        assert_eq!(
2957            paths,
2958            vec![
2959                "/remote/sessions/a.jsonl".to_string(),
2960                "/remote/sessions/b.jsonl".to_string()
2961            ]
2962        );
2963    }
2964
2965    #[test]
2966    fn test_remote_file_to_safe_local_path_rejects_outside_root() {
2967        let root = Path::new("/remote/sessions");
2968        let local = Path::new("/mirror/root");
2969
2970        assert_eq!(
2971            remote_file_to_safe_local_path(
2972                root,
2973                Path::new("/remote/sessions/a/b.jsonl"),
2974                local,
2975                "sessions"
2976            ),
2977            Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
2978        );
2979        assert_eq!(
2980            remote_file_to_safe_local_path(
2981                Path::new("/remote/session.jsonl"),
2982                Path::new("/remote/session.jsonl"),
2983                local,
2984                "session.jsonl"
2985            ),
2986            Some(PathBuf::from("/mirror/root/session.jsonl"))
2987        );
2988        assert_eq!(
2989            remote_file_to_safe_local_path(
2990                root,
2991                Path::new("/remote/sessions/../secret.txt"),
2992                local,
2993                "sessions"
2994            ),
2995            None
2996        );
2997        assert_eq!(
2998            remote_file_to_safe_local_path(
2999                root,
3000                Path::new("/remote/other/secret.txt"),
3001                local,
3002                "sessions"
3003            ),
3004            None
3005        );
3006    }
3007
3008    #[test]
3009    fn test_local_symlink_guard_allows_regular_paths() {
3010        let temp = TempDir::new().expect("tempdir");
3011        let root = temp.path().join("mirror");
3012        let target = root.join("sessions/session.jsonl");
3013
3014        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3015
3016        std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3017        std::fs::write(&target, "{}").expect("write target");
3018
3019        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3020    }
3021
3022    #[cfg(unix)]
3023    #[test]
3024    fn test_local_symlink_guard_rejects_nested_symlink() {
3025        use std::os::unix::fs::symlink;
3026
3027        let temp = TempDir::new().expect("tempdir");
3028        let root = temp.path().join("mirror");
3029        let outside = temp.path().join("outside");
3030        std::fs::create_dir_all(&root).expect("create root");
3031        std::fs::create_dir_all(&outside).expect("create outside");
3032        symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3033
3034        let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3035            .expect_err("nested symlink should be rejected");
3036
3037        assert!(err.contains("Refusing to write"));
3038        assert!(err.contains("sessions"));
3039    }
3040
3041    #[cfg(unix)]
3042    #[test]
3043    fn test_local_symlink_guard_rejects_root_symlink() {
3044        use std::os::unix::fs::symlink;
3045
3046        let temp = TempDir::new().expect("tempdir");
3047        let outside = temp.path().join("outside");
3048        let root = temp.path().join("mirror-link");
3049        std::fs::create_dir_all(&outside).expect("create outside");
3050        symlink(&outside, &root).expect("symlink root");
3051
3052        let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3053            .expect_err("root symlink should be rejected");
3054
3055        assert!(err.contains("Refusing to write"));
3056        assert!(err.contains("mirror-link"));
3057    }
3058
3059    #[test]
3060    fn test_prepare_local_sync_container_creates_regular_container() {
3061        let temp = TempDir::new().expect("tempdir");
3062        let root = temp.path().join("mirror");
3063        let target = root.join("sessions");
3064
3065        prepare_local_sync_container(&root, &target).expect("regular container should be created");
3066
3067        assert!(target.is_dir());
3068    }
3069
3070    #[cfg(unix)]
3071    #[test]
3072    fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3073        use std::os::unix::fs::symlink;
3074
3075        let temp = TempDir::new().expect("tempdir");
3076        let root = temp.path().join("mirror");
3077        let outside = temp.path().join("outside");
3078        let target = root.join("sessions");
3079        std::fs::create_dir_all(&root).expect("create root");
3080        std::fs::create_dir_all(&outside).expect("create outside");
3081        symlink(&outside, &target).expect("symlink target");
3082
3083        let err = prepare_local_sync_container(&root, &target)
3084            .expect_err("sync container symlink should be rejected");
3085
3086        assert!(err.contains("Refusing to write"));
3087        assert!(err.contains("sessions"));
3088    }
3089
3090    #[cfg(unix)]
3091    #[test]
3092    fn test_prepare_local_sync_container_rejects_root_symlink() {
3093        use std::os::unix::fs::symlink;
3094
3095        let temp = TempDir::new().expect("tempdir");
3096        let outside = temp.path().join("outside");
3097        let root = temp.path().join("mirror-link");
3098        let target = root.join("sessions");
3099        std::fs::create_dir_all(&outside).expect("create outside");
3100        symlink(&outside, &root).expect("symlink root");
3101
3102        let err = prepare_local_sync_container(&root, &target)
3103            .expect_err("sync root symlink should be rejected");
3104
3105        assert!(err.contains("Refusing to write"));
3106        assert!(err.contains("mirror-link"));
3107    }
3108
3109    #[test]
3110    fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3111        let symlink = FileStat {
3112            size: None,
3113            uid: None,
3114            gid: None,
3115            perm: Some(0o120000 | 0o777),
3116            atime: None,
3117            mtime: None,
3118        };
3119        let regular = FileStat {
3120            size: None,
3121            uid: None,
3122            gid: None,
3123            perm: Some(0o100000 | 0o644),
3124            atime: None,
3125            mtime: None,
3126        };
3127
3128        assert!(sftp_file_stat_is_symlink(&symlink));
3129        assert!(!sftp_file_stat_is_symlink(&regular));
3130    }
3131
3132    #[test]
3133    fn test_sftp_entry_file_name_accepts_regular_names() {
3134        let parent = Path::new("/remote");
3135        let entry = parent.join("session.jsonl");
3136
3137        assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3138    }
3139
3140    #[test]
3141    fn test_sftp_entry_file_name_skips_dot_entries() {
3142        let parent = Path::new("/remote");
3143
3144        assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3145        assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3146    }
3147
3148    #[cfg(unix)]
3149    #[test]
3150    fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3151        use std::ffi::OsStr;
3152        use std::os::unix::ffi::OsStrExt;
3153
3154        let parent = Path::new("/remote");
3155        let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3156        let entry = parent.join(bad_component);
3157
3158        assert_eq!(sftp_entry_file_name(&entry, parent), None);
3159    }
3160
3161    #[test]
3162    fn test_parse_rsync_stats() {
3163        let output = r#"
3164Number of files: 42
3165Number of regular files transferred: 10
3166Total transferred file size: 1,234 bytes
3167        "#;
3168
3169        let stats = parse_rsync_stats(output);
3170        assert_eq!(stats.files_transferred, 10);
3171        assert_eq!(stats.bytes_transferred, 1234);
3172    }
3173
3174    #[test]
3175    fn test_parse_rsync_stats_empty() {
3176        let stats = parse_rsync_stats("");
3177        assert_eq!(stats.files_transferred, 0);
3178        assert_eq!(stats.bytes_transferred, 0);
3179    }
3180
3181    #[test]
3182    fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3183        assert_eq!(
3184            quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3185            "'/Users/me/Library/Application Support/Cursor'"
3186        );
3187        assert_eq!(
3188            quote_remote_shell_path("/tmp/that's all"),
3189            "'/tmp/that'\\''s all'"
3190        );
3191    }
3192
3193    #[test]
3194    fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3195        assert_eq!(
3196            remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3197            "work-mac:/tmp/has space"
3198        );
3199        assert_eq!(
3200            remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3201            "work-mac:/tmp/that's all"
3202        );
3203        assert_eq!(
3204            remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3205            "work-mac:'/tmp/has space'"
3206        );
3207    }
3208
3209    #[test]
3210    fn rsync_arg_protection_enum_maps_flags_correctly() {
3211        // Regression for #191: Homebrew rsync 3.4.1 renamed the flag to
3212        // --secluded-args; earlier 3.0–3.3 use --protect-args. The caller
3213        // must pass the name the installed rsync actually accepts in its
3214        // own --help listing.
3215        assert_eq!(
3216            RsyncArgProtection::ProtectArgs.flag(),
3217            Some("--protect-args")
3218        );
3219        assert_eq!(
3220            RsyncArgProtection::SecludedArgs.flag(),
3221            Some("--secluded-args")
3222        );
3223        assert_eq!(RsyncArgProtection::None.flag(), None);
3224        assert!(RsyncArgProtection::ProtectArgs.is_supported());
3225        assert!(RsyncArgProtection::SecludedArgs.is_supported());
3226        assert!(!RsyncArgProtection::None.is_supported());
3227    }
3228
3229    #[test]
3230    fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3231        assert_eq!(
3232            remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3233            "work-mac:'/tmp/has space'"
3234        );
3235    }
3236
3237    #[test]
3238    fn test_remote_spec_for_scp_always_quotes_remote_path() {
3239        assert_eq!(
3240            remote_spec_for_scp("work-mac", "/tmp/that's all"),
3241            "work-mac:'/tmp/that'\\''s all'"
3242        );
3243    }
3244
3245    #[test]
3246    fn test_sync_report_totals() {
3247        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3248        report.add_path_result(PathSyncResult {
3249            files_transferred: 5,
3250            bytes_transferred: 100,
3251            success: true,
3252            ..Default::default()
3253        });
3254        report.add_path_result(PathSyncResult {
3255            files_transferred: 3,
3256            bytes_transferred: 50,
3257            success: true,
3258            ..Default::default()
3259        });
3260
3261        assert_eq!(report.total_files(), 8);
3262        assert_eq!(report.total_bytes(), 150);
3263        assert!(report.all_succeeded);
3264    }
3265
3266    #[test]
3267    fn test_sync_report_with_failure() {
3268        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3269        report.add_path_result(PathSyncResult {
3270            success: true,
3271            ..Default::default()
3272        });
3273        report.add_path_result(PathSyncResult {
3274            success: false,
3275            error: Some("Connection refused".into()),
3276            ..Default::default()
3277        });
3278
3279        assert!(!report.all_succeeded);
3280        assert_eq!(report.successful_paths(), 1);
3281        assert_eq!(report.failed_paths(), 1);
3282    }
3283
3284    #[test]
3285    fn test_detect_sync_method() {
3286        // This test is platform-dependent but should at least not panic
3287        let method = SyncEngine::detect_sync_method();
3288        assert!(matches!(
3289            method,
3290            SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3291        ));
3292    }
3293
3294    #[test]
3295    fn test_sync_engine_mirror_dir() {
3296        let engine = SyncEngine::new(Path::new("/data/cass"));
3297        let mirror = engine.mirror_dir("laptop");
3298        assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3299    }
3300
3301    #[test]
3302    fn test_sync_method_display() {
3303        for (method, expected) in [
3304            (SyncMethod::Rsync, "rsync"),
3305            (SyncMethod::WslRsync, "wsl-rsync"),
3306            (SyncMethod::Scp, "scp"),
3307            (SyncMethod::Sftp, "sftp"),
3308        ] {
3309            assert_eq!(method.as_str(), expected);
3310            assert_eq!(method.to_string(), expected);
3311        }
3312    }
3313
3314    #[test]
3315    fn test_windows_path_to_wsl_drive() {
3316        assert_eq!(
3317            windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass"),
3318            "/mnt/c/Users/george/AppData/Roaming/cass"
3319        );
3320    }
3321
3322    #[test]
3323    fn test_windows_path_to_wsl_forward_slash() {
3324        assert_eq!(
3325            windows_path_to_wsl("C:/Users/george/data"),
3326            "/mnt/c/Users/george/data"
3327        );
3328    }
3329
3330    #[test]
3331    fn test_windows_path_to_wsl_non_windows_path_unchanged() {
3332        // A Unix absolute path should pass through unchanged.
3333        assert_eq!(
3334            windows_path_to_wsl("/home/george/data"),
3335            "/home/george/data"
3336        );
3337    }
3338
3339    #[test]
3340    fn test_expand_tilde_with_home() {
3341        // No tilde - returns unchanged
3342        assert_eq!(
3343            SyncEngine::expand_tilde_with_home("/home/user/projects", Some("/home/user")),
3344            "/home/user/projects"
3345        );
3346
3347        // Tilde with home provided
3348        assert_eq!(
3349            SyncEngine::expand_tilde_with_home("~/.claude/projects", Some("/home/user")),
3350            "/home/user/.claude/projects"
3351        );
3352
3353        // Just tilde
3354        assert_eq!(
3355            SyncEngine::expand_tilde_with_home("~", Some("/home/user")),
3356            "/home/user"
3357        );
3358
3359        // Tilde without home - returns unchanged
3360        assert_eq!(
3361            SyncEngine::expand_tilde_with_home("~/.claude/projects", None),
3362            "~/.claude/projects"
3363        );
3364
3365        // ~otheruser/path case - not expanded
3366        assert_eq!(
3367            SyncEngine::expand_tilde_with_home("~otheruser/projects", Some("/home/user")),
3368            "~otheruser/projects"
3369        );
3370    }
3371
3372    #[test]
3373    fn test_sync_report_failed() {
3374        let report = SyncReport::failed("test-source", SyncError::NoHost);
3375        assert_eq!(report.source_name, "test-source");
3376        assert!(!report.all_succeeded);
3377        assert_eq!(report.path_results.len(), 1);
3378        assert!(!report.path_results[0].success);
3379        assert!(report.path_results[0].error.is_some());
3380    }
3381
3382    #[test]
3383    fn test_sync_result_default() {
3384        let result = SyncResult::default();
3385        assert!(matches!(result, SyncResult::Skipped));
3386        assert_eq!(result.label(), "never");
3387    }
3388
3389    #[test]
3390    fn test_source_sync_info_default() {
3391        let info = SourceSyncInfo::default();
3392        assert!(info.last_sync.is_none());
3393        assert_eq!(info.files_synced, 0);
3394        assert_eq!(info.bytes_transferred, 0);
3395        assert_eq!(info.duration_ms, 0);
3396    }
3397
3398    #[test]
3399    fn test_sync_status_update() {
3400        let mut status = SyncStatus::default();
3401
3402        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3403        report.add_path_result(PathSyncResult {
3404            files_transferred: 10,
3405            bytes_transferred: 1000,
3406            success: true,
3407            ..Default::default()
3408        });
3409        report.total_duration_ms = 500;
3410
3411        status.update("laptop", &report);
3412
3413        let info = status.get("laptop").unwrap();
3414        assert!(info.last_sync.is_some());
3415        assert!(matches!(info.last_result, SyncResult::Success));
3416        assert_eq!(info.files_synced, 10);
3417        assert_eq!(info.bytes_transferred, 1000);
3418        assert_eq!(info.duration_ms, 500);
3419    }
3420
3421    #[test]
3422    fn test_sync_status_partial_failure() {
3423        let mut status = SyncStatus::default();
3424
3425        let mut report = SyncReport::new("server", SyncMethod::Rsync);
3426        report.add_path_result(PathSyncResult {
3427            success: true,
3428            files_transferred: 5,
3429            ..Default::default()
3430        });
3431        report.add_path_result(PathSyncResult {
3432            success: false,
3433            error: Some("Connection refused".into()),
3434            ..Default::default()
3435        });
3436
3437        status.update("server", &report);
3438
3439        let info = status.get("server").unwrap();
3440        assert!(matches!(info.last_result, SyncResult::PartialFailure(_)));
3441    }
3442
3443    #[test]
3444    fn test_sync_status_full_failure() {
3445        let mut status = SyncStatus::default();
3446
3447        let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
3448        report.add_path_result(PathSyncResult {
3449            success: false,
3450            error: Some("Host unreachable".into()),
3451            ..Default::default()
3452        });
3453
3454        status.update("dead-host", &report);
3455
3456        let info = status.get("dead-host").unwrap();
3457        assert!(matches!(info.last_result, SyncResult::Failed(_)));
3458    }
3459
3460    #[test]
3461    fn test_sync_status_save_round_trips() {
3462        let temp = TempDir::new().expect("tempdir");
3463        let mut status = SyncStatus::default();
3464        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3465        report.add_path_result(PathSyncResult {
3466            files_transferred: 3,
3467            bytes_transferred: 42,
3468            success: true,
3469            ..Default::default()
3470        });
3471        status.update("laptop", &report);
3472
3473        status.save(temp.path()).expect("save status");
3474        let loaded = SyncStatus::load(temp.path()).expect("load status");
3475
3476        let info = loaded.get("laptop").expect("round-tripped source");
3477        assert_eq!(info.files_synced, 3);
3478        assert_eq!(info.bytes_transferred, 42);
3479        assert!(matches!(info.last_result, SyncResult::Success));
3480    }
3481
3482    #[test]
3483    fn test_sync_status_retain_sources_prunes_removed_entries() {
3484        let mut status = SyncStatus::default();
3485        status.sources.insert(
3486            "laptop".into(),
3487            SourceSyncInfo {
3488                files_synced: 3,
3489                ..Default::default()
3490            },
3491        );
3492        status.sources.insert(
3493            "desktop".into(),
3494            SourceSyncInfo {
3495                files_synced: 5,
3496                ..Default::default()
3497            },
3498        );
3499
3500        let removed_any = status.retain_sources(["laptop"]);
3501
3502        assert!(removed_any);
3503        assert!(status.get("laptop").is_some());
3504        assert!(status.get("desktop").is_none());
3505    }
3506
3507    fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3508        let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3509        source.sync_schedule = schedule;
3510        source.paths = vec!["~/.claude/projects".to_string()];
3511        source
3512    }
3513
3514    fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3515        let mut status = SyncStatus::default();
3516        status.set_info("laptop", info);
3517        status
3518    }
3519
3520    #[test]
3521    fn source_sync_decision_skips_healthy_source_until_schedule_due() {
3522        let now_ms = 1_700_000_000_000;
3523        let source = source_with_schedule(SyncSchedule::Hourly);
3524        let status = status_with_info(SourceSyncInfo {
3525            last_sync: Some(now_ms - 10 * 60 * 1000),
3526            last_result: SyncResult::Success,
3527            duration_ms: 250,
3528            ..Default::default()
3529        });
3530
3531        let decision = status.decision_for_source_at(&source, now_ms, false);
3532
3533        assert_eq!(decision.action, SourceSyncAction::Skip);
3534        assert_eq!(decision.health, SourceHealthKind::Healthy);
3535        assert!(!decision.fallback_active);
3536        assert_eq!(
3537            decision.next_eligible_sync_ms,
3538            Some(now_ms + 50 * 60 * 1000)
3539        );
3540        assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
3541        assert_eq!(decision.stale_value_score, 16);
3542    }
3543
3544    #[test]
3545    fn source_sync_decision_syncs_stale_scheduled_source() {
3546        let now_ms = 1_700_000_000_000;
3547        let source = source_with_schedule(SyncSchedule::Hourly);
3548        let status = status_with_info(SourceSyncInfo {
3549            last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
3550            last_result: SyncResult::Success,
3551            duration_ms: 250,
3552            ..Default::default()
3553        });
3554
3555        let decision = status.decision_for_source_at(&source, now_ms, false);
3556
3557        assert_eq!(decision.action, SourceSyncAction::Sync);
3558        assert_eq!(decision.health, SourceHealthKind::Stale);
3559        assert_eq!(decision.stale_value_score, 100);
3560        assert!(
3561            decision
3562                .reasons
3563                .iter()
3564                .any(|reason| reason.contains("schedule is due"))
3565        );
3566    }
3567
3568    #[test]
3569    fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
3570        let now_ms = 1_700_000_000_000;
3571        let source = source_with_schedule(SyncSchedule::Hourly);
3572        let status = status_with_info(SourceSyncInfo {
3573            last_sync: Some(now_ms - 10 * 60 * 1000),
3574            last_result: SyncResult::Failed("Permission denied (publickey)".into()),
3575            duration_ms: 800,
3576            consecutive_failures: 1,
3577            ..Default::default()
3578        });
3579
3580        let decision = status.decision_for_source_at(&source, now_ms, false);
3581
3582        assert_eq!(decision.action, SourceSyncAction::Defer);
3583        assert_eq!(decision.health, SourceHealthKind::AuthFailed);
3584        assert!(decision.fallback_active);
3585        assert_eq!(decision.health_score, 10);
3586    }
3587
3588    #[test]
3589    fn source_sync_decision_marks_partial_success_as_flapping() {
3590        let now_ms = 1_700_000_000_000;
3591        let source = source_with_schedule(SyncSchedule::Hourly);
3592        let status = status_with_info(SourceSyncInfo {
3593            last_sync: Some(now_ms - 10 * 60 * 1000),
3594            last_result: SyncResult::PartialFailure("one path failed".into()),
3595            files_synced: 7,
3596            duration_ms: 900,
3597            consecutive_failures: 1,
3598            ..Default::default()
3599        });
3600
3601        let decision = status.decision_for_source_at(&source, now_ms, false);
3602
3603        assert_eq!(decision.action, SourceSyncAction::Skip);
3604        assert_eq!(decision.health, SourceHealthKind::Flapping);
3605        assert!(decision.fallback_active);
3606    }
3607
3608    #[test]
3609    fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
3610        let now_ms = 1_700_000_000_000;
3611        let source = source_with_schedule(SyncSchedule::Hourly);
3612        let last_sync = now_ms - 10 * 60 * 1000;
3613        let status = status_with_info(SourceSyncInfo {
3614            last_sync: Some(last_sync),
3615            last_result: SyncResult::Failed("Host unreachable".into()),
3616            duration_ms: 900,
3617            consecutive_failures: 1,
3618            ..Default::default()
3619        });
3620
3621        let decision = status.decision_for_source_at(&source, now_ms, false);
3622
3623        assert_eq!(decision.action, SourceSyncAction::Skip);
3624        assert_eq!(decision.health, SourceHealthKind::Flapping);
3625        assert!(decision.fallback_active);
3626        assert_eq!(
3627            decision.backoff_until_ms,
3628            Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS)
3629        );
3630        assert!(
3631            decision
3632                .reasons
3633                .iter()
3634                .any(|reason| reason.contains("local fallback remains active"))
3635        );
3636    }
3637
3638    #[test]
3639    fn source_sync_decision_marks_slow_source_as_high_latency() {
3640        let now_ms = 1_700_000_000_000;
3641        let source = source_with_schedule(SyncSchedule::Hourly);
3642        let status = status_with_info(SourceSyncInfo {
3643            last_sync: Some(now_ms - 10 * 60 * 1000),
3644            last_result: SyncResult::Success,
3645            duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
3646            ..Default::default()
3647        });
3648
3649        let decision = status.decision_for_source_at(&source, now_ms, false);
3650
3651        assert_eq!(decision.action, SourceSyncAction::Skip);
3652        assert_eq!(decision.health, SourceHealthKind::HighLatency);
3653        assert!(decision.fallback_active);
3654    }
3655
3656    #[test]
3657    fn source_sync_decision_manual_override_forces_sync() {
3658        let now_ms = 1_700_000_000_000;
3659        let source = source_with_schedule(SyncSchedule::Manual);
3660        let status = status_with_info(SourceSyncInfo {
3661            last_sync: Some(now_ms),
3662            last_result: SyncResult::Success,
3663            duration_ms: 100,
3664            ..Default::default()
3665        });
3666
3667        let decision = status.decision_for_source_at(&source, now_ms, true);
3668
3669        assert_eq!(decision.action, SourceSyncAction::Sync);
3670        assert!(decision.manual_override);
3671        assert!(
3672            decision
3673                .reasons
3674                .iter()
3675                .any(|reason| reason.contains("overrides automatic scheduling"))
3676        );
3677    }
3678
3679    #[test]
3680    fn test_unique_atomic_temp_path_changes_each_call() {
3681        let final_path = Path::new("/tmp/sync_status.json");
3682        let first = unique_atomic_temp_path(final_path);
3683        let second = unique_atomic_temp_path(final_path);
3684
3685        assert_ne!(first, second);
3686        assert_eq!(first.parent(), final_path.parent());
3687        assert_eq!(second.parent(), final_path.parent());
3688    }
3689
3690    #[test]
3691    fn test_replace_file_from_temp_overwrites_existing_file() {
3692        let temp = TempDir::new().expect("tempdir");
3693        let final_path = temp.path().join("sync_status.json");
3694        let first_tmp = temp.path().join("first.tmp");
3695        let second_tmp = temp.path().join("second.tmp");
3696
3697        std::fs::write(&first_tmp, "{\"first\":true}").expect("write first temp");
3698        replace_file_from_temp(&first_tmp, &final_path).expect("initial replace");
3699        assert_eq!(
3700            std::fs::read_to_string(&final_path).expect("read first final"),
3701            "{\"first\":true}"
3702        );
3703
3704        std::fs::write(&second_tmp, "{\"second\":true}").expect("write second temp");
3705        replace_file_from_temp(&second_tmp, &final_path).expect("overwrite replace");
3706        assert_eq!(
3707            std::fs::read_to_string(&final_path).expect("read second final"),
3708            "{\"second\":true}"
3709        );
3710    }
3711
3712    #[test]
3713    fn test_sync_engine_with_timeouts() {
3714        let engine = SyncEngine::new(Path::new("/data"))
3715            .with_connection_timeout(30)
3716            .with_transfer_timeout(600);
3717
3718        assert_eq!(engine.connection_timeout, 30);
3719        assert_eq!(engine.transfer_timeout, 600);
3720    }
3721
3722    #[test]
3723    fn test_sync_error_display() {
3724        assert_eq!(
3725            SyncError::NoHost.to_string(),
3726            "Source has no host configured"
3727        );
3728        assert_eq!(
3729            SyncError::NoPaths.to_string(),
3730            "Source has no paths configured"
3731        );
3732        assert_eq!(
3733            SyncError::InvalidPath("paths[0] cannot be empty".to_string()).to_string(),
3734            "Invalid source path: paths[0] cannot be empty"
3735        );
3736        assert_eq!(
3737            SyncError::Timeout(30).to_string(),
3738            "Connection timed out after 30 seconds"
3739        );
3740        assert_eq!(SyncError::Cancelled.to_string(), "Sync cancelled");
3741    }
3742
3743    // =========================================================================
3744    // SFTP helper function tests
3745    // =========================================================================
3746
3747    #[test]
3748    fn test_parse_ssh_host_simple() {
3749        let (user, host) = parse_ssh_host("myserver");
3750        assert!(user.is_none());
3751        assert_eq!(host, "myserver");
3752    }
3753
3754    #[test]
3755    fn test_parse_ssh_host_with_user() {
3756        let (user, host) = parse_ssh_host("admin@myserver");
3757        assert_eq!(user, Some("admin"));
3758        assert_eq!(host, "myserver");
3759    }
3760
3761    #[test]
3762    fn test_parse_ssh_host_with_domain() {
3763        let (user, host) = parse_ssh_host("deploy@server.example.com");
3764        assert_eq!(user, Some("deploy"));
3765        assert_eq!(host, "server.example.com");
3766    }
3767
3768    #[test]
3769    fn test_parse_ssh_host_email_like() {
3770        // Edge case: user looks like email prefix
3771        let (user, host) = parse_ssh_host("user@host");
3772        assert_eq!(user, Some("user"));
3773        assert_eq!(host, "host");
3774    }
3775
3776    #[test]
3777    fn test_first_nonblank_username_priority_and_trimming() {
3778        assert_eq!(
3779            first_nonblank_username([Some("  alice  "), Some("bob")]),
3780            Some("alice".to_string())
3781        );
3782        assert_eq!(
3783            first_nonblank_username([Some("  "), None, Some("carol")]),
3784            Some("carol".to_string())
3785        );
3786        assert_eq!(first_nonblank_username([None, Some("\t")]), None);
3787    }
3788
3789    #[test]
3790    fn test_expand_tilde_local_with_tilde_prefix() {
3791        let expanded = expand_tilde_local("~/Documents/file.txt");
3792        // Should start with home directory, not tilde
3793        assert!(!expanded.starts_with('~'));
3794        assert!(expanded.ends_with("/Documents/file.txt"));
3795    }
3796
3797    #[test]
3798    fn test_expand_tilde_local_just_tilde() {
3799        let expanded = expand_tilde_local("~");
3800        // Should be just home directory
3801        assert!(!expanded.starts_with('~'));
3802        assert!(!expanded.is_empty());
3803    }
3804
3805    #[test]
3806    fn test_expand_tilde_local_no_tilde() {
3807        let path = "/absolute/path/to/file";
3808        let expanded = expand_tilde_local(path);
3809        assert_eq!(expanded, path);
3810    }
3811
3812    #[test]
3813    fn test_expand_tilde_local_tilde_in_middle() {
3814        // Tilde in middle should not be expanded
3815        let path = "/path/with/~tilde/inside";
3816        let expanded = expand_tilde_local(path);
3817        assert_eq!(expanded, path);
3818    }
3819}