Skip to main content

coding_agent_search/sources/
sync.rs

1//! Sync engine for pulling agent sessions from remote sources.
2//!
3//! This module provides the core sync functionality using rsync over SSH
4//! for efficient delta transfers, with progress reporting and error recovery.
5//!
6//! # Safety
7//!
8//! **IMPORTANT**: The sync engine uses rsync WITHOUT the `--delete` flag
9//! to ensure safe additive syncs. This prevents accidental data loss if
10//! a remote is misconfigured or temporarily empty.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use coding_agent_search::sources::sync::SyncEngine;
16//! use coding_agent_search::sources::config::SourcesConfig;
17//!
18//! let config = SourcesConfig::load()?;
19//! let engine = SyncEngine::new(&data_dir);
20//!
21//! for source in config.remote_sources() {
22//!     let report = engine.sync_source(source)?;
23//!     println!("Synced {}: {} files", source.name, report.total_files());
24//! }
25//! ```
26
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::OnceLock;
30use std::time::{Duration, Instant};
31
32use thiserror::Error;
33
34use super::{
35    config::{
36        SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
37        ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
38    },
39    host_key_verification_error, is_host_key_verification_failure, strict_ssh_cli_tokens,
40    strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
41};
42use ssh2::{FileStat, Session, Sftp};
43use std::io::{Read as IoRead, Write as IoWrite};
44use std::net::{Shutdown, TcpStream};
45
46/// Which variant of rsync's "pass args protected to the remote" flag the
47/// system `rsync` accepts. The flag was introduced in rsync 3.0.0 as
48/// `--protect-args`; rsync 3.4.0 renamed the primary form to
49/// `--secluded-args` (`-s`) and current Homebrew `rsync 3.4.1` prints only
50/// the new name in `--help`, so a simple substring probe for `--protect-args`
51/// mis-classifies it as unsupported and falls through to the quoted-path
52/// rsync branch — which breaks (#191). openrsync (macOS 15+) supports
53/// neither.
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55enum RsyncArgProtection {
56    /// Neither flag supported — callers must manually quote remote paths for
57    /// the remote login shell.
58    None,
59    /// rsync 3.0.0..3.4.0 — original flag name.
60    ProtectArgs,
61    /// rsync 3.4.0+ (incl. Homebrew 3.4.1) — renamed primary form.
62    SecludedArgs,
63}
64
65impl RsyncArgProtection {
66    fn is_supported(self) -> bool {
67        !matches!(self, Self::None)
68    }
69
70    /// CLI flag to pass to rsync, or `None` if no protection variant is
71    /// available.
72    fn flag(self) -> Option<&'static str> {
73        match self {
74            Self::ProtectArgs => Some("--protect-args"),
75            Self::SecludedArgs => Some("--secluded-args"),
76            Self::None => None,
77        }
78    }
79}
80
81fn detect_rsync_arg_protection() -> RsyncArgProtection {
82    static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
83    *CACHED.get_or_init(|| {
84        let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
85            return RsyncArgProtection::None;
86        };
87        // rsync prints to stdout on GNU/Linux and Homebrew macOS, but some
88        // forks / older builds print help on stderr — check both so we never
89        // misclassify a supported rsync as unsupported.
90        let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
91        combined.push_str(&String::from_utf8_lossy(&out.stderr));
92        // Prefer the newer name when both are listed (forward-compat with a
93        // hypothetical rsync that keeps both as aliases): `--secluded-args`
94        // is what current rsync actually prints in help output, and using
95        // the printed name is the one guaranteed to be accepted.
96        if combined.contains("--secluded-args") {
97            RsyncArgProtection::SecludedArgs
98        } else if combined.contains("--protect-args") {
99            RsyncArgProtection::ProtectArgs
100        } else {
101            RsyncArgProtection::None
102        }
103    })
104}
105
106fn quote_remote_shell_path(path: &str) -> String {
107    // POSIX shell single-quote escape:
108    // 1. Wrap the whole thing in single quotes.
109    // 2. Escape existing single quotes by closing the current quote,
110    //    inserting a backslash-escaped quote, and opening a new one.
111    // Result: 'foo'\''bar'
112    format!("'{}'", path.replace('\'', r#"'\''"#))
113}
114
115fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
116    // host itself might contain user@ or be an alias, but we should not quote it
117    // if it's already a single token. However, if it contains spaces or other
118    // weirdness it's already broken for SSH. We focus on the path part.
119    format!("{host}:{}", quote_remote_shell_path(remote_path))
120}
121
122fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
123    if protect_args_supported {
124        // With --protect-args, rsync handles its own escaping over the wire
125        format!("{host}:{remote_path}")
126    } else {
127        // Without it (e.g. openrsync), we must manually quote for the remote shell
128        remote_spec_for_shell_bound_copy(host, remote_path)
129    }
130}
131
132fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
133    // scp still executes a remote shell command for the source operand, so the
134    // path side must be quoted even though we pass it as one local argv token.
135    remote_spec_for_shell_bound_copy(host, remote_path)
136}
137
138fn remote_find_regular_files_command(remote_path: &str) -> String {
139    format!(
140        "find -P {} -type f -print0",
141        quote_remote_shell_path(remote_path)
142    )
143}
144
145fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
146    let output = String::from_utf8_lossy(stdout);
147    for line in output.lines() {
148        if let Some(home) = line.trim().strip_prefix("CASS_HOME_MARKER:")
149            && home.starts_with('/')
150            && !home.contains('\0')
151        {
152            return Some(home.to_string());
153        }
154    }
155    None
156}
157
158fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
159    bytes
160        .split(|byte| *byte == 0)
161        .filter(|part| !part.is_empty())
162        .filter_map(|part| std::str::from_utf8(part).ok())
163        .map(ToOwned::to_owned)
164        .collect()
165}
166
167fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
168    match source_path_entry_error(index, path) {
169        Some(message) => Err(SyncError::InvalidPath(message)),
170        None => Ok(()),
171    }
172}
173
174fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
175    PathSyncResult {
176        remote_path: remote_path.to_string(),
177        success: false,
178        error: Some(err.to_string()),
179        ..Default::default()
180    }
181}
182
183fn remote_file_to_safe_local_path(
184    remote_root: &Path,
185    remote_file: &Path,
186    local_container: &Path,
187    leaf_name: &str,
188) -> Option<PathBuf> {
189    let mut local_path = local_container.join(leaf_name);
190    if remote_file == remote_root {
191        return Some(local_path);
192    }
193
194    let relative = remote_file.strip_prefix(remote_root).ok()?;
195    for component in relative.components() {
196        match component {
197            std::path::Component::Normal(name) => local_path.push(name),
198            std::path::Component::CurDir => {}
199            _ => return None,
200        }
201    }
202
203    Some(local_path)
204}
205
206fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
207    let rel = path.strip_prefix(root).map_err(|_| {
208        format!(
209            "Local path {} is outside sync root {}",
210            path.display(),
211            root.display()
212        )
213    })?;
214
215    let mut current = root.to_path_buf();
216    if let Some(link) = existing_path_symlink(&current)? {
217        return Ok(Some(link));
218    }
219
220    for component in rel.components() {
221        match component {
222            std::path::Component::Normal(name) => current.push(name),
223            std::path::Component::CurDir => continue,
224            _ => {
225                return Err(format!(
226                    "Local path {} contains unsafe component below sync root {}",
227                    path.display(),
228                    root.display()
229                ));
230            }
231        }
232
233        if let Some(link) = existing_path_symlink(&current)? {
234            return Ok(Some(link));
235        }
236    }
237
238    Ok(None)
239}
240
241fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
242    match std::fs::symlink_metadata(path) {
243        Ok(metadata) if metadata.file_type().is_symlink() => Ok(Some(path.to_path_buf())),
244        Ok(_) => Ok(None),
245        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
246        Err(e) => Err(format!("Failed to inspect {}: {}", path.display(), e)),
247    }
248}
249
250fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
251    if let Some(link) = existing_local_symlink_below_root(root, path)? {
252        return Err(format!(
253            "Refusing to write {} through local symlink {}",
254            path.display(),
255            link.display()
256        ));
257    }
258
259    Ok(())
260}
261
262fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
263    reject_local_symlink_below_root(sync_root, local_path)?;
264    std::fs::create_dir_all(local_path)
265        .map_err(|e| format!("Failed to create directory: {}", e))?;
266    reject_local_symlink_below_root(sync_root, local_path)?;
267    Ok(())
268}
269
270fn prepare_local_sync_root(local_store: &Path, mirror_dir: &Path) -> Result<(), String> {
271    reject_local_symlink_below_root(local_store, mirror_dir)?;
272    std::fs::create_dir_all(mirror_dir)
273        .map_err(|e| format!("Failed to create directory: {}", e))?;
274    reject_local_symlink_below_root(local_store, mirror_dir)?;
275    Ok(())
276}
277
278fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
279    stat.file_type().is_symlink()
280}
281
282/// Errors that can occur during sync operations.
283#[derive(Error, Debug)]
284pub enum SyncError {
285    #[error("Source has no host configured")]
286    NoHost,
287
288    #[error("Source has no paths configured")]
289    NoPaths,
290
291    #[error("Invalid source path: {0}")]
292    InvalidPath(String),
293
294    #[error("Invalid source definition: {0}")]
295    InvalidSource(String),
296
297    #[error("rsync command failed: {0}")]
298    RsyncFailed(String),
299
300    #[error("Failed to create local directory: {0}")]
301    CreateDirFailed(#[from] std::io::Error),
302
303    #[error("SSH connection failed: {0}")]
304    SshFailed(String),
305
306    #[error("Connection timed out after {0} seconds")]
307    Timeout(u64),
308
309    #[error("Sync cancelled")]
310    Cancelled,
311}
312
313/// Method used for syncing files from remote.
314#[derive(Debug, Clone, Copy, PartialEq, Eq)]
315pub enum SyncMethod {
316    /// rsync over SSH - preferred for delta transfers
317    Rsync,
318    /// rsync invoked via WSL (`wsl rsync`) - used on Windows when native rsync is unavailable
319    /// but WSL is installed with rsync available inside it.
320    WslRsync,
321    /// SCP-based transfer using the system `scp` command.
322    ///
323    /// Used on Windows (and other platforms) when rsync is unavailable. Delegates all
324    /// authentication to the system `ssh`/`scp` binary so it inherits OpenSSH agent,
325    /// `~/.ssh/` keys, and `~/.ssh/config` correctly – avoiding the `ssh2` library
326    /// which does not integrate with the Windows OpenSSH agent.
327    Scp,
328    /// SFTP fallback using the `ssh2` crate – last resort only.
329    ///
330    /// Deprecated in favour of [`SyncMethod::Scp`] which uses the native system SSH
331    /// binary. Kept for backward compatibility with callers that pattern-match on this
332    /// variant.
333    Sftp,
334}
335
336impl SyncMethod {
337    pub fn as_str(self) -> &'static str {
338        match self {
339            Self::Rsync => "rsync",
340            Self::WslRsync => "wsl-rsync",
341            Self::Scp => "scp",
342            Self::Sftp => "sftp",
343        }
344    }
345}
346
347impl std::fmt::Display for SyncMethod {
348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349        f.write_str(self.as_str())
350    }
351}
352
353/// Result of syncing a single path.
354#[derive(Debug, Clone, Default)]
355pub struct PathSyncResult {
356    /// Remote path that was synced.
357    pub remote_path: String,
358    /// Local destination path.
359    pub local_path: PathBuf,
360    /// Number of files transferred.
361    pub files_transferred: u64,
362    /// Total bytes transferred.
363    pub bytes_transferred: u64,
364    /// Whether the sync succeeded.
365    pub success: bool,
366    /// Error message if sync failed.
367    pub error: Option<String>,
368    /// Duration of the sync operation.
369    pub duration_ms: u64,
370}
371
372/// Report from syncing an entire source.
373#[derive(Debug, Clone)]
374pub struct SyncReport {
375    /// Name of the source that was synced.
376    pub source_name: String,
377    /// Method used for syncing.
378    pub method: SyncMethod,
379    /// Results for each path.
380    pub path_results: Vec<PathSyncResult>,
381    /// Total duration of the sync.
382    pub total_duration_ms: u64,
383    /// Whether all paths synced successfully.
384    pub all_succeeded: bool,
385}
386
387impl SyncReport {
388    /// Create a new report for a source.
389    pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
390        Self {
391            source_name: source_name.into(),
392            method,
393            path_results: Vec::new(),
394            total_duration_ms: 0,
395            all_succeeded: true,
396        }
397    }
398
399    /// Create a failed report when sync couldn't even start.
400    pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
401        Self {
402            source_name: source_name.into(),
403            method: SyncMethod::Rsync,
404            path_results: vec![PathSyncResult {
405                error: Some(error.to_string()),
406                success: false,
407                ..Default::default()
408            }],
409            total_duration_ms: 0,
410            all_succeeded: false,
411        }
412    }
413
414    /// Add a path result to the report.
415    pub fn add_path_result(&mut self, result: PathSyncResult) {
416        if !result.success {
417            self.all_succeeded = false;
418        }
419        self.path_results.push(result);
420    }
421
422    /// Get total files transferred across all paths.
423    pub fn total_files(&self) -> u64 {
424        self.path_results.iter().map(|r| r.files_transferred).sum()
425    }
426
427    /// Get total bytes transferred across all paths.
428    pub fn total_bytes(&self) -> u64 {
429        self.path_results.iter().map(|r| r.bytes_transferred).sum()
430    }
431
432    /// Get count of successful path syncs.
433    pub fn successful_paths(&self) -> usize {
434        self.path_results.iter().filter(|r| r.success).count()
435    }
436
437    /// Get count of failed path syncs.
438    pub fn failed_paths(&self) -> usize {
439        self.path_results.iter().filter(|r| !r.success).count()
440    }
441
442    /// Summarize the overall sync outcome.
443    pub fn sync_result(&self) -> SyncResult {
444        if self.all_succeeded {
445            SyncResult::Success
446        } else {
447            let errors: Vec<String> = self
448                .path_results
449                .iter()
450                .filter_map(|r| r.error.clone())
451                .collect();
452            if self.successful_paths() > 0 {
453                SyncResult::PartialFailure(errors.join("; "))
454            } else {
455                SyncResult::Failed(errors.join("; "))
456            }
457        }
458    }
459}
460
461/// Statistics parsed from rsync output.
462#[derive(Debug, Default)]
463struct RsyncStats {
464    files_transferred: u64,
465    bytes_transferred: u64,
466}
467
468/// Sync engine for pulling sessions from remote sources.
469pub struct SyncEngine {
470    /// Base directory for storing synced data.
471    /// Structure: `{local_store}/remotes/{source_name}/mirror/`
472    local_store: PathBuf,
473    /// Connection timeout in seconds.
474    connection_timeout: u64,
475    /// Transfer timeout in seconds (0 = no timeout).
476    transfer_timeout: u64,
477}
478
479impl SyncEngine {
480    /// Create a new sync engine.
481    ///
482    /// # Arguments
483    /// * `data_dir` - The cass data directory (e.g., ~/.local/share/coding-agent-search)
484    pub fn new(data_dir: &Path) -> Self {
485        Self {
486            local_store: data_dir.to_path_buf(),
487            connection_timeout: 10,
488            transfer_timeout: 300, // 5 minutes
489        }
490    }
491
492    /// Set the connection timeout.
493    pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
494        self.connection_timeout = seconds;
495        self
496    }
497
498    /// Set the transfer timeout.
499    pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
500        self.transfer_timeout = seconds;
501        self
502    }
503
504    /// Get the local mirror directory for a source.
505    pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
506        self.local_store
507            .join("remotes")
508            .join(source_name)
509            .join("mirror")
510    }
511
512    /// Get the remote home directory by SSH-ing to the host and printing `$HOME`.
513    ///
514    /// This is called once per source sync to avoid repeated SSH calls for each path.
515    fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
516        // Validate host doesn't contain shell metacharacters to prevent injection
517        if host.trim().is_empty()
518            || host.starts_with('-')
519            || !ssh_host_has_safe_token_chars(host)
520            || validate_optional_user_host_shape(host).is_err()
521        {
522            return Err(SyncError::SshFailed(format!(
523                "Invalid characters in host: {}",
524                host
525            )));
526        }
527
528        let timeout_secs = self.connection_timeout.max(1);
529        let mut cmd = Command::new("ssh");
530        cmd.args(strict_ssh_cli_tokens(timeout_secs))
531            .arg("--")
532            .arg(host)
533            .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
534            .stdout(Stdio::piped())
535            .stderr(Stdio::piped());
536
537        let child = cmd
538            .spawn()
539            .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
540        let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
541            .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
542            .ok_or(SyncError::Timeout(timeout_secs))?;
543
544        if !output.status.success() {
545            let stderr = String::from_utf8_lossy(&output.stderr);
546            if is_host_key_verification_failure(&stderr) {
547                return Err(SyncError::SshFailed(host_key_verification_error(host)));
548            }
549            return Err(SyncError::SshFailed(format!(
550                "Failed to get remote home directory: {}",
551                stderr.trim()
552            )));
553        }
554
555        let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
556            SyncError::SshFailed(
557                "Unable to parse remote home directory from SSH output".to_string(),
558            )
559        })?;
560
561        tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
562        Ok(remote_home)
563    }
564
565    /// Expand ~ in a remote path using the provided home directory.
566    ///
567    /// If `remote_home` is None, returns the path unchanged.
568    fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
569        if !path.starts_with('~') {
570            return path.to_string();
571        }
572
573        let Some(home) = remote_home else {
574            return path.to_string();
575        };
576
577        if path == "~" {
578            home.to_string()
579        } else if let Some(rest) = path.strip_prefix("~/") {
580            format!("{}/{}", home, rest)
581        } else {
582            // ~user/path case - not supported, return as-is
583            path.to_string()
584        }
585    }
586
587    /// Detect the available sync method.
588    ///
589    /// Detection order:
590    /// 1. Native `rsync` → [`SyncMethod::Rsync`]
591    /// 2. `wsl rsync` (Windows only) → [`SyncMethod::WslRsync`]
592    /// 3. System `scp` available → [`SyncMethod::Scp`]
593    /// 4. Last resort → [`SyncMethod::Sftp`] (ssh2-based, no native-agent integration)
594    ///
595    /// On Windows the `ssh2` SFTP path is intentionally avoided whenever possible
596    /// because it bypasses the Windows OpenSSH agent and `~/.ssh/config`, leading to
597    /// "No valid authentication method found" errors even when SSH keys are properly
598    /// configured. Using the system `scp` binary instead lets OpenSSH handle auth the
599    /// same way `ssh` and `cass sources doctor` do.
600    pub fn detect_sync_method() -> SyncMethod {
601        // 1. Native rsync
602        if Command::new("rsync")
603            .arg("--version")
604            .output()
605            .map(|o| o.status.success())
606            .unwrap_or(false)
607        {
608            return SyncMethod::Rsync;
609        }
610
611        // 2. WSL rsync (Windows-only: rsync inside WSL invoked via `wsl rsync`)
612        #[cfg(target_os = "windows")]
613        if Command::new("wsl")
614            .args(["rsync", "--version"])
615            .output()
616            .map(|o| o.status.success())
617            .unwrap_or(false)
618        {
619            return SyncMethod::WslRsync;
620        }
621
622        // 3. System scp – preferred over ssh2/SFTP because it inherits the native
623        //    OpenSSH agent and ~/.ssh/config on all platforms (especially Windows).
624        if Command::new("scp")
625            .arg("-S")
626            .arg("ssh")
627            .arg("--")
628            // pass a harmless flag; scp prints usage and exits non-zero, but if the
629            // binary exists the spawn itself succeeds which is all we need to check.
630            .output()
631            .is_ok()
632        {
633            // Confirm scp is a real binary by checking for the executable
634            if which_scp_exists() {
635                return SyncMethod::Scp;
636            }
637        }
638
639        // 4. Last resort: ssh2-based SFTP
640        SyncMethod::Sftp
641    }
642
643    /// Sync a single source.
644    ///
645    /// Syncs all configured paths from the source to the local mirror directory.
646    /// Individual path failures don't abort the entire sync.
647    pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
648        if !source.is_remote() {
649            return Err(SyncError::NoHost);
650        }
651
652        let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
653
654        if source.paths.is_empty() {
655            return Err(SyncError::NoPaths);
656        }
657
658        source
659            .validate_structure()
660            .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
661
662        let method = Self::detect_sync_method();
663        let mut report = SyncReport::new(&source.name, method);
664        let overall_start = Instant::now();
665
666        // Create the mirror directory
667        let mirror_dir = self.mirror_dir(&source.name);
668        prepare_local_sync_root(&self.local_store, &mirror_dir)
669            .map_err(|e| SyncError::CreateDirFailed(std::io::Error::other(e)))?;
670
671        // Pre-fetch remote home directory if any paths use tilde (avoids multiple SSH calls)
672        let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
673            path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
674        }) {
675            match self.get_remote_home(host) {
676                Ok(home) => Some(home),
677                Err(e) => {
678                    tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
679                    None
680                }
681            }
682        } else {
683            None
684        };
685
686        for (index, remote_path) in source.paths.iter().enumerate() {
687            if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
688                report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
689                continue;
690            }
691
692            let result = match method {
693                SyncMethod::Rsync => {
694                    self.sync_path_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
695                }
696                SyncMethod::WslRsync => {
697                    self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
698                }
699                SyncMethod::Scp => {
700                    self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
701                }
702                SyncMethod::Sftp => {
703                    self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
704                }
705            };
706            report.add_path_result(result);
707        }
708
709        report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
710        Ok(report)
711    }
712
713    /// Sync all remote sources from a config.
714    ///
715    /// Continues even if individual sources fail.
716    pub fn sync_all(
717        &self,
718        sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
719    ) -> Vec<SyncReport> {
720        sources
721            .map(|source| {
722                let source = source.borrow();
723                self.sync_source(source)
724                    .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
725            })
726            .collect()
727    }
728
729    /// Sync a single path using rsync.
730    ///
731    /// **IMPORTANT**: Uses rsync WITHOUT --delete for safe additive syncs.
732    ///
733    /// The `remote_home` parameter should be pre-fetched via `get_remote_home()` to avoid
734    /// repeated SSH calls for each path.
735    fn sync_path_rsync(
736        &self,
737        host: &str,
738        remote_path: &str,
739        dest_dir: &Path,
740        remote_home: Option<&str>,
741    ) -> PathSyncResult {
742        let start = Instant::now();
743        if remote_path.starts_with('~') && remote_home.is_none() {
744            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
745            return PathSyncResult {
746                remote_path: remote_path.to_string(),
747                local_path,
748                success: false,
749                error: Some(
750                    "Cannot expand '~' in remote path; failed to determine remote home directory"
751                        .to_string(),
752                ),
753                duration_ms: start.elapsed().as_millis() as u64,
754                ..Default::default()
755            };
756        }
757
758        // Expand ~ using pre-fetched home directory (no SSH call here)
759        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
760
761        // If tilde expansion failed (no remote_home provided), log a warning
762        if remote_path.starts_with('~') && expanded_path == remote_path {
763            tracing::warn!(
764                remote_path = %remote_path,
765                "Could not expand tilde in path (remote home directory not available)"
766            );
767        }
768
769        // Convert remote path to safe local directory name
770        // Use raw remote_path for stability (independent of home expansion success)
771        let safe_name = path_to_safe_dirname(remote_path);
772        let local_path = dest_dir.join(&safe_name);
773
774        // Create local directory without following any pre-existing mirror symlink.
775        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
776            return PathSyncResult {
777                remote_path: remote_path.to_string(),
778                local_path: local_path.clone(),
779                success: false,
780                error: Some(e),
781                duration_ms: start.elapsed().as_millis() as u64,
782                ..Default::default()
783            };
784        }
785
786        // Build rsync command
787        // NOTE: NO --delete flag! Safe additive sync only.
788        let arg_protection = detect_rsync_arg_protection();
789        let protect_args_supported = arg_protection.is_supported();
790        let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
791        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
792
793        let local_path_str = match local_path.to_str() {
794            Some(s) => s,
795            None => {
796                return PathSyncResult {
797                    remote_path: remote_path.to_string(),
798                    local_path,
799                    success: false,
800                    error: Some("Local path contains invalid UTF-8".to_string()),
801                    duration_ms: start.elapsed().as_millis() as u64,
802                    ..Default::default()
803                };
804            }
805        };
806
807        let timeout_str = self.transfer_timeout.to_string();
808        let mut cmd = Command::new("rsync");
809        cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
810        if let Some(flag) = arg_protection.flag() {
811            cmd.arg(flag);
812        }
813        cmd.args([
814            "--timeout",
815            &timeout_str,
816            "-e",
817            &ssh_opts,
818            "--",
819            &remote_spec,
820            local_path_str,
821        ]);
822
823        tracing::debug!(
824            host = %host,
825            remote_path = %expanded_path,
826            local_path = %local_path.display(),
827            "starting rsync"
828        );
829
830        let output = match cmd.output() {
831            Ok(o) => o,
832            Err(e) => {
833                return PathSyncResult {
834                    remote_path: remote_path.to_string(),
835                    local_path,
836                    success: false,
837                    error: Some(format!("Failed to execute rsync: {}", e)),
838                    duration_ms: start.elapsed().as_millis() as u64,
839                    ..Default::default()
840                };
841            }
842        };
843
844        let duration_ms = start.elapsed().as_millis() as u64;
845        let stdout = String::from_utf8_lossy(&output.stdout);
846        let stderr = String::from_utf8_lossy(&output.stderr);
847
848        if !output.status.success() {
849            // Check for specific error types
850            let error_msg = if stderr.contains("Connection refused")
851                || stderr.contains("Connection timed out")
852            {
853                format!("SSH connection failed: {}", stderr.trim())
854            } else if is_host_key_verification_failure(&stderr) {
855                host_key_verification_error(host)
856            } else if stderr.contains("No such file or directory") {
857                format!("Remote path not found: {}", expanded_path)
858            } else if stderr.contains("Permission denied") {
859                format!("Permission denied: {}", stderr.trim())
860            } else {
861                format!("rsync failed: {}", stderr.trim())
862            };
863
864            tracing::warn!(
865                host = %host,
866                remote_path = %expanded_path,
867                error = %error_msg,
868                "rsync failed"
869            );
870
871            return PathSyncResult {
872                remote_path: remote_path.to_string(),
873                local_path,
874                success: false,
875                error: Some(error_msg),
876                duration_ms,
877                ..Default::default()
878            };
879        }
880
881        // Parse stats from rsync output
882        let stats = parse_rsync_stats(&stdout);
883
884        tracing::info!(
885            host = %host,
886            remote_path = %expanded_path,
887            files = stats.files_transferred,
888            bytes = stats.bytes_transferred,
889            duration_ms,
890            "rsync completed"
891        );
892
893        PathSyncResult {
894            remote_path: remote_path.to_string(),
895            local_path,
896            files_transferred: stats.files_transferred,
897            bytes_transferred: stats.bytes_transferred,
898            success: true,
899            error: None,
900            duration_ms,
901        }
902    }
903
904    /// Sync a single path using rsync invoked through WSL (`wsl rsync …`).
905    ///
906    /// Used on Windows when native rsync is absent but WSL with rsync is available.
907    /// WSL paths use the `\\wsl$\…` UNC convention for the local destination.
908    fn sync_path_wsl_rsync(
909        &self,
910        host: &str,
911        remote_path: &str,
912        dest_dir: &Path,
913        remote_home: Option<&str>,
914    ) -> PathSyncResult {
915        let start = Instant::now();
916
917        if remote_path.starts_with('~') && remote_home.is_none() {
918            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
919            return PathSyncResult {
920                remote_path: remote_path.to_string(),
921                local_path,
922                success: false,
923                error: Some(
924                    "Cannot expand '~' in remote path; failed to determine remote home directory"
925                        .to_string(),
926                ),
927                duration_ms: start.elapsed().as_millis() as u64,
928                ..Default::default()
929            };
930        }
931
932        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
933        let safe_name = path_to_safe_dirname(remote_path);
934        let local_path = dest_dir.join(&safe_name);
935
936        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
937            return PathSyncResult {
938                remote_path: remote_path.to_string(),
939                local_path,
940                success: false,
941                error: Some(e),
942                duration_ms: start.elapsed().as_millis() as u64,
943                ..Default::default()
944            };
945        }
946
947        let local_path_str = match local_path.to_str() {
948            Some(s) => s,
949            None => {
950                return PathSyncResult {
951                    remote_path: remote_path.to_string(),
952                    local_path,
953                    success: false,
954                    error: Some("Local path contains invalid UTF-8".to_string()),
955                    duration_ms: start.elapsed().as_millis() as u64,
956                    ..Default::default()
957                };
958            }
959        };
960
961        // Convert Windows path to a WSL-accessible path.
962        // WSL can access Windows paths via /mnt/<drive>/... conventions.
963        // E.g. C:\Users\george\AppData\... → /mnt/c/Users/george/AppData/...
964        let wsl_dest = windows_path_to_wsl(local_path_str);
965
966        let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
967        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
968        let timeout_str = self.transfer_timeout.to_string();
969
970        let mut cmd = Command::new("wsl");
971        cmd.args([
972            "rsync",
973            "-avz",
974            "--links",
975            "--safe-links",
976            "--stats",
977            "--partial",
978        ]);
979        // WSL rsync is the real rsync (not openrsync), so --protect-args is safe.
980        cmd.arg("--protect-args");
981        cmd.args([
982            "--timeout",
983            &timeout_str,
984            "-e",
985            &ssh_opts,
986            "--",
987            &remote_spec,
988            &wsl_dest,
989        ]);
990
991        tracing::debug!(
992            host = %host,
993            remote_path = %expanded_path,
994            local_path = %local_path.display(),
995            wsl_dest = %wsl_dest,
996            "starting wsl rsync"
997        );
998
999        let output = match cmd.output() {
1000            Ok(o) => o,
1001            Err(e) => {
1002                return PathSyncResult {
1003                    remote_path: remote_path.to_string(),
1004                    local_path,
1005                    success: false,
1006                    error: Some(format!("Failed to execute wsl rsync: {}", e)),
1007                    duration_ms: start.elapsed().as_millis() as u64,
1008                    ..Default::default()
1009                };
1010            }
1011        };
1012
1013        let duration_ms = start.elapsed().as_millis() as u64;
1014        let stdout = String::from_utf8_lossy(&output.stdout);
1015        let stderr = String::from_utf8_lossy(&output.stderr);
1016
1017        if !output.status.success() {
1018            let error_msg = if stderr.contains("Connection refused")
1019                || stderr.contains("Connection timed out")
1020            {
1021                format!("SSH connection failed: {}", stderr.trim())
1022            } else if is_host_key_verification_failure(&stderr) {
1023                host_key_verification_error(host)
1024            } else if stderr.contains("No such file or directory") {
1025                format!("Remote path not found: {}", expanded_path)
1026            } else if stderr.contains("Permission denied") {
1027                format!("Permission denied: {}", stderr.trim())
1028            } else {
1029                format!("wsl rsync failed: {}", stderr.trim())
1030            };
1031
1032            tracing::warn!(
1033                host = %host,
1034                remote_path = %expanded_path,
1035                error = %error_msg,
1036                "wsl rsync failed"
1037            );
1038
1039            return PathSyncResult {
1040                remote_path: remote_path.to_string(),
1041                local_path,
1042                success: false,
1043                error: Some(error_msg),
1044                duration_ms,
1045                ..Default::default()
1046            };
1047        }
1048
1049        let stats = parse_rsync_stats(&stdout);
1050
1051        tracing::info!(
1052            host = %host,
1053            remote_path = %expanded_path,
1054            files = stats.files_transferred,
1055            bytes = stats.bytes_transferred,
1056            duration_ms,
1057            "wsl rsync completed"
1058        );
1059
1060        PathSyncResult {
1061            remote_path: remote_path.to_string(),
1062            local_path,
1063            files_transferred: stats.files_transferred,
1064            bytes_transferred: stats.bytes_transferred,
1065            success: true,
1066            error: None,
1067            duration_ms,
1068        }
1069    }
1070
1071    /// Sync a single path using SCP after a physical `find -P` regular-file listing.
1072    ///
1073    /// This method delegates all authentication to the native system `scp`/`ssh`
1074    /// binary, which correctly reads `~/.ssh/config`, the OpenSSH agent (including
1075    /// the Windows OpenSSH agent on Windows), and all standard key locations.
1076    ///
1077    /// This avoids the "No valid authentication method found" failure that occurs
1078    /// in the `ssh2`-based SFTP path on Windows, where the library does not
1079    /// integrate with the Windows OpenSSH agent (`ssh-agent.exe`).
1080    fn sync_path_scp(
1081        &self,
1082        host: &str,
1083        remote_path: &str,
1084        dest_dir: &Path,
1085        remote_home: Option<&str>,
1086    ) -> PathSyncResult {
1087        let start = Instant::now();
1088
1089        if remote_path.starts_with('~') && remote_home.is_none() {
1090            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1091            return PathSyncResult {
1092                remote_path: remote_path.to_string(),
1093                local_path,
1094                success: false,
1095                error: Some(
1096                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1097                        .to_string(),
1098                ),
1099                duration_ms: start.elapsed().as_millis() as u64,
1100                ..Default::default()
1101            };
1102        }
1103
1104        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1105        let safe_name = path_to_safe_dirname(remote_path);
1106        let local_path = dest_dir.join(&safe_name);
1107
1108        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1109            return PathSyncResult {
1110                remote_path: remote_path.to_string(),
1111                local_path,
1112                success: false,
1113                error: Some(e),
1114                duration_ms: start.elapsed().as_millis() as u64,
1115                ..Default::default()
1116            };
1117        }
1118
1119        // `scp -r` follows symlinks on some OpenSSH paths. Enumerate only regular
1120        // files with physical traversal first, then copy those files individually.
1121        let connect_timeout = self.connection_timeout.to_string();
1122        let find_command = remote_find_regular_files_command(&expanded_path);
1123
1124        tracing::debug!(
1125            host = %host,
1126            remote_path = %expanded_path,
1127            local_path = %local_path.display(),
1128            "listing regular files for scp sync"
1129        );
1130
1131        let timeout_secs = self.connection_timeout.max(1);
1132        let mut cmd = Command::new("ssh");
1133        cmd.args(strict_ssh_cli_tokens(timeout_secs))
1134            .arg("--")
1135            .arg(host)
1136            .arg(&find_command)
1137            .stdout(Stdio::piped())
1138            .stderr(Stdio::piped());
1139
1140        let output = match cmd.spawn().and_then(|child| {
1141            wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1142        }) {
1143            Ok(Some(o)) => o,
1144            Ok(None) => {
1145                return PathSyncResult {
1146                    remote_path: remote_path.to_string(),
1147                    local_path,
1148                    success: false,
1149                    error: Some(format!(
1150                        "SSH file listing timed out after {timeout_secs} seconds"
1151                    )),
1152                    duration_ms: start.elapsed().as_millis() as u64,
1153                    ..Default::default()
1154                };
1155            }
1156            Err(e) => {
1157                return PathSyncResult {
1158                    remote_path: remote_path.to_string(),
1159                    local_path,
1160                    success: false,
1161                    error: Some(format!("Failed to execute ssh file listing: {}", e)),
1162                    duration_ms: start.elapsed().as_millis() as u64,
1163                    ..Default::default()
1164                };
1165            }
1166        };
1167
1168        let stderr = String::from_utf8_lossy(&output.stderr);
1169        if !output.status.success() {
1170            let error_msg = if stderr.contains("Connection refused")
1171                || stderr.contains("Connection timed out")
1172            {
1173                format!("SSH connection failed: {}", stderr.trim())
1174            } else if is_host_key_verification_failure(&stderr) {
1175                host_key_verification_error(host)
1176            } else if stderr.contains("No such file or directory") {
1177                format!("Remote path not found: {}", expanded_path)
1178            } else if stderr.contains("Permission denied") {
1179                format!("Permission denied: {}", stderr.trim())
1180            } else {
1181                format!("Remote file listing failed: {}", stderr.trim())
1182            };
1183
1184            tracing::warn!(
1185                host = %host,
1186                remote_path = %expanded_path,
1187                error = %error_msg,
1188                "scp file listing failed"
1189            );
1190
1191            return PathSyncResult {
1192                remote_path: remote_path.to_string(),
1193                local_path,
1194                success: false,
1195                error: Some(error_msg),
1196                duration_ms: start.elapsed().as_millis() as u64,
1197                ..Default::default()
1198            };
1199        }
1200
1201        let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1202        let remote_root = Path::new(&expanded_path);
1203        let leaf_name = Path::new(remote_path)
1204            .file_name()
1205            .and_then(|n| n.to_str())
1206            .unwrap_or("remote");
1207        let mut files_transferred = 0u64;
1208        let mut bytes_transferred = 0u64;
1209
1210        for remote_file in remote_files {
1211            let remote_file_path = Path::new(&remote_file);
1212            let Some(local_file) = remote_file_to_safe_local_path(
1213                remote_root,
1214                remote_file_path,
1215                &local_path,
1216                leaf_name,
1217            ) else {
1218                tracing::warn!(
1219                    remote_path = %remote_file,
1220                    root = %expanded_path,
1221                    "skipping scp file outside listed root"
1222                );
1223                continue;
1224            };
1225
1226            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1227                return PathSyncResult {
1228                    remote_path: remote_path.to_string(),
1229                    local_path,
1230                    success: false,
1231                    error: Some(e),
1232                    duration_ms: start.elapsed().as_millis() as u64,
1233                    ..Default::default()
1234                };
1235            }
1236
1237            if let Some(parent) = local_file.parent() {
1238                if let Err(e) = std::fs::create_dir_all(parent) {
1239                    return PathSyncResult {
1240                        remote_path: remote_path.to_string(),
1241                        local_path,
1242                        success: false,
1243                        error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1244                        duration_ms: start.elapsed().as_millis() as u64,
1245                        ..Default::default()
1246                    };
1247                }
1248
1249                if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1250                    return PathSyncResult {
1251                        remote_path: remote_path.to_string(),
1252                        local_path,
1253                        success: false,
1254                        error: Some(e),
1255                        duration_ms: start.elapsed().as_millis() as u64,
1256                        ..Default::default()
1257                    };
1258                }
1259            }
1260
1261            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1262                return PathSyncResult {
1263                    remote_path: remote_path.to_string(),
1264                    local_path,
1265                    success: false,
1266                    error: Some(e),
1267                    duration_ms: start.elapsed().as_millis() as u64,
1268                    ..Default::default()
1269                };
1270            }
1271
1272            let temp_path =
1273                unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1274            let Some(temp_path_str) = temp_path.to_str() else {
1275                return PathSyncResult {
1276                    remote_path: remote_path.to_string(),
1277                    local_path,
1278                    success: false,
1279                    error: Some("Local path contains invalid UTF-8".to_string()),
1280                    duration_ms: start.elapsed().as_millis() as u64,
1281                    ..Default::default()
1282                };
1283            };
1284            if let Err(e) = std::fs::OpenOptions::new()
1285                .write(true)
1286                .create_new(true)
1287                .open(&temp_path)
1288                .and_then(|file| file.sync_all())
1289            {
1290                return PathSyncResult {
1291                    remote_path: remote_path.to_string(),
1292                    local_path,
1293                    success: false,
1294                    error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1295                    duration_ms: start.elapsed().as_millis() as u64,
1296                    ..Default::default()
1297                };
1298            }
1299
1300            let remote_spec = remote_spec_for_scp(host, &remote_file);
1301            let mut cmd = Command::new("scp");
1302            cmd.args([
1303                "-B",
1304                "-o",
1305                &format!("ConnectTimeout={}", connect_timeout),
1306                "-o",
1307                "ServerAliveInterval=15",
1308                "-o",
1309                "ServerAliveCountMax=3",
1310                "-o",
1311                "StrictHostKeyChecking=yes",
1312                "--",
1313                &remote_spec,
1314                temp_path_str,
1315            ]);
1316
1317            let output = match cmd.output() {
1318                Ok(o) => o,
1319                Err(e) => {
1320                    return PathSyncResult {
1321                        remote_path: remote_path.to_string(),
1322                        local_path,
1323                        success: false,
1324                        error: Some(format!("Failed to execute scp: {}", e)),
1325                        duration_ms: start.elapsed().as_millis() as u64,
1326                        ..Default::default()
1327                    };
1328                }
1329            };
1330
1331            if !output.status.success() {
1332                let _ = std::fs::remove_file(&temp_path);
1333                let stderr = String::from_utf8_lossy(&output.stderr);
1334                let error_msg = if is_host_key_verification_failure(&stderr) {
1335                    host_key_verification_error(host)
1336                } else if stderr.contains("Permission denied") {
1337                    format!("Permission denied: {}", stderr.trim())
1338                } else {
1339                    format!("scp failed: {}", stderr.trim())
1340                };
1341
1342                tracing::warn!(
1343                    host = %host,
1344                    remote_path = %remote_file,
1345                    error = %error_msg,
1346                    "scp file transfer failed"
1347                );
1348
1349                return PathSyncResult {
1350                    remote_path: remote_path.to_string(),
1351                    local_path,
1352                    success: false,
1353                    error: Some(error_msg),
1354                    duration_ms: start.elapsed().as_millis() as u64,
1355                    ..Default::default()
1356                };
1357            }
1358
1359            files_transferred += 1;
1360            if let Err(e) = sync_file_path(&temp_path) {
1361                return PathSyncResult {
1362                    remote_path: remote_path.to_string(),
1363                    local_path,
1364                    success: false,
1365                    error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1366                    duration_ms: start.elapsed().as_millis() as u64,
1367                    ..Default::default()
1368                };
1369            }
1370            if let Ok(metadata) = std::fs::metadata(&temp_path) {
1371                bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1372            }
1373            if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1374                return PathSyncResult {
1375                    remote_path: remote_path.to_string(),
1376                    local_path,
1377                    success: false,
1378                    error: Some(format!(
1379                        "Failed to publish {} to {}: {}",
1380                        temp_path.display(),
1381                        local_file.display(),
1382                        e
1383                    )),
1384                    duration_ms: start.elapsed().as_millis() as u64,
1385                    ..Default::default()
1386                };
1387            }
1388        }
1389
1390        let duration_ms = start.elapsed().as_millis() as u64;
1391
1392        tracing::info!(
1393            host = %host,
1394            remote_path = %expanded_path,
1395            files = files_transferred,
1396            bytes = bytes_transferred,
1397            duration_ms,
1398            "scp sync completed"
1399        );
1400
1401        PathSyncResult {
1402            remote_path: remote_path.to_string(),
1403            local_path,
1404            files_transferred,
1405            bytes_transferred,
1406            success: true,
1407            error: None,
1408            duration_ms,
1409        }
1410    }
1411
1412    /// Sync a single path using SFTP (fallback when rsync unavailable).
1413    ///
1414    /// Uses the ssh2 crate for SFTP transfers. Authenticates via SSH agent
1415    /// or key file from SSH config.
1416    fn sync_path_sftp(
1417        &self,
1418        host: &str,
1419        remote_path: &str,
1420        dest_dir: &Path,
1421        remote_home: Option<&str>,
1422    ) -> PathSyncResult {
1423        let start = Instant::now();
1424        if remote_path.starts_with('~') && remote_home.is_none() {
1425            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1426            return PathSyncResult {
1427                remote_path: remote_path.to_string(),
1428                local_path,
1429                success: false,
1430                error: Some(
1431                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1432                        .to_string(),
1433                ),
1434                duration_ms: start.elapsed().as_millis() as u64,
1435                ..Default::default()
1436            };
1437        }
1438        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1439        // Use raw remote_path for stability (independent of home expansion success)
1440        let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1441
1442        // Create local directory without following any pre-existing mirror symlink.
1443        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1444            return PathSyncResult {
1445                remote_path: remote_path.to_string(),
1446                local_path,
1447                success: false,
1448                error: Some(e),
1449                duration_ms: start.elapsed().as_millis() as u64,
1450                ..Default::default()
1451            };
1452        }
1453
1454        // Parse host to extract user if present (user@host format)
1455        let (ssh_user, ssh_host) = parse_ssh_host(host);
1456
1457        // Look up host in SSH config for connection details
1458        // First try matching by SSH config alias (Host line), then by actual hostname
1459        let ssh_config = discover_ssh_hosts();
1460        let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
1461            ssh_config
1462                .iter()
1463                .find(|h| h.hostname.as_deref() == Some(ssh_host))
1464        });
1465
1466        // Determine connection parameters
1467        let hostname = host_config
1468            .and_then(|h| h.hostname.as_deref())
1469            .unwrap_or(ssh_host);
1470        let port = host_config.and_then(|h| h.port).unwrap_or(22);
1471        // Resolve username deterministically; never guess with a sentinel value.
1472        let username = match first_nonblank_username([
1473            ssh_user,
1474            host_config.and_then(|h| h.user.as_deref()),
1475        ])
1476        .or_else(|| env_username("USER"))
1477        .or_else(|| env_username("LOGNAME"))
1478        {
1479            Some(user) => user,
1480            None => {
1481                return PathSyncResult {
1482                    remote_path: remote_path.to_string(),
1483                    local_path,
1484                    success: false,
1485                    error: Some(format!(
1486                        "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
1487                        host
1488                    )),
1489                    duration_ms: start.elapsed().as_millis() as u64,
1490                    ..Default::default()
1491                };
1492            }
1493        };
1494        let identity_file = host_config.and_then(|h| h.identity_file.as_deref());
1495
1496        tracing::debug!(
1497            hostname = %hostname,
1498            port,
1499            username = %username,
1500            identity_file = ?identity_file,
1501            remote_path = %expanded_path,
1502            "SFTP connection parameters"
1503        );
1504
1505        // Connect via TCP with connection timeout
1506        let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
1507        let addr = format!("{}:{}", hostname, port);
1508        let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
1509            // Resolve hostname to socket address
1510            use std::net::ToSocketAddrs;
1511            (hostname, port)
1512                .to_socket_addrs()
1513                .ok()
1514                .and_then(|mut addrs| addrs.next())
1515                .ok_or(std::io::Error::new(
1516                    std::io::ErrorKind::InvalidInput,
1517                    "cannot resolve hostname",
1518                ))
1519        }) {
1520            Ok(a) => a,
1521            Err(e) => {
1522                return PathSyncResult {
1523                    remote_path: remote_path.to_string(),
1524                    local_path,
1525                    success: false,
1526                    error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
1527                    duration_ms: start.elapsed().as_millis() as u64,
1528                    ..Default::default()
1529                };
1530            }
1531        };
1532        let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
1533            Ok(t) => t,
1534            Err(e) => {
1535                return PathSyncResult {
1536                    remote_path: remote_path.to_string(),
1537                    local_path,
1538                    success: false,
1539                    error: Some(format!(
1540                        "TCP connection failed to {}:{}: {}",
1541                        hostname, port, e
1542                    )),
1543                    duration_ms: start.elapsed().as_millis() as u64,
1544                    ..Default::default()
1545                };
1546            }
1547        };
1548
1549        // Set TCP read/write timeout (use transfer_timeout, not connection_timeout)
1550        let timeout = std::time::Duration::from_secs(self.transfer_timeout);
1551        if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
1552            tracing::warn!("Failed to set TCP read timeout: {}", e);
1553        }
1554        if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
1555            tracing::warn!("Failed to set TCP write timeout: {}", e);
1556        }
1557        let tcp_shutdown = tcp.try_clone().ok();
1558
1559        // Create SSH session
1560        let mut session = match Session::new() {
1561            Ok(s) => s,
1562            Err(e) => {
1563                let _ = tcp.shutdown(Shutdown::Both);
1564                return PathSyncResult {
1565                    remote_path: remote_path.to_string(),
1566                    local_path,
1567                    success: false,
1568                    error: Some(format!("Failed to create SSH session: {}", e)),
1569                    duration_ms: start.elapsed().as_millis() as u64,
1570                    ..Default::default()
1571                };
1572            }
1573        };
1574
1575        session.set_tcp_stream(tcp);
1576        let close_connections = |session: &mut Session, reason: &str| {
1577            let _ = session.disconnect(None, reason, None);
1578            if let Some(stream) = tcp_shutdown.as_ref() {
1579                let _ = stream.shutdown(Shutdown::Both);
1580            }
1581        };
1582
1583        if let Err(e) = session.handshake() {
1584            close_connections(&mut session, "handshake failed");
1585            return PathSyncResult {
1586                remote_path: remote_path.to_string(),
1587                local_path,
1588                success: false,
1589                error: Some(format!("SSH handshake failed: {}", e)),
1590                duration_ms: start.elapsed().as_millis() as u64,
1591                ..Default::default()
1592            };
1593        }
1594
1595        // Authenticate - try agent first, then key file
1596        if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
1597            close_connections(&mut session, "authentication failed");
1598            return PathSyncResult {
1599                remote_path: remote_path.to_string(),
1600                local_path,
1601                success: false,
1602                error: Some(format!("SSH authentication failed: {}", e)),
1603                duration_ms: start.elapsed().as_millis() as u64,
1604                ..Default::default()
1605            };
1606        }
1607
1608        // Open SFTP session
1609        let sftp = match session.sftp() {
1610            Ok(s) => s,
1611            Err(e) => {
1612                close_connections(&mut session, "sftp open failed");
1613                return PathSyncResult {
1614                    remote_path: remote_path.to_string(),
1615                    local_path,
1616                    success: false,
1617                    error: Some(format!("Failed to open SFTP session: {}", e)),
1618                    duration_ms: start.elapsed().as_millis() as u64,
1619                    ..Default::default()
1620                };
1621            }
1622        };
1623
1624        tracing::info!(
1625            host = %host,
1626            remote_path = %expanded_path,
1627            local_path = %local_path.display(),
1628            "starting SFTP sync"
1629        );
1630
1631        // Recursively download the remote path
1632        let mut files_transferred = 0u64;
1633        let mut bytes_transferred = 0u64;
1634
1635        // For consistency with rsync and scp, we should create a subdirectory
1636        // with the remote path's leaf name inside the container directory.
1637        let leaf_name = Path::new(remote_path)
1638            .file_name()
1639            .and_then(|n| n.to_str())
1640            .unwrap_or("remote");
1641        let target_local_path = local_path.join(leaf_name);
1642
1643        if let Err(e) = self.sftp_download_recursive(
1644            &sftp,
1645            Path::new(&expanded_path),
1646            &target_local_path,
1647            &local_path,
1648            &mut files_transferred,
1649            &mut bytes_transferred,
1650        ) {
1651            close_connections(&mut session, "sftp download failed");
1652            return PathSyncResult {
1653                remote_path: remote_path.to_string(),
1654                local_path,
1655                files_transferred,
1656                bytes_transferred,
1657                success: false,
1658                error: Some(format!("SFTP download failed: {}", e)),
1659                duration_ms: start.elapsed().as_millis() as u64,
1660            };
1661        }
1662
1663        let duration_ms = start.elapsed().as_millis() as u64;
1664
1665        tracing::info!(
1666            host = %host,
1667            remote_path = %expanded_path,
1668            files = files_transferred,
1669            bytes = bytes_transferred,
1670            duration_ms,
1671            "SFTP sync completed"
1672        );
1673
1674        close_connections(&mut session, "sync complete");
1675        PathSyncResult {
1676            remote_path: remote_path.to_string(),
1677            local_path,
1678            files_transferred,
1679            bytes_transferred,
1680            success: true,
1681            error: None,
1682            duration_ms,
1683        }
1684    }
1685
1686    /// Authenticate SSH session using agent or key file.
1687    fn authenticate_ssh(
1688        &self,
1689        session: &Session,
1690        username: &str,
1691        identity_file: Option<&str>,
1692    ) -> Result<(), String> {
1693        // Try SSH agent first
1694        if let Ok(mut agent) = session.agent()
1695            && agent.connect().is_ok()
1696            && agent.list_identities().is_ok()
1697        {
1698            for identity in agent.identities().unwrap_or_default() {
1699                if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1700                    tracing::debug!("Authenticated via SSH agent");
1701                    return Ok(());
1702                }
1703            }
1704        }
1705
1706        // Try key file if specified
1707        if let Some(key_path) = identity_file {
1708            let key_path_expanded = expand_tilde_local(key_path);
1709            let key_path_buf = Path::new(&key_path_expanded);
1710
1711            if key_path_buf.exists()
1712                && session
1713                    .userauth_pubkey_file(username, None, key_path_buf, None)
1714                    .is_ok()
1715                && session.authenticated()
1716            {
1717                tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1718                return Ok(());
1719            }
1720        }
1721
1722        // Try default key locations
1723        if let Some(home) = dirs::home_dir() {
1724            for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1725                let key_path = home.join(".ssh").join(key_name);
1726                if key_path.exists()
1727                    && session
1728                        .userauth_pubkey_file(username, None, &key_path, None)
1729                        .is_ok()
1730                    && session.authenticated()
1731                {
1732                    tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1733                    return Ok(());
1734                }
1735            }
1736        }
1737
1738        Err(format!(
1739            "No valid authentication method found for user '{}'",
1740            username
1741        ))
1742    }
1743
1744    /// Recursively download a remote path via SFTP.
1745    fn sftp_download_recursive(
1746        &self,
1747        sftp: &Sftp,
1748        remote_path: &Path,
1749        local_path: &Path,
1750        local_root: &Path,
1751        files_transferred: &mut u64,
1752        bytes_transferred: &mut u64,
1753    ) -> Result<(), String> {
1754        // Use lstat so a remote symlink is classified as a symlink rather than
1755        // followed to a file or directory outside the configured source root.
1756        let stat = sftp
1757            .lstat(remote_path)
1758            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1759
1760        if sftp_file_stat_is_symlink(&stat) {
1761            tracing::warn!(
1762                path = %remote_path.display(),
1763                "Skipping remote symlink during SFTP sync"
1764            );
1765            return Ok(());
1766        }
1767
1768        if stat.is_dir() {
1769            // Create local directory for this directory item
1770            reject_local_symlink_below_root(local_root, local_path)?;
1771            std::fs::create_dir_all(local_path)
1772                .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1773            reject_local_symlink_below_root(local_root, local_path)?;
1774
1775            // List directory contents
1776            let entries = sftp
1777                .readdir(remote_path)
1778                .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1779
1780            for (entry_path, _entry_stat) in entries {
1781                let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1782                    continue;
1783                };
1784
1785                let entry_stat = sftp
1786                    .lstat(&entry_path)
1787                    .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1788                if sftp_file_stat_is_symlink(&entry_stat) {
1789                    tracing::warn!(
1790                        path = %entry_path.display(),
1791                        "Skipping remote symlink during SFTP sync"
1792                    );
1793                    continue;
1794                }
1795
1796                let local_entry = local_path.join(file_name);
1797
1798                if entry_stat.is_dir() {
1799                    // Recurse into subdirectory
1800                    self.sftp_download_recursive(
1801                        sftp,
1802                        &entry_path,
1803                        &local_entry,
1804                        local_root,
1805                        files_transferred,
1806                        bytes_transferred,
1807                    )?;
1808                } else if entry_stat.is_file() {
1809                    // Download file
1810                    if self.sftp_download_file(
1811                        sftp,
1812                        &entry_path,
1813                        &local_entry,
1814                        local_root,
1815                        bytes_transferred,
1816                    )? {
1817                        *files_transferred += 1;
1818                    }
1819                }
1820                // Skip symlinks and other types for safety
1821            }
1822        } else if stat.is_file() {
1823            // Ensure the parent directory exists
1824            if let Some(parent) = local_path.parent() {
1825                reject_local_symlink_below_root(local_root, parent)?;
1826                std::fs::create_dir_all(parent).map_err(|e| {
1827                    format!("Failed to create local dir {}: {}", parent.display(), e)
1828                })?;
1829                reject_local_symlink_below_root(local_root, parent)?;
1830            }
1831
1832            if self.sftp_download_file(
1833                sftp,
1834                remote_path,
1835                local_path,
1836                local_root,
1837                bytes_transferred,
1838            )? {
1839                *files_transferred += 1;
1840            }
1841        } else {
1842            // Not a regular file or directory (symlink, socket, etc.) - skip with warning
1843            tracing::warn!(
1844                path = %remote_path.display(),
1845                "Skipping remote path: not a regular file or directory"
1846            );
1847        }
1848
1849        Ok(())
1850    }
1851
1852    /// Download a single file via SFTP.
1853    fn sftp_download_file(
1854        &self,
1855        sftp: &Sftp,
1856        remote_path: &Path,
1857        local_path: &Path,
1858        local_root: &Path,
1859        bytes_transferred: &mut u64,
1860    ) -> Result<bool, String> {
1861        let stat = sftp
1862            .lstat(remote_path)
1863            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1864        if sftp_file_stat_is_symlink(&stat) {
1865            tracing::warn!(
1866                path = %remote_path.display(),
1867                "Skipping remote symlink during SFTP sync"
1868            );
1869            return Ok(false);
1870        }
1871        if !stat.is_file() {
1872            tracing::warn!(
1873                path = %remote_path.display(),
1874                "Skipping remote path: not a regular file"
1875            );
1876            return Ok(false);
1877        }
1878
1879        let mut remote_file = sftp
1880            .open(remote_path)
1881            .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
1882
1883        reject_local_symlink_below_root(local_root, local_path)?;
1884
1885        let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
1886        let mut local_file = std::fs::OpenOptions::new()
1887            .write(true)
1888            .create_new(true)
1889            .open(&temp_path)
1890            .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
1891
1892        // Transfer in chunks
1893        let mut buffer = [0u8; 32768]; // 32KB chunks
1894        loop {
1895            let bytes_read = remote_file
1896                .read(&mut buffer)
1897                .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
1898
1899            if bytes_read == 0 {
1900                break;
1901            }
1902
1903            local_file
1904                .write_all(&buffer[..bytes_read])
1905                .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
1906
1907            *bytes_transferred += bytes_read as u64;
1908        }
1909
1910        tracing::trace!(
1911            remote = %remote_path.display(),
1912            local = %local_path.display(),
1913            "downloaded file"
1914        );
1915
1916        local_file
1917            .sync_all()
1918            .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
1919        drop(local_file);
1920        replace_file_from_temp(&temp_path, local_path).map_err(|e| {
1921            format!(
1922                "Failed to publish {} to {}: {}",
1923                temp_path.display(),
1924                local_path.display(),
1925                e
1926            )
1927        })?;
1928
1929        Ok(true)
1930    }
1931}
1932
1933/// Resolve an SFTP entry's basename for local mirroring.
1934fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
1935    let Some(file_name) = entry_path.file_name() else {
1936        tracing::warn!(
1937            parent = %parent_path.display(),
1938            entry = ?entry_path,
1939            "Skipping SFTP entry without a file name"
1940        );
1941        return None;
1942    };
1943
1944    let Some(file_name) = file_name.to_str() else {
1945        tracing::warn!(
1946            parent = %parent_path.display(),
1947            entry = ?entry_path,
1948            "Skipping SFTP entry with non-UTF-8 file name"
1949        );
1950        return None;
1951    };
1952
1953    if file_name.is_empty() {
1954        tracing::warn!(
1955            parent = %parent_path.display(),
1956            entry = ?entry_path,
1957            "Skipping SFTP entry with empty file name"
1958        );
1959        return None;
1960    }
1961
1962    if file_name == "." || file_name == ".." {
1963        return None;
1964    }
1965
1966    Some(file_name)
1967}
1968
1969/// Check whether the `scp` executable exists on this system.
1970///
1971/// Uses a simple PATH search rather than running `scp` (which exits non-zero
1972/// when invoked without arguments on many platforms).
1973fn which_scp_exists() -> bool {
1974    std::env::var_os("PATH")
1975        .map(|path_var| {
1976            std::env::split_paths(&path_var).any(|dir| {
1977                let candidate = dir.join(if cfg!(target_os = "windows") {
1978                    "scp.exe"
1979                } else {
1980                    "scp"
1981                });
1982                candidate.is_file()
1983            })
1984        })
1985        .unwrap_or(false)
1986}
1987
1988/// Convert a Windows absolute path to a WSL-accessible `/mnt/<drive>/…` path.
1989///
1990/// E.g. `C:\Users\george\AppData\Roaming\cass` →
1991///      `/mnt/c/Users/george/AppData/Roaming/cass`
1992///
1993/// If the path does not look like a Windows drive path it is returned unchanged.
1994fn windows_path_to_wsl(path: &str) -> String {
1995    // Match "C:\..." or "C:/..."
1996    if path.len() >= 3 {
1997        let bytes = path.as_bytes();
1998        if bytes[1] == b':' && (bytes[2] == b'\\' || bytes[2] == b'/') {
1999            let drive = (bytes[0] as char).to_lowercase().next().unwrap_or('c');
2000            let rest = path[3..].replace('\\', "/");
2001            return format!("/mnt/{}/{}", drive, rest);
2002        }
2003    }
2004    path.to_string()
2005}
2006
2007/// Parse SSH host string into (optional_user, host).
2008///
2009/// Examples:
2010/// - "myserver" -> (None, "myserver")
2011/// - "user@myserver" -> (Some("user"), "myserver")
2012fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
2013    if let Some(at_pos) = host.find('@') {
2014        let user = &host[..at_pos];
2015        let hostname = &host[at_pos + 1..];
2016        (Some(user), hostname)
2017    } else {
2018        (None, host)
2019    }
2020}
2021
2022fn first_nonblank_username<'a>(
2023    candidates: impl IntoIterator<Item = Option<&'a str>>,
2024) -> Option<String> {
2025    candidates.into_iter().find_map(|candidate| {
2026        let trimmed = candidate?.trim();
2027        if trimmed.is_empty() {
2028            None
2029        } else {
2030            Some(trimmed.to_string())
2031        }
2032    })
2033}
2034
2035fn env_username(key: &str) -> Option<String> {
2036    dotenvy::var(key)
2037        .ok()
2038        .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2039}
2040
2041/// Expand tilde in local paths.
2042fn expand_tilde_local(path: &str) -> String {
2043    if let Some(stripped) = path.strip_prefix("~/")
2044        && let Some(home) = dirs::home_dir()
2045    {
2046        return format!("{}/{}", home.display(), stripped);
2047    } else if path == "~"
2048        && let Some(home) = dirs::home_dir()
2049    {
2050        return home.display().to_string();
2051    }
2052    path.to_string()
2053}
2054
2055/// Convert a remote path to a safe directory name.
2056///
2057/// Sanitizes path by:
2058/// - Removing leading `~` and `/`
2059/// - Replacing path separators and spaces with underscores
2060/// - Removing parent directory references (`..`) to prevent traversal attacks
2061/// - Removing current directory references (`.`)
2062/// - Appending a stable hash to prevent collisions (e.g., "foo/bar" vs "foo_bar")
2063pub fn path_to_safe_dirname(path: &str) -> String {
2064    use std::path::{Component, Path};
2065
2066    let path_obj = Path::new(path);
2067    let mut parts: Vec<&str> = Vec::new();
2068
2069    for component in path_obj.components() {
2070        match component {
2071            Component::Normal(name) => {
2072                if let Some(s) = name.to_str() {
2073                    // Skip "~" (home directory marker) and empty/dot-only components
2074                    if !s.is_empty() && s != "." && s != "~" {
2075                        parts.push(s);
2076                    }
2077                }
2078            }
2079            // Skip all traversal components for security
2080            Component::ParentDir
2081            | Component::CurDir
2082            | Component::RootDir
2083            | Component::Prefix(_) => {}
2084        }
2085    }
2086
2087    let cleaned = parts.join("_").replace([' ', '\\'], "_");
2088
2089    // Append stable hash to prevent collisions
2090    let hash = fnv1a_hash(path);
2091    let hash_suffix = format!("{:08x}", hash);
2092
2093    if cleaned.is_empty() {
2094        format!("root_{}", hash_suffix)
2095    } else {
2096        format!("{}_{}", cleaned, hash_suffix)
2097    }
2098}
2099
2100fn fnv1a_hash(text: &str) -> u64 {
2101    let mut hash: u64 = 0xcbf29ce484222325;
2102    for byte in text.bytes() {
2103        hash ^= u64::from(byte);
2104        hash = hash.wrapping_mul(0x100000001b3);
2105    }
2106    hash
2107}
2108
2109/// Parse transfer statistics from rsync --stats output.
2110fn parse_rsync_stats(output: &str) -> RsyncStats {
2111    let mut stats = RsyncStats::default();
2112
2113    for line in output.lines() {
2114        let line = line.trim();
2115
2116        // Parse "Number of regular files transferred: N"
2117        if line.starts_with("Number of regular files transferred:")
2118            && let Some(num_str) = line.split(':').nth(1)
2119        {
2120            stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2121        }
2122
2123        // Parse "Total transferred file size: N bytes"
2124        if line.starts_with("Total transferred file size:")
2125            && let Some(size_part) = line.split(':').nth(1)
2126        {
2127            // Handle formats like "1,234 bytes" or "1234"
2128            let size_str = size_part
2129                .split_whitespace()
2130                .next()
2131                .unwrap_or("0")
2132                .replace(',', "");
2133            stats.bytes_transferred = size_str.parse().unwrap_or(0);
2134        }
2135    }
2136
2137    stats
2138}
2139
2140// =============================================================================
2141// Sync Status Persistence
2142// =============================================================================
2143
2144/// Result of a sync operation for a source.
2145#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2146#[serde(rename_all = "snake_case")]
2147pub enum SyncResult {
2148    /// Sync completed successfully.
2149    Success,
2150    /// Some paths synced, some failed.
2151    PartialFailure(String),
2152    /// Sync failed completely.
2153    Failed(String),
2154    /// Sync was skipped (e.g., dry run).
2155    #[default]
2156    Skipped,
2157}
2158
2159impl SyncResult {
2160    /// Short display label for the result.
2161    pub fn label(&self) -> &'static str {
2162        match self {
2163            Self::Success => "success",
2164            Self::PartialFailure(_) => "partial",
2165            Self::Failed(_) => "failed",
2166            Self::Skipped => "never",
2167        }
2168    }
2169
2170    /// Error text for partial/full failures.
2171    pub fn error_message(&self) -> Option<&str> {
2172        match self {
2173            Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2174            Self::Success | Self::Skipped => None,
2175        }
2176    }
2177}
2178
2179/// Scheduler action for a remote source.
2180#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2181#[serde(rename_all = "snake_case")]
2182pub enum SourceSyncAction {
2183    /// The source is eligible to sync now.
2184    Sync,
2185    /// The source is healthy enough but not due under its configured schedule.
2186    Skip,
2187    /// The source is temporarily or operationally unsafe to sync automatically.
2188    Defer,
2189}
2190
2191impl SourceSyncAction {
2192    pub fn as_str(self) -> &'static str {
2193        match self {
2194            Self::Sync => "sync",
2195            Self::Skip => "skip",
2196            Self::Defer => "defer",
2197        }
2198    }
2199}
2200
2201/// Health class used by the adaptive source scheduler.
2202#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2203#[serde(rename_all = "snake_case")]
2204pub enum SourceHealthKind {
2205    NeverSynced,
2206    Healthy,
2207    Stale,
2208    HighLatency,
2209    Flapping,
2210    AuthFailed,
2211    BackingOff,
2212}
2213
2214impl SourceHealthKind {
2215    pub fn as_str(self) -> &'static str {
2216        match self {
2217            Self::NeverSynced => "never_synced",
2218            Self::Healthy => "healthy",
2219            Self::Stale => "stale",
2220            Self::HighLatency => "high_latency",
2221            Self::Flapping => "flapping",
2222            Self::AuthFailed => "auth_failed",
2223            Self::BackingOff => "backing_off",
2224        }
2225    }
2226}
2227
2228/// Evidence-backed scheduling decision for one source.
2229#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2230pub struct SourceSyncDecision {
2231    /// Decision action the scheduler would take.
2232    pub action: SourceSyncAction,
2233    /// Current health class inferred from durable sync state.
2234    pub health: SourceHealthKind,
2235    /// Coarse 0..=100 health score for sorting/explanations.
2236    pub health_score: u8,
2237    /// Age of the last sync attempt, capped at zero when clocks move backward.
2238    pub staleness_ms: Option<i64>,
2239    /// Coarse 0..=100 estimate of value from refreshing stale remote data.
2240    pub stale_value_score: u8,
2241    /// Whether an explicit operator request is overriding automatic scheduling.
2242    pub manual_override: bool,
2243    /// Whether the decision is using the conservative fallback path.
2244    pub fallback_active: bool,
2245    /// Next time this source is eligible under its configured schedule.
2246    pub next_eligible_sync_ms: Option<i64>,
2247    /// End of transient failure backoff when applicable.
2248    pub backoff_until_ms: Option<i64>,
2249    /// Human-readable evidence terms, stable enough for robot consumers.
2250    pub reasons: Vec<String>,
2251}
2252
2253impl SourceSyncDecision {
2254    fn evaluate(
2255        source: &SourceDefinition,
2256        info: Option<&SourceSyncInfo>,
2257        now_ms: i64,
2258        manual_override: bool,
2259    ) -> Self {
2260        let period_ms = sync_schedule_period_ms(source.sync_schedule);
2261        let next_eligible_sync_ms = info
2262            .and_then(|info| info.last_sync)
2263            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
2264        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
2265        let staleness_ms = info.and_then(|info| {
2266            info.last_sync
2267                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
2268        });
2269        let stale_value_score =
2270            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
2271        let mut reasons = Vec::new();
2272
2273        let health = match info {
2274            None => {
2275                reasons.push("no durable sync status exists for this source".to_string());
2276                SourceHealthKind::NeverSynced
2277            }
2278            Some(info) if info.last_sync.is_none() => {
2279                reasons.push("source has never completed or attempted a sync".to_string());
2280                SourceHealthKind::NeverSynced
2281            }
2282            Some(info) if sync_result_auth_failure(&info.last_result) => {
2283                reasons
2284                    .push("last sync failed with an authentication or host-key error".to_string());
2285                SourceHealthKind::AuthFailed
2286            }
2287            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
2288                reasons.push("last sync partially succeeded and partially failed".to_string());
2289                SourceHealthKind::Flapping
2290            }
2291            Some(info)
2292                if info.consecutive_failures > 0
2293                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
2294            {
2295                reasons.push(format!(
2296                    "{} consecutive failure(s) are inside retry backoff",
2297                    info.consecutive_failures
2298                ));
2299                SourceHealthKind::BackingOff
2300            }
2301            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
2302                let error = info.last_result.error_message().unwrap_or("unknown error");
2303                reasons.push(format!(
2304                    "last sync failed completely ({error}); local fallback remains active"
2305                ));
2306                SourceHealthKind::Flapping
2307            }
2308            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
2309                reasons.push(format!(
2310                    "last sync took {}ms, above {}ms high-latency guard",
2311                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
2312                ));
2313                SourceHealthKind::HighLatency
2314            }
2315            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
2316                reasons.push("configured sync schedule is due".to_string());
2317                SourceHealthKind::Stale
2318            }
2319            Some(_) => SourceHealthKind::Healthy,
2320        };
2321
2322        let fallback_active = matches!(
2323            health,
2324            SourceHealthKind::AuthFailed
2325                | SourceHealthKind::BackingOff
2326                | SourceHealthKind::Flapping
2327                | SourceHealthKind::HighLatency
2328        );
2329
2330        let mut action = if manual_override {
2331            reasons.push("explicit sync command overrides automatic scheduling".to_string());
2332            SourceSyncAction::Sync
2333        } else {
2334            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
2335        };
2336
2337        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
2338            action = SourceSyncAction::Defer;
2339        }
2340
2341        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
2342            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
2343        }
2344
2345        if !manual_override
2346            && matches!(action, SourceSyncAction::Skip)
2347            && let Some(next_ms) = next_eligible_sync_ms
2348        {
2349            reasons.push(format!(
2350                "next scheduled sync is eligible at unix_ms={next_ms}"
2351            ));
2352        }
2353
2354        if reasons.is_empty() {
2355            reasons.push("source is healthy and within schedule".to_string());
2356        }
2357
2358        Self {
2359            action,
2360            health,
2361            health_score: health_score_for_source(health),
2362            staleness_ms,
2363            stale_value_score,
2364            manual_override,
2365            fallback_active,
2366            next_eligible_sync_ms,
2367            backoff_until_ms,
2368            reasons,
2369        }
2370    }
2371}
2372
2373/// Sync information for a single source.
2374#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2375pub struct SourceSyncInfo {
2376    /// Timestamp of last sync attempt.
2377    pub last_sync: Option<i64>,
2378    /// Result of last sync.
2379    pub last_result: SyncResult,
2380    /// Number of files synced in last sync.
2381    pub files_synced: u64,
2382    /// Number of bytes transferred in last sync.
2383    pub bytes_transferred: u64,
2384    /// Duration of last sync in milliseconds.
2385    pub duration_ms: u64,
2386    /// Consecutive failed sync attempts, reset to zero by a fully successful sync.
2387    #[serde(default)]
2388    pub consecutive_failures: u32,
2389}
2390
2391impl SourceSyncInfo {
2392    /// Build sync info from a sync report using the current wall clock time.
2393    pub fn from_report(report: &SyncReport) -> Self {
2394        let last_result = report.sync_result();
2395        Self {
2396            last_sync: Some(current_unix_ms()),
2397            consecutive_failures: u32::from(!report.all_succeeded),
2398            last_result,
2399            files_synced: report.total_files(),
2400            bytes_transferred: report.total_bytes(),
2401            duration_ms: report.total_duration_ms,
2402        }
2403    }
2404}
2405
2406/// Persistent sync status for all sources.
2407#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2408pub struct SyncStatus {
2409    /// Sync info per source (keyed by source name).
2410    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
2411}
2412
2413impl SyncStatus {
2414    /// Load sync status from disk.
2415    pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2416        let path = Self::status_path(data_dir);
2417        match std::fs::read_to_string(&path) {
2418            Ok(content) => serde_json::from_str(&content)
2419                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2420            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2421            Err(e) => Err(e),
2422        }
2423    }
2424
2425    /// Save sync status to disk.
2426    ///
2427    /// Uses an atomic rename on Unix. On Windows, falls back to remove-then-rename
2428    /// because replacing an existing destination with `std::fs::rename` fails.
2429    pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2430        let path = Self::status_path(data_dir);
2431        if let Some(parent) = path.parent() {
2432            std::fs::create_dir_all(parent)?;
2433        }
2434        let content = serde_json::to_string_pretty(self)?;
2435        let tmp_path = unique_atomic_temp_path(&path);
2436        std::fs::write(&tmp_path, content)?;
2437        sync_file_path(&tmp_path)?;
2438        replace_file_from_temp(&tmp_path, &path)
2439    }
2440
2441    /// Update status for a source from a sync report.
2442    pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2443        let previous_failures = self
2444            .get(source_name)
2445            .map(|info| info.consecutive_failures)
2446            .unwrap_or_default();
2447        let mut info = SourceSyncInfo::from_report(report);
2448        if report.all_succeeded {
2449            info.consecutive_failures = 0;
2450        } else {
2451            info.consecutive_failures = previous_failures.saturating_add(1);
2452        }
2453        self.set_info(source_name, info);
2454    }
2455
2456    /// Set status for a source from precomputed sync info.
2457    pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2458        self.sources.insert(source_name.to_string(), info);
2459    }
2460
2461    /// Drop sync status entries for sources that no longer exist.
2462    ///
2463    /// Returns `true` when at least one stale entry was removed.
2464    pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2465        let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2466        let previous_len = self.sources.len();
2467        self.sources
2468            .retain(|source_name, _| allowed.contains(source_name.as_str()));
2469        self.sources.len() != previous_len
2470    }
2471
2472    /// Get sync info for a source.
2473    pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2474        self.sources.get(source_name)
2475    }
2476
2477    /// Evaluate automatic scheduling for one source at a deterministic timestamp.
2478    pub fn decision_for_source_at(
2479        &self,
2480        source: &SourceDefinition,
2481        now_ms: i64,
2482        manual_override: bool,
2483    ) -> SourceSyncDecision {
2484        SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2485    }
2486
2487    /// Get the path to the status file.
2488    fn status_path(data_dir: &Path) -> PathBuf {
2489        data_dir.join("sync_status.json")
2490    }
2491}
2492
2493const SOURCE_HIGH_LATENCY_MS: u64 = 60_000;
2494const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1000;
2495const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1000;
2496
2497pub(crate) fn current_unix_ms() -> i64 {
2498    let now = std::time::SystemTime::now()
2499        .duration_since(std::time::UNIX_EPOCH)
2500        .unwrap_or_default()
2501        .as_millis();
2502    i64::try_from(now).unwrap_or(i64::MAX)
2503}
2504
2505fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2506    match schedule {
2507        SyncSchedule::Manual => None,
2508        SyncSchedule::Hourly => Some(60 * 60 * 1000),
2509        SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2510    }
2511}
2512
2513fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
2514    match (last_sync, period_ms) {
2515        (None, _) => true,
2516        (Some(_), None) => false,
2517        (Some(last_sync), Some(period_ms)) => last_sync.saturating_add(period_ms) <= now_ms,
2518    }
2519}
2520
2521fn automatic_source_sync_action(
2522    schedule: SyncSchedule,
2523    health: SourceHealthKind,
2524    info: Option<&SourceSyncInfo>,
2525    now_ms: i64,
2526) -> SourceSyncAction {
2527    match health {
2528        SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2529        _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2530        SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2531        SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2532            if sync_schedule_due(
2533                info.and_then(|info| info.last_sync),
2534                sync_schedule_period_ms(schedule),
2535                now_ms,
2536            ) {
2537                SourceSyncAction::Sync
2538            } else {
2539                SourceSyncAction::Skip
2540            }
2541        }
2542        SourceHealthKind::Healthy => {
2543            if sync_schedule_due(
2544                info.and_then(|info| info.last_sync),
2545                sync_schedule_period_ms(schedule),
2546                now_ms,
2547            ) {
2548                SourceSyncAction::Sync
2549            } else {
2550                SourceSyncAction::Skip
2551            }
2552        }
2553    }
2554}
2555
2556fn health_score_for_source(health: SourceHealthKind) -> u8 {
2557    match health {
2558        SourceHealthKind::Healthy => 100,
2559        SourceHealthKind::Stale => 75,
2560        SourceHealthKind::NeverSynced => 65,
2561        SourceHealthKind::HighLatency => 55,
2562        SourceHealthKind::Flapping => 40,
2563        SourceHealthKind::BackingOff => 25,
2564        SourceHealthKind::AuthFailed => 10,
2565    }
2566}
2567
2568fn stale_value_score_for_source(
2569    schedule: SyncSchedule,
2570    staleness_ms: Option<i64>,
2571    info: Option<&SourceSyncInfo>,
2572) -> u8 {
2573    let Some(info) = info else {
2574        return 100;
2575    };
2576    if info.last_sync.is_none() {
2577        return 100;
2578    }
2579
2580    let Some(staleness_ms) = staleness_ms else {
2581        return 100;
2582    };
2583
2584    let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2585        return 0;
2586    };
2587
2588    let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2589    u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2590}
2591
2592fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2593    if info.consecutive_failures == 0 {
2594        return None;
2595    }
2596    let last_sync = info.last_sync?;
2597    let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2598    let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2599    let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2600        .saturating_mul(multiplier)
2601        .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2602    Some(last_sync.saturating_add(backoff_ms))
2603}
2604
2605fn sync_result_auth_failure(result: &SyncResult) -> bool {
2606    let Some(error) = result.error_message() else {
2607        return false;
2608    };
2609    let error = error.to_ascii_lowercase();
2610    error.contains("permission denied")
2611        || error.contains("authentication")
2612        || error.contains("host key verification failed")
2613        || error.contains("known_hosts")
2614        || error.contains("no valid authentication")
2615}
2616
2617fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2618    unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2619}
2620
2621fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2622    #[cfg(windows)]
2623    {
2624        match std::fs::rename(temp_path, final_path) {
2625            Ok(()) => sync_parent_directory(final_path),
2626            Err(first_err)
2627                if final_path.exists()
2628                    && matches!(
2629                        first_err.kind(),
2630                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2631                    ) =>
2632            {
2633                let backup_path = unique_replace_backup_path(final_path);
2634                std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2635                    let _ = std::fs::remove_file(temp_path);
2636                    std::io::Error::other(format!(
2637                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2638                        backup_path.display(),
2639                        final_path.display(),
2640                        first_err,
2641                        backup_err
2642                    ))
2643                })?;
2644                match std::fs::rename(temp_path, final_path) {
2645                    Ok(()) => {
2646                        let _ = std::fs::remove_file(&backup_path);
2647                        sync_parent_directory(final_path)
2648                    }
2649                    Err(second_err) => {
2650                        let restore_result = std::fs::rename(&backup_path, final_path);
2651                        match restore_result {
2652                            Ok(()) => {
2653                                let _ = std::fs::remove_file(temp_path);
2654                                sync_parent_directory(final_path).map_err(|sync_err| {
2655                                    std::io::Error::other(format!(
2656                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2657                                        final_path.display(),
2658                                        temp_path.display(),
2659                                        first_err,
2660                                        second_err,
2661                                        sync_err
2662                                    ))
2663                                })?;
2664                                Err(std::io::Error::new(
2665                                    second_err.kind(),
2666                                    format!(
2667                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2668                                        final_path.display(),
2669                                        temp_path.display(),
2670                                        first_err,
2671                                        second_err
2672                                    ),
2673                                ))
2674                            }
2675                            Err(restore_err) => Err(std::io::Error::other(format!(
2676                                "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2677                                final_path.display(),
2678                                temp_path.display(),
2679                                first_err,
2680                                second_err,
2681                                restore_err,
2682                                temp_path.display()
2683                            ))),
2684                        }
2685                    }
2686                }
2687            }
2688            Err(rename_err) => Err(rename_err),
2689        }
2690    }
2691
2692    #[cfg(not(windows))]
2693    {
2694        std::fs::rename(temp_path, final_path)?;
2695        sync_parent_directory(final_path)
2696    }
2697}
2698
2699fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
2700    std::fs::File::open(path)?.sync_all()
2701}
2702
2703#[cfg(not(windows))]
2704fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
2705    let Some(parent) = path.parent() else {
2706        return Ok(());
2707    };
2708    std::fs::File::open(parent)?.sync_all()
2709}
2710
2711#[cfg(windows)]
2712fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
2713    Ok(())
2714}
2715
2716#[cfg(windows)]
2717fn unique_replace_backup_path(path: &Path) -> PathBuf {
2718    unique_atomic_sidecar_path(path, "bak", "sync_status.json")
2719}
2720
2721fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
2722    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2723
2724    let timestamp = std::time::SystemTime::now()
2725        .duration_since(std::time::UNIX_EPOCH)
2726        .unwrap_or_default()
2727        .as_nanos();
2728    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2729    let file_name = path
2730        .file_name()
2731        .and_then(|name| name.to_str())
2732        .unwrap_or(fallback_name);
2733
2734    path.with_file_name(format!(
2735        ".{file_name}.{suffix}.{}.{}.{}",
2736        std::process::id(),
2737        timestamp,
2738        nonce
2739    ))
2740}
2741
2742#[cfg(test)]
2743mod tests {
2744    use super::*;
2745    use tempfile::TempDir;
2746
2747    #[test]
2748    fn test_path_to_safe_dirname() {
2749        let res = path_to_safe_dirname("~/.claude/projects");
2750        assert!(res.starts_with(".claude_projects_"));
2751
2752        let res = path_to_safe_dirname("/home/user/data");
2753        assert!(res.starts_with("home_user_data_"));
2754
2755        let res = path_to_safe_dirname("~/");
2756        assert!(res.starts_with("root_"));
2757
2758        let res = path_to_safe_dirname("");
2759        assert!(res.starts_with("root_"));
2760    }
2761
2762    #[test]
2763    fn test_path_to_safe_dirname_empty() {
2764        let res = path_to_safe_dirname("~");
2765        assert!(res.starts_with("root_"));
2766
2767        let res = path_to_safe_dirname("/");
2768        assert!(res.starts_with("root_"));
2769    }
2770
2771    #[test]
2772    fn test_path_to_safe_dirname_strips_traversal_components() {
2773        let res = path_to_safe_dirname("../../etc/passwd");
2774
2775        assert!(res.starts_with("etc_passwd_"));
2776        assert!(!res.contains(".."));
2777        assert!(!res.contains('/'));
2778        assert!(!res.contains('\\'));
2779    }
2780
2781    #[test]
2782    fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
2783        let temp = TempDir::new().unwrap();
2784        let engine = SyncEngine::new(temp.path());
2785
2786        for host in [
2787            "work-mac;touch /tmp/cass-owned",
2788            "work mac",
2789            "work-mac\nhostname",
2790            "work-mac`hostname`",
2791            "work-mac/../../secret",
2792            "-oProxyCommand=evil",
2793            "",
2794            "@host",
2795            "user@",
2796            "user@host@extra",
2797        ] {
2798            let err = engine.get_remote_home(host).unwrap_err();
2799            assert!(
2800                matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
2801                "expected invalid-host rejection for {host:?}, got {err}"
2802            );
2803        }
2804    }
2805
2806    #[test]
2807    fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
2808        let temp = TempDir::new().unwrap();
2809        let engine = SyncEngine::new(temp.path());
2810        let mut source = SourceDefinition::ssh("../escape", "user@host");
2811        source.paths = vec!["/tmp/sessions".to_string()];
2812
2813        let err = engine
2814            .sync_source(&source)
2815            .expect_err("invalid source name should fail before local writes");
2816
2817        assert!(
2818            matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
2819            "expected invalid source-name rejection, got {err}"
2820        );
2821        assert!(
2822            !temp.path().join("escape").exists(),
2823            "invalid source name must not escape the remotes mirror layout"
2824        );
2825        assert!(
2826            !temp.path().join("remotes").exists(),
2827            "invalid source name must be rejected before creating mirror roots"
2828        );
2829    }
2830
2831    #[test]
2832    fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
2833        let temp = TempDir::new().unwrap();
2834        let engine = SyncEngine::new(temp.path());
2835        let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
2836        source.paths = vec!["/tmp/sessions".to_string()];
2837
2838        let err = engine
2839            .sync_source(&source)
2840            .expect_err("invalid host should fail before local writes");
2841
2842        assert!(
2843            matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
2844            "expected invalid host rejection, got {err}"
2845        );
2846        assert!(
2847            !temp.path().join("remotes").exists(),
2848            "invalid host must be rejected before creating mirror roots"
2849        );
2850    }
2851
2852    #[test]
2853    fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
2854        let temp = TempDir::new().unwrap();
2855        let engine = SyncEngine::new(temp.path());
2856
2857        for (path, expected) in [
2858            ("", "paths[0] cannot be empty"),
2859            ("   ", "paths[0] cannot be empty"),
2860            (" ~/.claude/projects", "paths[0] cannot have leading"),
2861            ("~/.claude/projects ", "paths[0] cannot have leading"),
2862            ("~/.claude\nprojects", "paths[0] cannot contain control"),
2863        ] {
2864            let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
2865            source.paths = vec![path.to_string()];
2866
2867            let report = engine.sync_source(&source).unwrap();
2868            assert_eq!(report.path_results.len(), 1);
2869            let result = &report.path_results[0];
2870            assert!(!result.success);
2871            assert_eq!(result.remote_path, path);
2872            assert!(
2873                result
2874                    .error
2875                    .as_deref()
2876                    .is_some_and(|message| message.contains(expected)),
2877                "expected invalid path rejection for {path:?}, got {result:?}"
2878            );
2879        }
2880    }
2881
2882    #[test]
2883    fn test_remote_sync_path_validation_allows_internal_spaces() {
2884        assert!(
2885            validate_remote_sync_path_entry(
2886                0,
2887                "~/Library/Application Support/Cursor/User/globalStorage"
2888            )
2889            .is_ok()
2890        );
2891    }
2892
2893    #[test]
2894    fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
2895        let temp = TempDir::new().unwrap();
2896        let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
2897        // Use a validation-safe TEST-NET host so source structure checks pass,
2898        // but remote-home lookup still fails quickly before path result ordering.
2899        let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
2900        source.paths = vec![
2901            "~/.codex/sessions".to_string(),
2902            " ~/.claude/projects".to_string(),
2903            "~/.gemini/tmp".to_string(),
2904        ];
2905
2906        let report = engine.sync_source(&source).unwrap();
2907        let remote_paths = report
2908            .path_results
2909            .iter()
2910            .map(|result| result.remote_path.as_str())
2911            .collect::<Vec<_>>();
2912
2913        assert_eq!(
2914            remote_paths,
2915            vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
2916        );
2917        assert!(
2918            report.path_results[1]
2919                .error
2920                .as_deref()
2921                .is_some_and(|message| message.contains("paths[1] cannot have leading")),
2922            "expected invalid path error in original slot: {:?}",
2923            report.path_results
2924        );
2925    }
2926
2927    #[test]
2928    fn test_remote_find_regular_files_command_uses_physical_traversal() {
2929        assert_eq!(
2930            remote_find_regular_files_command("/tmp/has space"),
2931            "find -P '/tmp/has space' -type f -print0"
2932        );
2933        assert_eq!(
2934            remote_find_regular_files_command("/tmp/that's all"),
2935            "find -P '/tmp/that'\\''s all' -type f -print0"
2936        );
2937    }
2938
2939    #[test]
2940    fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
2941        assert_eq!(
2942            parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
2943            Some("/home/user".to_string())
2944        );
2945        assert_eq!(
2946            parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
2947            Some("/Users/test user".to_string())
2948        );
2949    }
2950
2951    #[test]
2952    fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
2953        assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
2954        assert_eq!(
2955            parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
2956            None
2957        );
2958    }
2959
2960    #[test]
2961    fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
2962        let paths = parse_null_terminated_utf8_paths(
2963            b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
2964        );
2965        assert_eq!(
2966            paths,
2967            vec![
2968                "/remote/sessions/a.jsonl".to_string(),
2969                "/remote/sessions/b.jsonl".to_string()
2970            ]
2971        );
2972    }
2973
2974    #[test]
2975    fn test_remote_file_to_safe_local_path_rejects_outside_root() {
2976        let root = Path::new("/remote/sessions");
2977        let local = Path::new("/mirror/root");
2978
2979        assert_eq!(
2980            remote_file_to_safe_local_path(
2981                root,
2982                Path::new("/remote/sessions/a/b.jsonl"),
2983                local,
2984                "sessions"
2985            ),
2986            Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
2987        );
2988        assert_eq!(
2989            remote_file_to_safe_local_path(
2990                Path::new("/remote/session.jsonl"),
2991                Path::new("/remote/session.jsonl"),
2992                local,
2993                "session.jsonl"
2994            ),
2995            Some(PathBuf::from("/mirror/root/session.jsonl"))
2996        );
2997        assert_eq!(
2998            remote_file_to_safe_local_path(
2999                root,
3000                Path::new("/remote/sessions/../secret.txt"),
3001                local,
3002                "sessions"
3003            ),
3004            None
3005        );
3006        assert_eq!(
3007            remote_file_to_safe_local_path(
3008                root,
3009                Path::new("/remote/other/secret.txt"),
3010                local,
3011                "sessions"
3012            ),
3013            None
3014        );
3015    }
3016
3017    #[test]
3018    fn test_local_symlink_guard_allows_regular_paths() {
3019        let temp = TempDir::new().expect("tempdir");
3020        let root = temp.path().join("mirror");
3021        let target = root.join("sessions/session.jsonl");
3022
3023        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3024
3025        std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3026        std::fs::write(&target, "{}").expect("write target");
3027
3028        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3029    }
3030
3031    #[cfg(unix)]
3032    #[test]
3033    fn test_local_symlink_guard_rejects_nested_symlink() {
3034        use std::os::unix::fs::symlink;
3035
3036        let temp = TempDir::new().expect("tempdir");
3037        let root = temp.path().join("mirror");
3038        let outside = temp.path().join("outside");
3039        std::fs::create_dir_all(&root).expect("create root");
3040        std::fs::create_dir_all(&outside).expect("create outside");
3041        symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3042
3043        let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3044            .expect_err("nested symlink should be rejected");
3045
3046        assert!(err.contains("Refusing to write"));
3047        assert!(err.contains("sessions"));
3048    }
3049
3050    #[cfg(unix)]
3051    #[test]
3052    fn test_local_symlink_guard_rejects_root_symlink() {
3053        use std::os::unix::fs::symlink;
3054
3055        let temp = TempDir::new().expect("tempdir");
3056        let outside = temp.path().join("outside");
3057        let root = temp.path().join("mirror-link");
3058        std::fs::create_dir_all(&outside).expect("create outside");
3059        symlink(&outside, &root).expect("symlink root");
3060
3061        let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3062            .expect_err("root symlink should be rejected");
3063
3064        assert!(err.contains("Refusing to write"));
3065        assert!(err.contains("mirror-link"));
3066    }
3067
3068    #[test]
3069    fn test_prepare_local_sync_container_creates_regular_container() {
3070        let temp = TempDir::new().expect("tempdir");
3071        let root = temp.path().join("mirror");
3072        let target = root.join("sessions");
3073
3074        prepare_local_sync_container(&root, &target).expect("regular container should be created");
3075
3076        assert!(target.is_dir());
3077    }
3078
3079    #[cfg(unix)]
3080    #[test]
3081    fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3082        use std::os::unix::fs::symlink;
3083
3084        let temp = TempDir::new().expect("tempdir");
3085        let root = temp.path().join("mirror");
3086        let outside = temp.path().join("outside");
3087        let target = root.join("sessions");
3088        std::fs::create_dir_all(&root).expect("create root");
3089        std::fs::create_dir_all(&outside).expect("create outside");
3090        symlink(&outside, &target).expect("symlink target");
3091
3092        let err = prepare_local_sync_container(&root, &target)
3093            .expect_err("sync container symlink should be rejected");
3094
3095        assert!(err.contains("Refusing to write"));
3096        assert!(err.contains("sessions"));
3097    }
3098
3099    #[cfg(unix)]
3100    #[test]
3101    fn test_prepare_local_sync_container_rejects_root_symlink() {
3102        use std::os::unix::fs::symlink;
3103
3104        let temp = TempDir::new().expect("tempdir");
3105        let outside = temp.path().join("outside");
3106        let root = temp.path().join("mirror-link");
3107        let target = root.join("sessions");
3108        std::fs::create_dir_all(&outside).expect("create outside");
3109        symlink(&outside, &root).expect("symlink root");
3110
3111        let err = prepare_local_sync_container(&root, &target)
3112            .expect_err("sync root symlink should be rejected");
3113
3114        assert!(err.contains("Refusing to write"));
3115        assert!(err.contains("mirror-link"));
3116    }
3117
3118    #[cfg(unix)]
3119    #[test]
3120    fn test_prepare_local_sync_root_rejects_symlinked_source_parent() {
3121        use std::os::unix::fs::symlink;
3122
3123        let temp = TempDir::new().expect("tempdir");
3124        let local_store = temp.path().join("data");
3125        let remotes = local_store.join("remotes");
3126        let outside = temp.path().join("outside");
3127        let source_link = remotes.join("laptop");
3128        let mirror_dir = source_link.join("mirror");
3129
3130        std::fs::create_dir_all(&remotes).expect("create remotes");
3131        std::fs::create_dir_all(&outside).expect("create outside");
3132        symlink(&outside, &source_link).expect("symlink source parent");
3133
3134        let err = prepare_local_sync_root(&local_store, &mirror_dir)
3135            .expect_err("symlinked source parent should be rejected before mkdir");
3136
3137        assert!(err.contains("Refusing to write"));
3138        assert!(err.contains("laptop"));
3139        assert!(
3140            !outside.join("mirror").exists(),
3141            "sync root preparation must not create directories through source parent symlinks"
3142        );
3143    }
3144
3145    #[test]
3146    fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3147        let symlink = FileStat {
3148            size: None,
3149            uid: None,
3150            gid: None,
3151            perm: Some(0o120000 | 0o777),
3152            atime: None,
3153            mtime: None,
3154        };
3155        let regular = FileStat {
3156            size: None,
3157            uid: None,
3158            gid: None,
3159            perm: Some(0o100000 | 0o644),
3160            atime: None,
3161            mtime: None,
3162        };
3163
3164        assert!(sftp_file_stat_is_symlink(&symlink));
3165        assert!(!sftp_file_stat_is_symlink(&regular));
3166    }
3167
3168    #[test]
3169    fn test_sftp_entry_file_name_accepts_regular_names() {
3170        let parent = Path::new("/remote");
3171        let entry = parent.join("session.jsonl");
3172
3173        assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3174    }
3175
3176    #[test]
3177    fn test_sftp_entry_file_name_skips_dot_entries() {
3178        let parent = Path::new("/remote");
3179
3180        assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3181        assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3182    }
3183
3184    #[cfg(unix)]
3185    #[test]
3186    fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3187        use std::ffi::OsStr;
3188        use std::os::unix::ffi::OsStrExt;
3189
3190        let parent = Path::new("/remote");
3191        let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3192        let entry = parent.join(bad_component);
3193
3194        assert_eq!(sftp_entry_file_name(&entry, parent), None);
3195    }
3196
3197    #[test]
3198    fn test_parse_rsync_stats() {
3199        let output = r#"
3200Number of files: 42
3201Number of regular files transferred: 10
3202Total transferred file size: 1,234 bytes
3203        "#;
3204
3205        let stats = parse_rsync_stats(output);
3206        assert_eq!(stats.files_transferred, 10);
3207        assert_eq!(stats.bytes_transferred, 1234);
3208    }
3209
3210    #[test]
3211    fn test_parse_rsync_stats_empty() {
3212        let stats = parse_rsync_stats("");
3213        assert_eq!(stats.files_transferred, 0);
3214        assert_eq!(stats.bytes_transferred, 0);
3215    }
3216
3217    #[test]
3218    fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3219        assert_eq!(
3220            quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3221            "'/Users/me/Library/Application Support/Cursor'"
3222        );
3223        assert_eq!(
3224            quote_remote_shell_path("/tmp/that's all"),
3225            "'/tmp/that'\\''s all'"
3226        );
3227    }
3228
3229    #[test]
3230    fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3231        assert_eq!(
3232            remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3233            "work-mac:/tmp/has space"
3234        );
3235        assert_eq!(
3236            remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3237            "work-mac:/tmp/that's all"
3238        );
3239        assert_eq!(
3240            remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3241            "work-mac:'/tmp/has space'"
3242        );
3243    }
3244
3245    #[test]
3246    fn rsync_arg_protection_enum_maps_flags_correctly() {
3247        // Regression for #191: Homebrew rsync 3.4.1 renamed the flag to
3248        // --secluded-args; earlier 3.0–3.3 use --protect-args. The caller
3249        // must pass the name the installed rsync actually accepts in its
3250        // own --help listing.
3251        assert_eq!(
3252            RsyncArgProtection::ProtectArgs.flag(),
3253            Some("--protect-args")
3254        );
3255        assert_eq!(
3256            RsyncArgProtection::SecludedArgs.flag(),
3257            Some("--secluded-args")
3258        );
3259        assert_eq!(RsyncArgProtection::None.flag(), None);
3260        assert!(RsyncArgProtection::ProtectArgs.is_supported());
3261        assert!(RsyncArgProtection::SecludedArgs.is_supported());
3262        assert!(!RsyncArgProtection::None.is_supported());
3263    }
3264
3265    #[test]
3266    fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3267        assert_eq!(
3268            remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3269            "work-mac:'/tmp/has space'"
3270        );
3271    }
3272
3273    #[test]
3274    fn test_remote_spec_for_scp_always_quotes_remote_path() {
3275        assert_eq!(
3276            remote_spec_for_scp("work-mac", "/tmp/that's all"),
3277            "work-mac:'/tmp/that'\\''s all'"
3278        );
3279    }
3280
3281    #[test]
3282    fn test_sync_report_totals() {
3283        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3284        report.add_path_result(PathSyncResult {
3285            files_transferred: 5,
3286            bytes_transferred: 100,
3287            success: true,
3288            ..Default::default()
3289        });
3290        report.add_path_result(PathSyncResult {
3291            files_transferred: 3,
3292            bytes_transferred: 50,
3293            success: true,
3294            ..Default::default()
3295        });
3296
3297        assert_eq!(report.total_files(), 8);
3298        assert_eq!(report.total_bytes(), 150);
3299        assert!(report.all_succeeded);
3300    }
3301
3302    #[test]
3303    fn test_sync_report_with_failure() {
3304        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3305        report.add_path_result(PathSyncResult {
3306            success: true,
3307            ..Default::default()
3308        });
3309        report.add_path_result(PathSyncResult {
3310            success: false,
3311            error: Some("Connection refused".into()),
3312            ..Default::default()
3313        });
3314
3315        assert!(!report.all_succeeded);
3316        assert_eq!(report.successful_paths(), 1);
3317        assert_eq!(report.failed_paths(), 1);
3318    }
3319
3320    #[test]
3321    fn test_detect_sync_method() {
3322        // This test is platform-dependent but should at least not panic
3323        let method = SyncEngine::detect_sync_method();
3324        assert!(matches!(
3325            method,
3326            SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3327        ));
3328    }
3329
3330    #[test]
3331    fn test_sync_engine_mirror_dir() {
3332        let engine = SyncEngine::new(Path::new("/data/cass"));
3333        let mirror = engine.mirror_dir("laptop");
3334        assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3335    }
3336
3337    #[test]
3338    fn test_sync_method_display() {
3339        for (method, expected) in [
3340            (SyncMethod::Rsync, "rsync"),
3341            (SyncMethod::WslRsync, "wsl-rsync"),
3342            (SyncMethod::Scp, "scp"),
3343            (SyncMethod::Sftp, "sftp"),
3344        ] {
3345            assert_eq!(method.as_str(), expected);
3346            assert_eq!(method.to_string(), expected);
3347        }
3348    }
3349
3350    #[test]
3351    fn test_windows_path_to_wsl_drive() {
3352        assert_eq!(
3353            windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass"),
3354            "/mnt/c/Users/george/AppData/Roaming/cass"
3355        );
3356    }
3357
3358    #[test]
3359    fn test_windows_path_to_wsl_forward_slash() {
3360        assert_eq!(
3361            windows_path_to_wsl("C:/Users/george/data"),
3362            "/mnt/c/Users/george/data"
3363        );
3364    }
3365
3366    #[test]
3367    fn test_windows_path_to_wsl_non_windows_path_unchanged() {
3368        // A Unix absolute path should pass through unchanged.
3369        assert_eq!(
3370            windows_path_to_wsl("/home/george/data"),
3371            "/home/george/data"
3372        );
3373    }
3374
3375    #[test]
3376    fn test_expand_tilde_with_home() {
3377        // No tilde - returns unchanged
3378        assert_eq!(
3379            SyncEngine::expand_tilde_with_home("/home/user/projects", Some("/home/user")),
3380            "/home/user/projects"
3381        );
3382
3383        // Tilde with home provided
3384        assert_eq!(
3385            SyncEngine::expand_tilde_with_home("~/.claude/projects", Some("/home/user")),
3386            "/home/user/.claude/projects"
3387        );
3388
3389        // Just tilde
3390        assert_eq!(
3391            SyncEngine::expand_tilde_with_home("~", Some("/home/user")),
3392            "/home/user"
3393        );
3394
3395        // Tilde without home - returns unchanged
3396        assert_eq!(
3397            SyncEngine::expand_tilde_with_home("~/.claude/projects", None),
3398            "~/.claude/projects"
3399        );
3400
3401        // ~otheruser/path case - not expanded
3402        assert_eq!(
3403            SyncEngine::expand_tilde_with_home("~otheruser/projects", Some("/home/user")),
3404            "~otheruser/projects"
3405        );
3406    }
3407
3408    #[test]
3409    fn test_sync_report_failed() {
3410        let report = SyncReport::failed("test-source", SyncError::NoHost);
3411        assert_eq!(report.source_name, "test-source");
3412        assert!(!report.all_succeeded);
3413        assert_eq!(report.path_results.len(), 1);
3414        assert!(!report.path_results[0].success);
3415        assert!(report.path_results[0].error.is_some());
3416    }
3417
3418    #[test]
3419    fn test_sync_result_default() {
3420        let result = SyncResult::default();
3421        assert!(matches!(result, SyncResult::Skipped));
3422        assert_eq!(result.label(), "never");
3423    }
3424
3425    #[test]
3426    fn test_source_sync_info_default() {
3427        let info = SourceSyncInfo::default();
3428        assert!(info.last_sync.is_none());
3429        assert_eq!(info.files_synced, 0);
3430        assert_eq!(info.bytes_transferred, 0);
3431        assert_eq!(info.duration_ms, 0);
3432    }
3433
3434    #[test]
3435    fn test_sync_status_update() {
3436        let mut status = SyncStatus::default();
3437
3438        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3439        report.add_path_result(PathSyncResult {
3440            files_transferred: 10,
3441            bytes_transferred: 1000,
3442            success: true,
3443            ..Default::default()
3444        });
3445        report.total_duration_ms = 500;
3446
3447        status.update("laptop", &report);
3448
3449        let info = status.get("laptop").unwrap();
3450        assert!(info.last_sync.is_some());
3451        assert!(matches!(info.last_result, SyncResult::Success));
3452        assert_eq!(info.files_synced, 10);
3453        assert_eq!(info.bytes_transferred, 1000);
3454        assert_eq!(info.duration_ms, 500);
3455    }
3456
3457    #[test]
3458    fn test_sync_status_partial_failure() {
3459        let mut status = SyncStatus::default();
3460
3461        let mut report = SyncReport::new("server", SyncMethod::Rsync);
3462        report.add_path_result(PathSyncResult {
3463            success: true,
3464            files_transferred: 5,
3465            ..Default::default()
3466        });
3467        report.add_path_result(PathSyncResult {
3468            success: false,
3469            error: Some("Connection refused".into()),
3470            ..Default::default()
3471        });
3472
3473        status.update("server", &report);
3474
3475        let info = status.get("server").unwrap();
3476        assert!(matches!(info.last_result, SyncResult::PartialFailure(_)));
3477    }
3478
3479    #[test]
3480    fn test_sync_status_full_failure() {
3481        let mut status = SyncStatus::default();
3482
3483        let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
3484        report.add_path_result(PathSyncResult {
3485            success: false,
3486            error: Some("Host unreachable".into()),
3487            ..Default::default()
3488        });
3489
3490        status.update("dead-host", &report);
3491
3492        let info = status.get("dead-host").unwrap();
3493        assert!(matches!(info.last_result, SyncResult::Failed(_)));
3494    }
3495
3496    #[test]
3497    fn test_sync_status_save_round_trips() {
3498        let temp = TempDir::new().expect("tempdir");
3499        let mut status = SyncStatus::default();
3500        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3501        report.add_path_result(PathSyncResult {
3502            files_transferred: 3,
3503            bytes_transferred: 42,
3504            success: true,
3505            ..Default::default()
3506        });
3507        status.update("laptop", &report);
3508
3509        status.save(temp.path()).expect("save status");
3510        let loaded = SyncStatus::load(temp.path()).expect("load status");
3511
3512        let info = loaded.get("laptop").expect("round-tripped source");
3513        assert_eq!(info.files_synced, 3);
3514        assert_eq!(info.bytes_transferred, 42);
3515        assert!(matches!(info.last_result, SyncResult::Success));
3516    }
3517
3518    #[test]
3519    fn test_sync_status_retain_sources_prunes_removed_entries() {
3520        let mut status = SyncStatus::default();
3521        status.sources.insert(
3522            "laptop".into(),
3523            SourceSyncInfo {
3524                files_synced: 3,
3525                ..Default::default()
3526            },
3527        );
3528        status.sources.insert(
3529            "desktop".into(),
3530            SourceSyncInfo {
3531                files_synced: 5,
3532                ..Default::default()
3533            },
3534        );
3535
3536        let removed_any = status.retain_sources(["laptop"]);
3537
3538        assert!(removed_any);
3539        assert!(status.get("laptop").is_some());
3540        assert!(status.get("desktop").is_none());
3541    }
3542
3543    fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3544        let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3545        source.sync_schedule = schedule;
3546        source.paths = vec!["~/.claude/projects".to_string()];
3547        source
3548    }
3549
3550    fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3551        let mut status = SyncStatus::default();
3552        status.set_info("laptop", info);
3553        status
3554    }
3555
3556    #[test]
3557    fn source_sync_decision_skips_healthy_source_until_schedule_due() {
3558        let now_ms = 1_700_000_000_000;
3559        let source = source_with_schedule(SyncSchedule::Hourly);
3560        let status = status_with_info(SourceSyncInfo {
3561            last_sync: Some(now_ms - 10 * 60 * 1000),
3562            last_result: SyncResult::Success,
3563            duration_ms: 250,
3564            ..Default::default()
3565        });
3566
3567        let decision = status.decision_for_source_at(&source, now_ms, false);
3568
3569        assert_eq!(decision.action, SourceSyncAction::Skip);
3570        assert_eq!(decision.health, SourceHealthKind::Healthy);
3571        assert!(!decision.fallback_active);
3572        assert_eq!(
3573            decision.next_eligible_sync_ms,
3574            Some(now_ms + 50 * 60 * 1000)
3575        );
3576        assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
3577        assert_eq!(decision.stale_value_score, 16);
3578    }
3579
3580    #[test]
3581    fn source_sync_decision_syncs_stale_scheduled_source() {
3582        let now_ms = 1_700_000_000_000;
3583        let source = source_with_schedule(SyncSchedule::Hourly);
3584        let status = status_with_info(SourceSyncInfo {
3585            last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
3586            last_result: SyncResult::Success,
3587            duration_ms: 250,
3588            ..Default::default()
3589        });
3590
3591        let decision = status.decision_for_source_at(&source, now_ms, false);
3592
3593        assert_eq!(decision.action, SourceSyncAction::Sync);
3594        assert_eq!(decision.health, SourceHealthKind::Stale);
3595        assert_eq!(decision.stale_value_score, 100);
3596        assert!(
3597            decision
3598                .reasons
3599                .iter()
3600                .any(|reason| reason.contains("schedule is due"))
3601        );
3602    }
3603
3604    #[test]
3605    fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
3606        let now_ms = 1_700_000_000_000;
3607        let source = source_with_schedule(SyncSchedule::Hourly);
3608        let status = status_with_info(SourceSyncInfo {
3609            last_sync: Some(now_ms - 10 * 60 * 1000),
3610            last_result: SyncResult::Failed("Permission denied (publickey)".into()),
3611            duration_ms: 800,
3612            consecutive_failures: 1,
3613            ..Default::default()
3614        });
3615
3616        let decision = status.decision_for_source_at(&source, now_ms, false);
3617
3618        assert_eq!(decision.action, SourceSyncAction::Defer);
3619        assert_eq!(decision.health, SourceHealthKind::AuthFailed);
3620        assert!(decision.fallback_active);
3621        assert_eq!(decision.health_score, 10);
3622    }
3623
3624    #[test]
3625    fn source_sync_decision_marks_partial_success_as_flapping() {
3626        let now_ms = 1_700_000_000_000;
3627        let source = source_with_schedule(SyncSchedule::Hourly);
3628        let status = status_with_info(SourceSyncInfo {
3629            last_sync: Some(now_ms - 10 * 60 * 1000),
3630            last_result: SyncResult::PartialFailure("one path failed".into()),
3631            files_synced: 7,
3632            duration_ms: 900,
3633            consecutive_failures: 1,
3634            ..Default::default()
3635        });
3636
3637        let decision = status.decision_for_source_at(&source, now_ms, false);
3638
3639        assert_eq!(decision.action, SourceSyncAction::Skip);
3640        assert_eq!(decision.health, SourceHealthKind::Flapping);
3641        assert!(decision.fallback_active);
3642    }
3643
3644    #[test]
3645    fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
3646        let now_ms = 1_700_000_000_000;
3647        let source = source_with_schedule(SyncSchedule::Hourly);
3648        let last_sync = now_ms - 10 * 60 * 1000;
3649        let status = status_with_info(SourceSyncInfo {
3650            last_sync: Some(last_sync),
3651            last_result: SyncResult::Failed("Host unreachable".into()),
3652            duration_ms: 900,
3653            consecutive_failures: 1,
3654            ..Default::default()
3655        });
3656
3657        let decision = status.decision_for_source_at(&source, now_ms, false);
3658
3659        assert_eq!(decision.action, SourceSyncAction::Skip);
3660        assert_eq!(decision.health, SourceHealthKind::Flapping);
3661        assert!(decision.fallback_active);
3662        assert_eq!(
3663            decision.backoff_until_ms,
3664            Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS)
3665        );
3666        assert!(
3667            decision
3668                .reasons
3669                .iter()
3670                .any(|reason| reason.contains("local fallback remains active"))
3671        );
3672    }
3673
3674    #[test]
3675    fn source_sync_decision_marks_slow_source_as_high_latency() {
3676        let now_ms = 1_700_000_000_000;
3677        let source = source_with_schedule(SyncSchedule::Hourly);
3678        let status = status_with_info(SourceSyncInfo {
3679            last_sync: Some(now_ms - 10 * 60 * 1000),
3680            last_result: SyncResult::Success,
3681            duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
3682            ..Default::default()
3683        });
3684
3685        let decision = status.decision_for_source_at(&source, now_ms, false);
3686
3687        assert_eq!(decision.action, SourceSyncAction::Skip);
3688        assert_eq!(decision.health, SourceHealthKind::HighLatency);
3689        assert!(decision.fallback_active);
3690    }
3691
3692    #[test]
3693    fn source_sync_decision_manual_override_forces_sync() {
3694        let now_ms = 1_700_000_000_000;
3695        let source = source_with_schedule(SyncSchedule::Manual);
3696        let status = status_with_info(SourceSyncInfo {
3697            last_sync: Some(now_ms),
3698            last_result: SyncResult::Success,
3699            duration_ms: 100,
3700            ..Default::default()
3701        });
3702
3703        let decision = status.decision_for_source_at(&source, now_ms, true);
3704
3705        assert_eq!(decision.action, SourceSyncAction::Sync);
3706        assert!(decision.manual_override);
3707        assert!(
3708            decision
3709                .reasons
3710                .iter()
3711                .any(|reason| reason.contains("overrides automatic scheduling"))
3712        );
3713    }
3714
3715    #[test]
3716    fn test_unique_atomic_temp_path_changes_each_call() {
3717        let final_path = Path::new("/tmp/sync_status.json");
3718        let first = unique_atomic_temp_path(final_path);
3719        let second = unique_atomic_temp_path(final_path);
3720
3721        assert_ne!(first, second);
3722        assert_eq!(first.parent(), final_path.parent());
3723        assert_eq!(second.parent(), final_path.parent());
3724    }
3725
3726    #[test]
3727    fn test_replace_file_from_temp_overwrites_existing_file() {
3728        let temp = TempDir::new().expect("tempdir");
3729        let final_path = temp.path().join("sync_status.json");
3730        let first_tmp = temp.path().join("first.tmp");
3731        let second_tmp = temp.path().join("second.tmp");
3732
3733        std::fs::write(&first_tmp, "{\"first\":true}").expect("write first temp");
3734        replace_file_from_temp(&first_tmp, &final_path).expect("initial replace");
3735        assert_eq!(
3736            std::fs::read_to_string(&final_path).expect("read first final"),
3737            "{\"first\":true}"
3738        );
3739
3740        std::fs::write(&second_tmp, "{\"second\":true}").expect("write second temp");
3741        replace_file_from_temp(&second_tmp, &final_path).expect("overwrite replace");
3742        assert_eq!(
3743            std::fs::read_to_string(&final_path).expect("read second final"),
3744            "{\"second\":true}"
3745        );
3746    }
3747
3748    #[test]
3749    fn test_sync_engine_with_timeouts() {
3750        let engine = SyncEngine::new(Path::new("/data"))
3751            .with_connection_timeout(30)
3752            .with_transfer_timeout(600);
3753
3754        assert_eq!(engine.connection_timeout, 30);
3755        assert_eq!(engine.transfer_timeout, 600);
3756    }
3757
3758    #[test]
3759    fn test_sync_error_display() {
3760        assert_eq!(
3761            SyncError::NoHost.to_string(),
3762            "Source has no host configured"
3763        );
3764        assert_eq!(
3765            SyncError::NoPaths.to_string(),
3766            "Source has no paths configured"
3767        );
3768        assert_eq!(
3769            SyncError::InvalidPath("paths[0] cannot be empty".to_string()).to_string(),
3770            "Invalid source path: paths[0] cannot be empty"
3771        );
3772        assert_eq!(
3773            SyncError::Timeout(30).to_string(),
3774            "Connection timed out after 30 seconds"
3775        );
3776        assert_eq!(SyncError::Cancelled.to_string(), "Sync cancelled");
3777    }
3778
3779    // =========================================================================
3780    // SFTP helper function tests
3781    // =========================================================================
3782
3783    #[test]
3784    fn test_parse_ssh_host_simple() {
3785        let (user, host) = parse_ssh_host("myserver");
3786        assert!(user.is_none());
3787        assert_eq!(host, "myserver");
3788    }
3789
3790    #[test]
3791    fn test_parse_ssh_host_with_user() {
3792        let (user, host) = parse_ssh_host("admin@myserver");
3793        assert_eq!(user, Some("admin"));
3794        assert_eq!(host, "myserver");
3795    }
3796
3797    #[test]
3798    fn test_parse_ssh_host_with_domain() {
3799        let (user, host) = parse_ssh_host("deploy@server.example.com");
3800        assert_eq!(user, Some("deploy"));
3801        assert_eq!(host, "server.example.com");
3802    }
3803
3804    #[test]
3805    fn test_parse_ssh_host_email_like() {
3806        // Edge case: user looks like email prefix
3807        let (user, host) = parse_ssh_host("user@host");
3808        assert_eq!(user, Some("user"));
3809        assert_eq!(host, "host");
3810    }
3811
3812    #[test]
3813    fn test_first_nonblank_username_priority_and_trimming() {
3814        assert_eq!(
3815            first_nonblank_username([Some("  alice  "), Some("bob")]),
3816            Some("alice".to_string())
3817        );
3818        assert_eq!(
3819            first_nonblank_username([Some("  "), None, Some("carol")]),
3820            Some("carol".to_string())
3821        );
3822        assert_eq!(first_nonblank_username([None, Some("\t")]), None);
3823    }
3824
3825    #[test]
3826    fn test_expand_tilde_local_with_tilde_prefix() {
3827        let expanded = expand_tilde_local("~/Documents/file.txt");
3828        // Should start with home directory, not tilde
3829        assert!(!expanded.starts_with('~'));
3830        assert!(expanded.ends_with("/Documents/file.txt"));
3831    }
3832
3833    #[test]
3834    fn test_expand_tilde_local_just_tilde() {
3835        let expanded = expand_tilde_local("~");
3836        // Should be just home directory
3837        assert!(!expanded.starts_with('~'));
3838        assert!(!expanded.is_empty());
3839    }
3840
3841    #[test]
3842    fn test_expand_tilde_local_no_tilde() {
3843        let path = "/absolute/path/to/file";
3844        let expanded = expand_tilde_local(path);
3845        assert_eq!(expanded, path);
3846    }
3847
3848    #[test]
3849    fn test_expand_tilde_local_tilde_in_middle() {
3850        // Tilde in middle should not be expanded
3851        let path = "/path/with/~tilde/inside";
3852        let expanded = expand_tilde_local(path);
3853        assert_eq!(expanded, path);
3854    }
3855}