Skip to main content

coding_agent_search/sources/
sync.rs

1//! Sync engine for pulling agent sessions from remote sources.
2//!
3//! This module provides the core sync functionality using rsync over SSH
4//! for efficient delta transfers, with progress reporting and error recovery.
5//!
6//! # Safety
7//!
8//! **IMPORTANT**: The sync engine uses rsync WITHOUT the `--delete` flag
9//! to ensure safe additive syncs. This prevents accidental data loss if
10//! a remote is misconfigured or temporarily empty.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use coding_agent_search::sources::sync::SyncEngine;
16//! use coding_agent_search::sources::config::SourcesConfig;
17//!
18//! let config = SourcesConfig::load()?;
19//! let engine = SyncEngine::new(&data_dir);
20//!
21//! for source in config.remote_sources() {
22//!     let report = engine.sync_source(source)?;
23//!     println!("Synced {}: {} files", source.name, report.total_files());
24//! }
25//! ```
26
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::OnceLock;
30use std::time::{Duration, Instant};
31
32use thiserror::Error;
33
34use super::{
35    config::{
36        SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
37        ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
38    },
39    host_key_verification_error, is_host_key_verification_failure, strict_ssh_cli_tokens,
40    strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
41};
42use ssh2::{FileStat, Session, Sftp};
43use std::io::{Read as IoRead, Write as IoWrite};
44use std::net::{Shutdown, TcpStream};
45
46/// Which variant of rsync's "pass args protected to the remote" flag the
47/// system `rsync` accepts. The flag was introduced in rsync 3.0.0 as
48/// `--protect-args`; rsync 3.4.0 renamed the primary form to
49/// `--secluded-args` (`-s`) and current Homebrew `rsync 3.4.1` prints only
50/// the new name in `--help`, so a simple substring probe for `--protect-args`
51/// mis-classifies it as unsupported and falls through to the quoted-path
52/// rsync branch — which breaks (#191). openrsync (macOS 15+) supports
53/// neither.
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55enum RsyncArgProtection {
56    /// Neither flag supported — callers must manually quote remote paths for
57    /// the remote login shell.
58    None,
59    /// rsync 3.0.0..3.4.0 — original flag name.
60    ProtectArgs,
61    /// rsync 3.4.0+ (incl. Homebrew 3.4.1) — renamed primary form.
62    SecludedArgs,
63}
64
65impl RsyncArgProtection {
66    fn is_supported(self) -> bool {
67        !matches!(self, Self::None)
68    }
69
70    /// CLI flag to pass to rsync, or `None` if no protection variant is
71    /// available.
72    fn flag(self) -> Option<&'static str> {
73        match self {
74            Self::ProtectArgs => Some("--protect-args"),
75            Self::SecludedArgs => Some("--secluded-args"),
76            Self::None => None,
77        }
78    }
79}
80
81fn detect_rsync_arg_protection() -> RsyncArgProtection {
82    static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
83    *CACHED.get_or_init(|| {
84        let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
85            return RsyncArgProtection::None;
86        };
87        // rsync prints to stdout on GNU/Linux and Homebrew macOS, but some
88        // forks / older builds print help on stderr — check both so we never
89        // misclassify a supported rsync as unsupported.
90        let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
91        combined.push_str(&String::from_utf8_lossy(&out.stderr));
92        // Prefer the newer name when both are listed (forward-compat with a
93        // hypothetical rsync that keeps both as aliases): `--secluded-args`
94        // is what current rsync actually prints in help output, and using
95        // the printed name is the one guaranteed to be accepted.
96        if combined.contains("--secluded-args") {
97            RsyncArgProtection::SecludedArgs
98        } else if combined.contains("--protect-args") {
99            RsyncArgProtection::ProtectArgs
100        } else {
101            RsyncArgProtection::None
102        }
103    })
104}
105
106fn quote_remote_shell_path(path: &str) -> String {
107    // POSIX shell single-quote escape:
108    // 1. Wrap the whole thing in single quotes.
109    // 2. Escape existing single quotes by closing the current quote,
110    //    inserting a backslash-escaped quote, and opening a new one.
111    // Result: 'foo'\''bar'
112    format!("'{}'", path.replace('\'', r#"'\''"#))
113}
114
115fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
116    // host itself might contain user@ or be an alias, but we should not quote it
117    // if it's already a single token. However, if it contains spaces or other
118    // weirdness it's already broken for SSH. We focus on the path part.
119    format!("{host}:{}", quote_remote_shell_path(remote_path))
120}
121
122fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
123    if protect_args_supported {
124        // With --protect-args, rsync handles its own escaping over the wire
125        format!("{host}:{remote_path}")
126    } else {
127        // Without it (e.g. openrsync), we must manually quote for the remote shell
128        remote_spec_for_shell_bound_copy(host, remote_path)
129    }
130}
131
132fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
133    // scp still executes a remote shell command for the source operand, so the
134    // path side must be quoted even though we pass it as one local argv token.
135    remote_spec_for_shell_bound_copy(host, remote_path)
136}
137
138fn remote_find_regular_files_command(remote_path: &str) -> String {
139    format!(
140        "find -P {} -type f -print0",
141        quote_remote_shell_path(remote_path)
142    )
143}
144
145fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
146    let output = String::from_utf8_lossy(stdout);
147    for line in output.lines() {
148        if let Some(home) = line.trim().strip_prefix("CASS_HOME_MARKER:")
149            && home.starts_with('/')
150            && !home.contains('\0')
151        {
152            return Some(home.to_string());
153        }
154    }
155    None
156}
157
158fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
159    bytes
160        .split(|byte| *byte == 0)
161        .filter(|part| !part.is_empty())
162        .filter_map(|part| std::str::from_utf8(part).ok())
163        .map(ToOwned::to_owned)
164        .collect()
165}
166
167fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
168    match source_path_entry_error(index, path) {
169        Some(message) => Err(SyncError::InvalidPath(message)),
170        None => Ok(()),
171    }
172}
173
174fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
175    PathSyncResult {
176        remote_path: remote_path.to_string(),
177        success: false,
178        error: Some(err.to_string()),
179        ..Default::default()
180    }
181}
182
183fn remote_file_to_safe_local_path(
184    remote_root: &Path,
185    remote_file: &Path,
186    local_container: &Path,
187    leaf_name: &str,
188) -> Option<PathBuf> {
189    let mut local_path = local_container.join(leaf_name);
190    if remote_file == remote_root {
191        return Some(local_path);
192    }
193
194    let relative = remote_file.strip_prefix(remote_root).ok()?;
195    for component in relative.components() {
196        match component {
197            std::path::Component::Normal(name) => local_path.push(name),
198            std::path::Component::CurDir => {}
199            _ => return None,
200        }
201    }
202
203    Some(local_path)
204}
205
206fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
207    let rel = path.strip_prefix(root).map_err(|_| {
208        format!(
209            "Local path {} is outside sync root {}",
210            path.display(),
211            root.display()
212        )
213    })?;
214
215    let mut current = root.to_path_buf();
216    if let Some(link) = existing_path_symlink(&current)? {
217        return Ok(Some(link));
218    }
219
220    for component in rel.components() {
221        match component {
222            std::path::Component::Normal(name) => current.push(name),
223            std::path::Component::CurDir => continue,
224            _ => {
225                return Err(format!(
226                    "Local path {} contains unsafe component below sync root {}",
227                    path.display(),
228                    root.display()
229                ));
230            }
231        }
232
233        if let Some(link) = existing_path_symlink(&current)? {
234            return Ok(Some(link));
235        }
236    }
237
238    Ok(None)
239}
240
241fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
242    match std::fs::symlink_metadata(path) {
243        Ok(metadata) if metadata.file_type().is_symlink() => Ok(Some(path.to_path_buf())),
244        Ok(_) => Ok(None),
245        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
246        Err(e) => Err(format!("Failed to inspect {}: {}", path.display(), e)),
247    }
248}
249
250fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
251    if let Some(link) = existing_local_symlink_below_root(root, path)? {
252        return Err(format!(
253            "Refusing to write {} through local symlink {}",
254            path.display(),
255            link.display()
256        ));
257    }
258
259    Ok(())
260}
261
262fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
263    reject_local_symlink_below_root(sync_root, local_path)?;
264    std::fs::create_dir_all(local_path)
265        .map_err(|e| format!("Failed to create directory: {}", e))?;
266    reject_local_symlink_below_root(sync_root, local_path)?;
267    Ok(())
268}
269
270fn prepare_local_sync_root(local_store: &Path, mirror_dir: &Path) -> Result<(), String> {
271    reject_local_symlink_below_root(local_store, mirror_dir)?;
272    std::fs::create_dir_all(mirror_dir)
273        .map_err(|e| format!("Failed to create directory: {}", e))?;
274    reject_local_symlink_below_root(local_store, mirror_dir)?;
275    Ok(())
276}
277
278fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
279    stat.file_type().is_symlink()
280}
281
282/// Errors that can occur during sync operations.
283#[derive(Error, Debug)]
284pub enum SyncError {
285    #[error("Source has no host configured")]
286    NoHost,
287
288    #[error("Source has no paths configured")]
289    NoPaths,
290
291    #[error("Invalid source path: {0}")]
292    InvalidPath(String),
293
294    #[error("Invalid source definition: {0}")]
295    InvalidSource(String),
296
297    #[error("rsync command failed: {0}")]
298    RsyncFailed(String),
299
300    #[error("Failed to create local directory: {0}")]
301    CreateDirFailed(#[from] std::io::Error),
302
303    #[error("SSH connection failed: {0}")]
304    SshFailed(String),
305
306    #[error("Connection timed out after {0} seconds")]
307    Timeout(u64),
308
309    #[error("Sync cancelled")]
310    Cancelled,
311}
312
313/// Method used for syncing files from remote.
314#[derive(Debug, Clone, Copy, PartialEq, Eq)]
315pub enum SyncMethod {
316    /// rsync over SSH - preferred for delta transfers
317    Rsync,
318    /// rsync invoked via WSL (`wsl rsync`) - used on Windows when native rsync is unavailable
319    /// but WSL is installed with rsync available inside it.
320    WslRsync,
321    /// SCP-based transfer using the system `scp` command.
322    ///
323    /// Used on Windows (and other platforms) when rsync is unavailable. Delegates all
324    /// authentication to the system `ssh`/`scp` binary so it inherits OpenSSH agent,
325    /// `~/.ssh/` keys, and `~/.ssh/config` correctly – avoiding the `ssh2` library
326    /// which does not integrate with the Windows OpenSSH agent.
327    Scp,
328    /// SFTP fallback using the `ssh2` crate – last resort only.
329    ///
330    /// Deprecated in favour of [`SyncMethod::Scp`] which uses the native system SSH
331    /// binary. Kept for backward compatibility with callers that pattern-match on this
332    /// variant.
333    Sftp,
334}
335
336impl SyncMethod {
337    pub fn as_str(self) -> &'static str {
338        match self {
339            Self::Rsync => "rsync",
340            Self::WslRsync => "wsl-rsync",
341            Self::Scp => "scp",
342            Self::Sftp => "sftp",
343        }
344    }
345}
346
347impl std::fmt::Display for SyncMethod {
348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349        f.write_str(self.as_str())
350    }
351}
352
353/// Result of syncing a single path.
354#[derive(Debug, Clone, Default)]
355pub struct PathSyncResult {
356    /// Remote path that was synced.
357    pub remote_path: String,
358    /// Local destination path.
359    pub local_path: PathBuf,
360    /// Number of files transferred.
361    pub files_transferred: u64,
362    /// Total bytes transferred.
363    pub bytes_transferred: u64,
364    /// Whether the sync succeeded.
365    pub success: bool,
366    /// Error message if sync failed.
367    pub error: Option<String>,
368    /// Duration of the sync operation.
369    pub duration_ms: u64,
370}
371
372/// Report from syncing an entire source.
373#[derive(Debug, Clone)]
374pub struct SyncReport {
375    /// Name of the source that was synced.
376    pub source_name: String,
377    /// Method used for syncing.
378    pub method: SyncMethod,
379    /// Results for each path.
380    pub path_results: Vec<PathSyncResult>,
381    /// Total duration of the sync.
382    pub total_duration_ms: u64,
383    /// Whether all paths synced successfully.
384    pub all_succeeded: bool,
385}
386
387impl SyncReport {
388    /// Create a new report for a source.
389    pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
390        Self {
391            source_name: source_name.into(),
392            method,
393            path_results: Vec::new(),
394            total_duration_ms: 0,
395            all_succeeded: true,
396        }
397    }
398
399    /// Create a failed report when sync couldn't even start.
400    pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
401        Self {
402            source_name: source_name.into(),
403            method: SyncMethod::Rsync,
404            path_results: vec![PathSyncResult {
405                error: Some(error.to_string()),
406                success: false,
407                ..Default::default()
408            }],
409            total_duration_ms: 0,
410            all_succeeded: false,
411        }
412    }
413
414    /// Add a path result to the report.
415    pub fn add_path_result(&mut self, result: PathSyncResult) {
416        if !result.success {
417            self.all_succeeded = false;
418        }
419        self.path_results.push(result);
420    }
421
422    /// Get total files transferred across all paths.
423    pub fn total_files(&self) -> u64 {
424        self.path_results.iter().map(|r| r.files_transferred).sum()
425    }
426
427    /// Get total bytes transferred across all paths.
428    pub fn total_bytes(&self) -> u64 {
429        self.path_results.iter().map(|r| r.bytes_transferred).sum()
430    }
431
432    /// Get count of successful path syncs.
433    pub fn successful_paths(&self) -> usize {
434        self.path_results.iter().filter(|r| r.success).count()
435    }
436
437    /// Get count of failed path syncs.
438    pub fn failed_paths(&self) -> usize {
439        self.path_results.iter().filter(|r| !r.success).count()
440    }
441
442    /// Summarize the overall sync outcome.
443    pub fn sync_result(&self) -> SyncResult {
444        if self.all_succeeded {
445            SyncResult::Success
446        } else {
447            let errors: Vec<String> = self
448                .path_results
449                .iter()
450                .filter_map(|r| r.error.clone())
451                .collect();
452            if self.successful_paths() > 0 {
453                SyncResult::PartialFailure(errors.join("; "))
454            } else {
455                SyncResult::Failed(errors.join("; "))
456            }
457        }
458    }
459}
460
461/// Statistics parsed from rsync output.
462#[derive(Debug, Default)]
463struct RsyncStats {
464    files_transferred: u64,
465    bytes_transferred: u64,
466}
467
468/// Sync engine for pulling sessions from remote sources.
469pub struct SyncEngine {
470    /// Base directory for storing synced data.
471    /// Structure: `{local_store}/remotes/{source_name}/mirror/`
472    local_store: PathBuf,
473    /// Connection timeout in seconds.
474    connection_timeout: u64,
475    /// Transfer timeout in seconds (0 = no timeout).
476    transfer_timeout: u64,
477}
478
479impl SyncEngine {
480    /// Create a new sync engine.
481    ///
482    /// # Arguments
483    /// * `data_dir` - The cass data directory (e.g., ~/.local/share/coding-agent-search)
484    pub fn new(data_dir: &Path) -> Self {
485        Self {
486            local_store: data_dir.to_path_buf(),
487            connection_timeout: 10,
488            transfer_timeout: 300, // 5 minutes
489        }
490    }
491
492    /// Set the connection timeout.
493    pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
494        self.connection_timeout = seconds;
495        self
496    }
497
498    /// Set the transfer timeout.
499    pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
500        self.transfer_timeout = seconds;
501        self
502    }
503
504    /// Get the local mirror directory for a source.
505    pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
506        self.local_store
507            .join("remotes")
508            .join(source_name)
509            .join("mirror")
510    }
511
512    /// Get the remote home directory by SSH-ing to the host and printing `$HOME`.
513    ///
514    /// This is called once per source sync to avoid repeated SSH calls for each path.
515    fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
516        // Validate host doesn't contain shell metacharacters to prevent injection
517        if host.trim().is_empty()
518            || host.starts_with('-')
519            || !ssh_host_has_safe_token_chars(host)
520            || validate_optional_user_host_shape(host).is_err()
521        {
522            return Err(SyncError::SshFailed(format!(
523                "Invalid characters in host: {}",
524                host
525            )));
526        }
527
528        let timeout_secs = self.connection_timeout.max(1);
529        let mut cmd = Command::new("ssh");
530        cmd.args(strict_ssh_cli_tokens(timeout_secs))
531            .arg("--")
532            .arg(host)
533            .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
534            .stdout(Stdio::piped())
535            .stderr(Stdio::piped());
536
537        let child = cmd
538            .spawn()
539            .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
540        let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
541            .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
542            .ok_or(SyncError::Timeout(timeout_secs))?;
543
544        if !output.status.success() {
545            let stderr = String::from_utf8_lossy(&output.stderr);
546            if is_host_key_verification_failure(&stderr) {
547                return Err(SyncError::SshFailed(host_key_verification_error(host)));
548            }
549            return Err(SyncError::SshFailed(format!(
550                "Failed to get remote home directory: {}",
551                stderr.trim()
552            )));
553        }
554
555        let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
556            SyncError::SshFailed(
557                "Unable to parse remote home directory from SSH output".to_string(),
558            )
559        })?;
560
561        tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
562        Ok(remote_home)
563    }
564
565    /// Expand ~ in a remote path using the provided home directory.
566    ///
567    /// If `remote_home` is None, returns the path unchanged.
568    fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
569        if !path.starts_with('~') {
570            return path.to_string();
571        }
572
573        let Some(home) = remote_home else {
574            return path.to_string();
575        };
576
577        if path == "~" {
578            home.to_string()
579        } else if let Some(rest) = path.strip_prefix("~/") {
580            format!("{}/{}", home, rest)
581        } else {
582            // ~user/path case - not supported, return as-is
583            path.to_string()
584        }
585    }
586
587    /// Detect the available sync method.
588    ///
589    /// Detection order:
590    /// 1. Native `rsync` → [`SyncMethod::Rsync`]
591    /// 2. `wsl rsync` (Windows only) → [`SyncMethod::WslRsync`]
592    /// 3. System `scp` available → [`SyncMethod::Scp`]
593    /// 4. Last resort → [`SyncMethod::Sftp`] (ssh2-based, no native-agent integration)
594    ///
595    /// On Windows the `ssh2` SFTP path is intentionally avoided whenever possible
596    /// because it bypasses the Windows OpenSSH agent and `~/.ssh/config`, leading to
597    /// "No valid authentication method found" errors even when SSH keys are properly
598    /// configured. Using the system `scp` binary instead lets OpenSSH handle auth the
599    /// same way `ssh` and `cass sources doctor` do.
600    pub fn detect_sync_method() -> SyncMethod {
601        // 1. Native rsync
602        if Command::new("rsync")
603            .arg("--version")
604            .output()
605            .map(|o| o.status.success())
606            .unwrap_or(false)
607        {
608            return SyncMethod::Rsync;
609        }
610
611        // 2. WSL rsync (Windows-only: rsync inside WSL invoked via `wsl rsync`)
612        #[cfg(target_os = "windows")]
613        if Command::new("wsl")
614            .args(["rsync", "--version"])
615            .output()
616            .map(|o| o.status.success())
617            .unwrap_or(false)
618        {
619            return SyncMethod::WslRsync;
620        }
621
622        // 3. System scp – preferred over ssh2/SFTP because it inherits the native
623        //    OpenSSH agent and ~/.ssh/config on all platforms (especially Windows).
624        if Command::new("scp")
625            .arg("-S")
626            .arg("ssh")
627            .arg("--")
628            // pass a harmless flag; scp prints usage and exits non-zero, but if the
629            // binary exists the spawn itself succeeds which is all we need to check.
630            .output()
631            .is_ok()
632        {
633            // Confirm scp is a real binary by checking for the executable
634            if which_scp_exists() {
635                return SyncMethod::Scp;
636            }
637        }
638
639        // 4. Last resort: ssh2-based SFTP
640        SyncMethod::Sftp
641    }
642
643    /// Sync a single source.
644    ///
645    /// Syncs all configured paths from the source to the local mirror directory.
646    /// Individual path failures don't abort the entire sync.
647    pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
648        if !source.is_remote() {
649            return Err(SyncError::NoHost);
650        }
651
652        let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
653
654        if source.paths.is_empty() {
655            return Err(SyncError::NoPaths);
656        }
657
658        source
659            .validate_structure()
660            .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
661
662        let method = Self::detect_sync_method();
663        let mut report = SyncReport::new(&source.name, method);
664        let overall_start = Instant::now();
665
666        // Create the mirror directory
667        let mirror_dir = self.mirror_dir(&source.name);
668        prepare_local_sync_root(&self.local_store, &mirror_dir)
669            .map_err(|e| SyncError::CreateDirFailed(std::io::Error::other(e)))?;
670
671        // Pre-fetch remote home directory if any paths use tilde (avoids multiple SSH calls)
672        let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
673            path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
674        }) {
675            match self.get_remote_home(host) {
676                Ok(home) => Some(home),
677                Err(e) => {
678                    tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
679                    None
680                }
681            }
682        } else {
683            None
684        };
685
686        for (index, remote_path) in source.paths.iter().enumerate() {
687            if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
688                report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
689                continue;
690            }
691
692            let result = match method {
693                SyncMethod::Rsync => {
694                    self.sync_path_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
695                }
696                SyncMethod::WslRsync => {
697                    self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
698                }
699                SyncMethod::Scp => {
700                    self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
701                }
702                SyncMethod::Sftp => {
703                    self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
704                }
705            };
706            report.add_path_result(result);
707        }
708
709        report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
710        Ok(report)
711    }
712
713    /// Sync all remote sources from a config.
714    ///
715    /// Continues even if individual sources fail.
716    pub fn sync_all(
717        &self,
718        sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
719    ) -> Vec<SyncReport> {
720        sources
721            .map(|source| {
722                let source = source.borrow();
723                self.sync_source(source)
724                    .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
725            })
726            .collect()
727    }
728
729    /// Sync a single path using rsync.
730    ///
731    /// **IMPORTANT**: Uses rsync WITHOUT --delete for safe additive syncs.
732    ///
733    /// The `remote_home` parameter should be pre-fetched via `get_remote_home()` to avoid
734    /// repeated SSH calls for each path.
735    fn sync_path_rsync(
736        &self,
737        host: &str,
738        remote_path: &str,
739        dest_dir: &Path,
740        remote_home: Option<&str>,
741    ) -> PathSyncResult {
742        let start = Instant::now();
743        if remote_path.starts_with('~') && remote_home.is_none() {
744            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
745            return PathSyncResult {
746                remote_path: remote_path.to_string(),
747                local_path,
748                success: false,
749                error: Some(
750                    "Cannot expand '~' in remote path; failed to determine remote home directory"
751                        .to_string(),
752                ),
753                duration_ms: start.elapsed().as_millis() as u64,
754                ..Default::default()
755            };
756        }
757
758        // Expand ~ using pre-fetched home directory (no SSH call here)
759        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
760
761        // If tilde expansion failed (no remote_home provided), log a warning
762        if remote_path.starts_with('~') && expanded_path == remote_path {
763            tracing::warn!(
764                remote_path = %remote_path,
765                "Could not expand tilde in path (remote home directory not available)"
766            );
767        }
768
769        // Convert remote path to safe local directory name
770        // Use raw remote_path for stability (independent of home expansion success)
771        let safe_name = path_to_safe_dirname(remote_path);
772        let local_path = dest_dir.join(&safe_name);
773
774        // Create local directory without following any pre-existing mirror symlink.
775        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
776            return PathSyncResult {
777                remote_path: remote_path.to_string(),
778                local_path: local_path.clone(),
779                success: false,
780                error: Some(e),
781                duration_ms: start.elapsed().as_millis() as u64,
782                ..Default::default()
783            };
784        }
785
786        // Build rsync command
787        // NOTE: NO --delete flag! Safe additive sync only.
788        let arg_protection = detect_rsync_arg_protection();
789        let protect_args_supported = arg_protection.is_supported();
790        let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
791        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
792
793        let local_path_str = match local_path.to_str() {
794            Some(s) => s,
795            None => {
796                return PathSyncResult {
797                    remote_path: remote_path.to_string(),
798                    local_path,
799                    success: false,
800                    error: Some("Local path contains invalid UTF-8".to_string()),
801                    duration_ms: start.elapsed().as_millis() as u64,
802                    ..Default::default()
803                };
804            }
805        };
806
807        let timeout_str = self.transfer_timeout.to_string();
808        let mut cmd = Command::new("rsync");
809        cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
810        if let Some(flag) = arg_protection.flag() {
811            cmd.arg(flag);
812        }
813        cmd.args([
814            "--timeout",
815            &timeout_str,
816            "-e",
817            &ssh_opts,
818            "--",
819            &remote_spec,
820            local_path_str,
821        ]);
822
823        tracing::debug!(
824            host = %host,
825            remote_path = %expanded_path,
826            local_path = %local_path.display(),
827            "starting rsync"
828        );
829
830        let output = match cmd.output() {
831            Ok(o) => o,
832            Err(e) => {
833                return PathSyncResult {
834                    remote_path: remote_path.to_string(),
835                    local_path,
836                    success: false,
837                    error: Some(format!("Failed to execute rsync: {}", e)),
838                    duration_ms: start.elapsed().as_millis() as u64,
839                    ..Default::default()
840                };
841            }
842        };
843
844        let duration_ms = start.elapsed().as_millis() as u64;
845        let stdout = String::from_utf8_lossy(&output.stdout);
846        let stderr = String::from_utf8_lossy(&output.stderr);
847
848        if !output.status.success() {
849            // Check for specific error types
850            let error_msg = if stderr.contains("Connection refused")
851                || stderr.contains("Connection timed out")
852            {
853                format!("SSH connection failed: {}", stderr.trim())
854            } else if is_host_key_verification_failure(&stderr) {
855                host_key_verification_error(host)
856            } else if stderr.contains("No such file or directory") {
857                format!("Remote path not found: {}", expanded_path)
858            } else if stderr.contains("Permission denied") {
859                format!("Permission denied: {}", stderr.trim())
860            } else {
861                format!("rsync failed: {}", stderr.trim())
862            };
863
864            tracing::warn!(
865                host = %host,
866                remote_path = %expanded_path,
867                error = %error_msg,
868                "rsync failed"
869            );
870
871            return PathSyncResult {
872                remote_path: remote_path.to_string(),
873                local_path,
874                success: false,
875                error: Some(error_msg),
876                duration_ms,
877                ..Default::default()
878            };
879        }
880
881        // Parse stats from rsync output
882        let stats = parse_rsync_stats(&stdout);
883
884        tracing::info!(
885            host = %host,
886            remote_path = %expanded_path,
887            files = stats.files_transferred,
888            bytes = stats.bytes_transferred,
889            duration_ms,
890            "rsync completed"
891        );
892
893        PathSyncResult {
894            remote_path: remote_path.to_string(),
895            local_path,
896            files_transferred: stats.files_transferred,
897            bytes_transferred: stats.bytes_transferred,
898            success: true,
899            error: None,
900            duration_ms,
901        }
902    }
903
904    /// Sync a single path using rsync invoked through WSL (`wsl rsync …`).
905    ///
906    /// Used on Windows when native rsync is absent but WSL with rsync is available.
907    /// WSL paths use the `\\wsl$\…` UNC convention for the local destination.
908    fn sync_path_wsl_rsync(
909        &self,
910        host: &str,
911        remote_path: &str,
912        dest_dir: &Path,
913        remote_home: Option<&str>,
914    ) -> PathSyncResult {
915        let start = Instant::now();
916
917        if remote_path.starts_with('~') && remote_home.is_none() {
918            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
919            return PathSyncResult {
920                remote_path: remote_path.to_string(),
921                local_path,
922                success: false,
923                error: Some(
924                    "Cannot expand '~' in remote path; failed to determine remote home directory"
925                        .to_string(),
926                ),
927                duration_ms: start.elapsed().as_millis() as u64,
928                ..Default::default()
929            };
930        }
931
932        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
933        let safe_name = path_to_safe_dirname(remote_path);
934        let local_path = dest_dir.join(&safe_name);
935
936        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
937            return PathSyncResult {
938                remote_path: remote_path.to_string(),
939                local_path,
940                success: false,
941                error: Some(e),
942                duration_ms: start.elapsed().as_millis() as u64,
943                ..Default::default()
944            };
945        }
946
947        let local_path_str = match local_path.to_str() {
948            Some(s) => s,
949            None => {
950                return PathSyncResult {
951                    remote_path: remote_path.to_string(),
952                    local_path,
953                    success: false,
954                    error: Some("Local path contains invalid UTF-8".to_string()),
955                    duration_ms: start.elapsed().as_millis() as u64,
956                    ..Default::default()
957                };
958            }
959        };
960
961        // Convert Windows path to a WSL-accessible path.
962        // WSL can access Windows paths via /mnt/<drive>/... conventions.
963        // E.g. C:\Users\george\AppData\... → /mnt/c/Users/george/AppData/...
964        let wsl_dest = windows_path_to_wsl(local_path_str);
965
966        let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
967        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
968        let timeout_str = self.transfer_timeout.to_string();
969
970        let mut cmd = Command::new("wsl");
971        cmd.args([
972            "rsync",
973            "-avz",
974            "--links",
975            "--safe-links",
976            "--stats",
977            "--partial",
978        ]);
979        // WSL rsync is the real rsync (not openrsync), so --protect-args is safe.
980        cmd.arg("--protect-args");
981        cmd.args([
982            "--timeout",
983            &timeout_str,
984            "-e",
985            &ssh_opts,
986            "--",
987            &remote_spec,
988            &wsl_dest,
989        ]);
990
991        tracing::debug!(
992            host = %host,
993            remote_path = %expanded_path,
994            local_path = %local_path.display(),
995            wsl_dest = %wsl_dest,
996            "starting wsl rsync"
997        );
998
999        let output = match cmd.output() {
1000            Ok(o) => o,
1001            Err(e) => {
1002                return PathSyncResult {
1003                    remote_path: remote_path.to_string(),
1004                    local_path,
1005                    success: false,
1006                    error: Some(format!("Failed to execute wsl rsync: {}", e)),
1007                    duration_ms: start.elapsed().as_millis() as u64,
1008                    ..Default::default()
1009                };
1010            }
1011        };
1012
1013        let duration_ms = start.elapsed().as_millis() as u64;
1014        let stdout = String::from_utf8_lossy(&output.stdout);
1015        let stderr = String::from_utf8_lossy(&output.stderr);
1016
1017        if !output.status.success() {
1018            let error_msg = if stderr.contains("Connection refused")
1019                || stderr.contains("Connection timed out")
1020            {
1021                format!("SSH connection failed: {}", stderr.trim())
1022            } else if is_host_key_verification_failure(&stderr) {
1023                host_key_verification_error(host)
1024            } else if stderr.contains("No such file or directory") {
1025                format!("Remote path not found: {}", expanded_path)
1026            } else if stderr.contains("Permission denied") {
1027                format!("Permission denied: {}", stderr.trim())
1028            } else {
1029                format!("wsl rsync failed: {}", stderr.trim())
1030            };
1031
1032            tracing::warn!(
1033                host = %host,
1034                remote_path = %expanded_path,
1035                error = %error_msg,
1036                "wsl rsync failed"
1037            );
1038
1039            return PathSyncResult {
1040                remote_path: remote_path.to_string(),
1041                local_path,
1042                success: false,
1043                error: Some(error_msg),
1044                duration_ms,
1045                ..Default::default()
1046            };
1047        }
1048
1049        let stats = parse_rsync_stats(&stdout);
1050
1051        tracing::info!(
1052            host = %host,
1053            remote_path = %expanded_path,
1054            files = stats.files_transferred,
1055            bytes = stats.bytes_transferred,
1056            duration_ms,
1057            "wsl rsync completed"
1058        );
1059
1060        PathSyncResult {
1061            remote_path: remote_path.to_string(),
1062            local_path,
1063            files_transferred: stats.files_transferred,
1064            bytes_transferred: stats.bytes_transferred,
1065            success: true,
1066            error: None,
1067            duration_ms,
1068        }
1069    }
1070
1071    /// Sync a single path using SCP after a physical `find -P` regular-file listing.
1072    ///
1073    /// This method delegates all authentication to the native system `scp`/`ssh`
1074    /// binary, which correctly reads `~/.ssh/config`, the OpenSSH agent (including
1075    /// the Windows OpenSSH agent on Windows), and all standard key locations.
1076    ///
1077    /// This avoids the "No valid authentication method found" failure that occurs
1078    /// in the `ssh2`-based SFTP path on Windows, where the library does not
1079    /// integrate with the Windows OpenSSH agent (`ssh-agent.exe`).
1080    fn sync_path_scp(
1081        &self,
1082        host: &str,
1083        remote_path: &str,
1084        dest_dir: &Path,
1085        remote_home: Option<&str>,
1086    ) -> PathSyncResult {
1087        let start = Instant::now();
1088
1089        if remote_path.starts_with('~') && remote_home.is_none() {
1090            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1091            return PathSyncResult {
1092                remote_path: remote_path.to_string(),
1093                local_path,
1094                success: false,
1095                error: Some(
1096                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1097                        .to_string(),
1098                ),
1099                duration_ms: start.elapsed().as_millis() as u64,
1100                ..Default::default()
1101            };
1102        }
1103
1104        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1105        let safe_name = path_to_safe_dirname(remote_path);
1106        let local_path = dest_dir.join(&safe_name);
1107
1108        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1109            return PathSyncResult {
1110                remote_path: remote_path.to_string(),
1111                local_path,
1112                success: false,
1113                error: Some(e),
1114                duration_ms: start.elapsed().as_millis() as u64,
1115                ..Default::default()
1116            };
1117        }
1118
1119        // `scp -r` follows symlinks on some OpenSSH paths. Enumerate only regular
1120        // files with physical traversal first, then copy those files individually.
1121        let connect_timeout = self.connection_timeout.to_string();
1122        let find_command = remote_find_regular_files_command(&expanded_path);
1123
1124        tracing::debug!(
1125            host = %host,
1126            remote_path = %expanded_path,
1127            local_path = %local_path.display(),
1128            "listing regular files for scp sync"
1129        );
1130
1131        let timeout_secs = self.connection_timeout.max(1);
1132        let mut cmd = Command::new("ssh");
1133        cmd.args(strict_ssh_cli_tokens(timeout_secs))
1134            .arg("--")
1135            .arg(host)
1136            .arg(&find_command)
1137            .stdout(Stdio::piped())
1138            .stderr(Stdio::piped());
1139
1140        let output = match cmd.spawn().and_then(|child| {
1141            wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1142        }) {
1143            Ok(Some(o)) => o,
1144            Ok(None) => {
1145                return PathSyncResult {
1146                    remote_path: remote_path.to_string(),
1147                    local_path,
1148                    success: false,
1149                    error: Some(format!(
1150                        "SSH file listing timed out after {timeout_secs} seconds"
1151                    )),
1152                    duration_ms: start.elapsed().as_millis() as u64,
1153                    ..Default::default()
1154                };
1155            }
1156            Err(e) => {
1157                return PathSyncResult {
1158                    remote_path: remote_path.to_string(),
1159                    local_path,
1160                    success: false,
1161                    error: Some(format!("Failed to execute ssh file listing: {}", e)),
1162                    duration_ms: start.elapsed().as_millis() as u64,
1163                    ..Default::default()
1164                };
1165            }
1166        };
1167
1168        let stderr = String::from_utf8_lossy(&output.stderr);
1169        if !output.status.success() {
1170            let error_msg = if stderr.contains("Connection refused")
1171                || stderr.contains("Connection timed out")
1172            {
1173                format!("SSH connection failed: {}", stderr.trim())
1174            } else if is_host_key_verification_failure(&stderr) {
1175                host_key_verification_error(host)
1176            } else if stderr.contains("No such file or directory") {
1177                format!("Remote path not found: {}", expanded_path)
1178            } else if stderr.contains("Permission denied") {
1179                format!("Permission denied: {}", stderr.trim())
1180            } else {
1181                format!("Remote file listing failed: {}", stderr.trim())
1182            };
1183
1184            tracing::warn!(
1185                host = %host,
1186                remote_path = %expanded_path,
1187                error = %error_msg,
1188                "scp file listing failed"
1189            );
1190
1191            return PathSyncResult {
1192                remote_path: remote_path.to_string(),
1193                local_path,
1194                success: false,
1195                error: Some(error_msg),
1196                duration_ms: start.elapsed().as_millis() as u64,
1197                ..Default::default()
1198            };
1199        }
1200
1201        let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1202        let remote_root = Path::new(&expanded_path);
1203        let leaf_name = Path::new(remote_path)
1204            .file_name()
1205            .and_then(|n| n.to_str())
1206            .unwrap_or("remote");
1207        let mut files_transferred = 0u64;
1208        let mut bytes_transferred = 0u64;
1209
1210        for remote_file in remote_files {
1211            let remote_file_path = Path::new(&remote_file);
1212            let Some(local_file) = remote_file_to_safe_local_path(
1213                remote_root,
1214                remote_file_path,
1215                &local_path,
1216                leaf_name,
1217            ) else {
1218                tracing::warn!(
1219                    remote_path = %remote_file,
1220                    root = %expanded_path,
1221                    "skipping scp file outside listed root"
1222                );
1223                continue;
1224            };
1225
1226            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1227                return PathSyncResult {
1228                    remote_path: remote_path.to_string(),
1229                    local_path,
1230                    success: false,
1231                    error: Some(e),
1232                    duration_ms: start.elapsed().as_millis() as u64,
1233                    ..Default::default()
1234                };
1235            }
1236
1237            if let Some(parent) = local_file.parent() {
1238                if let Err(e) = std::fs::create_dir_all(parent) {
1239                    return PathSyncResult {
1240                        remote_path: remote_path.to_string(),
1241                        local_path,
1242                        success: false,
1243                        error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1244                        duration_ms: start.elapsed().as_millis() as u64,
1245                        ..Default::default()
1246                    };
1247                }
1248
1249                if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1250                    return PathSyncResult {
1251                        remote_path: remote_path.to_string(),
1252                        local_path,
1253                        success: false,
1254                        error: Some(e),
1255                        duration_ms: start.elapsed().as_millis() as u64,
1256                        ..Default::default()
1257                    };
1258                }
1259            }
1260
1261            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1262                return PathSyncResult {
1263                    remote_path: remote_path.to_string(),
1264                    local_path,
1265                    success: false,
1266                    error: Some(e),
1267                    duration_ms: start.elapsed().as_millis() as u64,
1268                    ..Default::default()
1269                };
1270            }
1271
1272            let temp_path =
1273                unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1274            let Some(temp_path_str) = temp_path.to_str() else {
1275                return PathSyncResult {
1276                    remote_path: remote_path.to_string(),
1277                    local_path,
1278                    success: false,
1279                    error: Some("Local path contains invalid UTF-8".to_string()),
1280                    duration_ms: start.elapsed().as_millis() as u64,
1281                    ..Default::default()
1282                };
1283            };
1284            if let Err(e) = std::fs::OpenOptions::new()
1285                .write(true)
1286                .create_new(true)
1287                .open(&temp_path)
1288                .and_then(|file| file.sync_all())
1289            {
1290                return PathSyncResult {
1291                    remote_path: remote_path.to_string(),
1292                    local_path,
1293                    success: false,
1294                    error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1295                    duration_ms: start.elapsed().as_millis() as u64,
1296                    ..Default::default()
1297                };
1298            }
1299
1300            let remote_spec = remote_spec_for_scp(host, &remote_file);
1301            let mut cmd = Command::new("scp");
1302            cmd.args([
1303                "-B",
1304                "-o",
1305                &format!("ConnectTimeout={}", connect_timeout),
1306                "-o",
1307                "ServerAliveInterval=15",
1308                "-o",
1309                "ServerAliveCountMax=3",
1310                "-o",
1311                "StrictHostKeyChecking=yes",
1312                "--",
1313                &remote_spec,
1314                temp_path_str,
1315            ]);
1316
1317            let output = match cmd.output() {
1318                Ok(o) => o,
1319                Err(e) => {
1320                    return PathSyncResult {
1321                        remote_path: remote_path.to_string(),
1322                        local_path,
1323                        success: false,
1324                        error: Some(format!("Failed to execute scp: {}", e)),
1325                        duration_ms: start.elapsed().as_millis() as u64,
1326                        ..Default::default()
1327                    };
1328                }
1329            };
1330
1331            if !output.status.success() {
1332                let _ = std::fs::remove_file(&temp_path);
1333                let stderr = String::from_utf8_lossy(&output.stderr);
1334                let error_msg = if is_host_key_verification_failure(&stderr) {
1335                    host_key_verification_error(host)
1336                } else if stderr.contains("Permission denied") {
1337                    format!("Permission denied: {}", stderr.trim())
1338                } else {
1339                    format!("scp failed: {}", stderr.trim())
1340                };
1341
1342                tracing::warn!(
1343                    host = %host,
1344                    remote_path = %remote_file,
1345                    error = %error_msg,
1346                    "scp file transfer failed"
1347                );
1348
1349                return PathSyncResult {
1350                    remote_path: remote_path.to_string(),
1351                    local_path,
1352                    success: false,
1353                    error: Some(error_msg),
1354                    duration_ms: start.elapsed().as_millis() as u64,
1355                    ..Default::default()
1356                };
1357            }
1358
1359            files_transferred += 1;
1360            if let Err(e) = sync_file_path(&temp_path) {
1361                return PathSyncResult {
1362                    remote_path: remote_path.to_string(),
1363                    local_path,
1364                    success: false,
1365                    error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1366                    duration_ms: start.elapsed().as_millis() as u64,
1367                    ..Default::default()
1368                };
1369            }
1370            if let Ok(metadata) = std::fs::metadata(&temp_path) {
1371                bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1372            }
1373            if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1374                return PathSyncResult {
1375                    remote_path: remote_path.to_string(),
1376                    local_path,
1377                    success: false,
1378                    error: Some(format!(
1379                        "Failed to publish {} to {}: {}",
1380                        temp_path.display(),
1381                        local_file.display(),
1382                        e
1383                    )),
1384                    duration_ms: start.elapsed().as_millis() as u64,
1385                    ..Default::default()
1386                };
1387            }
1388        }
1389
1390        let duration_ms = start.elapsed().as_millis() as u64;
1391
1392        tracing::info!(
1393            host = %host,
1394            remote_path = %expanded_path,
1395            files = files_transferred,
1396            bytes = bytes_transferred,
1397            duration_ms,
1398            "scp sync completed"
1399        );
1400
1401        PathSyncResult {
1402            remote_path: remote_path.to_string(),
1403            local_path,
1404            files_transferred,
1405            bytes_transferred,
1406            success: true,
1407            error: None,
1408            duration_ms,
1409        }
1410    }
1411
1412    /// Sync a single path using SFTP (fallback when rsync unavailable).
1413    ///
1414    /// Uses the ssh2 crate for SFTP transfers. Authenticates via SSH agent
1415    /// or key file from SSH config.
1416    fn sync_path_sftp(
1417        &self,
1418        host: &str,
1419        remote_path: &str,
1420        dest_dir: &Path,
1421        remote_home: Option<&str>,
1422    ) -> PathSyncResult {
1423        let start = Instant::now();
1424        if remote_path.starts_with('~') && remote_home.is_none() {
1425            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1426            return PathSyncResult {
1427                remote_path: remote_path.to_string(),
1428                local_path,
1429                success: false,
1430                error: Some(
1431                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1432                        .to_string(),
1433                ),
1434                duration_ms: start.elapsed().as_millis() as u64,
1435                ..Default::default()
1436            };
1437        }
1438        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1439        // Use raw remote_path for stability (independent of home expansion success)
1440        let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1441
1442        // Create local directory without following any pre-existing mirror symlink.
1443        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1444            return PathSyncResult {
1445                remote_path: remote_path.to_string(),
1446                local_path,
1447                success: false,
1448                error: Some(e),
1449                duration_ms: start.elapsed().as_millis() as u64,
1450                ..Default::default()
1451            };
1452        }
1453
1454        // Parse host to extract user if present (user@host format)
1455        let (ssh_user, ssh_host) = parse_ssh_host(host);
1456
1457        // Look up host in SSH config for connection details
1458        // First try matching by SSH config alias (Host line), then by actual hostname
1459        let ssh_config = discover_ssh_hosts();
1460        let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
1461            ssh_config
1462                .iter()
1463                .find(|h| h.hostname.as_deref() == Some(ssh_host))
1464        });
1465
1466        // Determine connection parameters
1467        let hostname = host_config
1468            .and_then(|h| h.hostname.as_deref())
1469            .unwrap_or(ssh_host);
1470        let port = host_config.and_then(|h| h.port).unwrap_or(22);
1471        // Resolve username deterministically; never guess with a sentinel value.
1472        let username = match first_nonblank_username([
1473            ssh_user,
1474            host_config.and_then(|h| h.user.as_deref()),
1475        ])
1476        .or_else(|| env_username("USER"))
1477        .or_else(|| env_username("LOGNAME"))
1478        {
1479            Some(user) => user,
1480            None => {
1481                return PathSyncResult {
1482                    remote_path: remote_path.to_string(),
1483                    local_path,
1484                    success: false,
1485                    error: Some(format!(
1486                        "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
1487                        host
1488                    )),
1489                    duration_ms: start.elapsed().as_millis() as u64,
1490                    ..Default::default()
1491                };
1492            }
1493        };
1494        let identity_file = host_config.and_then(|h| h.identity_file.as_deref());
1495
1496        tracing::debug!(
1497            hostname = %hostname,
1498            port,
1499            username = %username,
1500            identity_file = ?identity_file,
1501            remote_path = %expanded_path,
1502            "SFTP connection parameters"
1503        );
1504
1505        // Connect via TCP with connection timeout
1506        let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
1507        let addr = format!("{}:{}", hostname, port);
1508        let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
1509            // Resolve hostname to socket address
1510            use std::net::ToSocketAddrs;
1511            (hostname, port)
1512                .to_socket_addrs()
1513                .ok()
1514                .and_then(|mut addrs| addrs.next())
1515                .ok_or(std::io::Error::new(
1516                    std::io::ErrorKind::InvalidInput,
1517                    "cannot resolve hostname",
1518                ))
1519        }) {
1520            Ok(a) => a,
1521            Err(e) => {
1522                return PathSyncResult {
1523                    remote_path: remote_path.to_string(),
1524                    local_path,
1525                    success: false,
1526                    error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
1527                    duration_ms: start.elapsed().as_millis() as u64,
1528                    ..Default::default()
1529                };
1530            }
1531        };
1532        let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
1533            Ok(t) => t,
1534            Err(e) => {
1535                return PathSyncResult {
1536                    remote_path: remote_path.to_string(),
1537                    local_path,
1538                    success: false,
1539                    error: Some(format!(
1540                        "TCP connection failed to {}:{}: {}",
1541                        hostname, port, e
1542                    )),
1543                    duration_ms: start.elapsed().as_millis() as u64,
1544                    ..Default::default()
1545                };
1546            }
1547        };
1548
1549        // Set TCP read/write timeout (use transfer_timeout, not connection_timeout)
1550        let timeout = std::time::Duration::from_secs(self.transfer_timeout);
1551        if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
1552            tracing::warn!("Failed to set TCP read timeout: {}", e);
1553        }
1554        if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
1555            tracing::warn!("Failed to set TCP write timeout: {}", e);
1556        }
1557        let tcp_shutdown = tcp.try_clone().ok();
1558
1559        // Create SSH session
1560        let mut session = match Session::new() {
1561            Ok(s) => s,
1562            Err(e) => {
1563                let _ = tcp.shutdown(Shutdown::Both);
1564                return PathSyncResult {
1565                    remote_path: remote_path.to_string(),
1566                    local_path,
1567                    success: false,
1568                    error: Some(format!("Failed to create SSH session: {}", e)),
1569                    duration_ms: start.elapsed().as_millis() as u64,
1570                    ..Default::default()
1571                };
1572            }
1573        };
1574
1575        session.set_tcp_stream(tcp);
1576        let close_connections = |session: &mut Session, reason: &str| {
1577            let _ = session.disconnect(None, reason, None);
1578            if let Some(stream) = tcp_shutdown.as_ref() {
1579                let _ = stream.shutdown(Shutdown::Both);
1580            }
1581        };
1582
1583        if let Err(e) = session.handshake() {
1584            close_connections(&mut session, "handshake failed");
1585            return PathSyncResult {
1586                remote_path: remote_path.to_string(),
1587                local_path,
1588                success: false,
1589                error: Some(format!("SSH handshake failed: {}", e)),
1590                duration_ms: start.elapsed().as_millis() as u64,
1591                ..Default::default()
1592            };
1593        }
1594
1595        // Authenticate - try agent first, then key file
1596        if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
1597            close_connections(&mut session, "authentication failed");
1598            return PathSyncResult {
1599                remote_path: remote_path.to_string(),
1600                local_path,
1601                success: false,
1602                error: Some(format!("SSH authentication failed: {}", e)),
1603                duration_ms: start.elapsed().as_millis() as u64,
1604                ..Default::default()
1605            };
1606        }
1607
1608        // Open SFTP session
1609        let sftp = match session.sftp() {
1610            Ok(s) => s,
1611            Err(e) => {
1612                close_connections(&mut session, "sftp open failed");
1613                return PathSyncResult {
1614                    remote_path: remote_path.to_string(),
1615                    local_path,
1616                    success: false,
1617                    error: Some(format!("Failed to open SFTP session: {}", e)),
1618                    duration_ms: start.elapsed().as_millis() as u64,
1619                    ..Default::default()
1620                };
1621            }
1622        };
1623
1624        tracing::info!(
1625            host = %host,
1626            remote_path = %expanded_path,
1627            local_path = %local_path.display(),
1628            "starting SFTP sync"
1629        );
1630
1631        // Recursively download the remote path
1632        let mut files_transferred = 0u64;
1633        let mut bytes_transferred = 0u64;
1634
1635        // For consistency with rsync and scp, we should create a subdirectory
1636        // with the remote path's leaf name inside the container directory.
1637        let leaf_name = Path::new(remote_path)
1638            .file_name()
1639            .and_then(|n| n.to_str())
1640            .unwrap_or("remote");
1641        let target_local_path = local_path.join(leaf_name);
1642
1643        if let Err(e) = self.sftp_download_recursive(
1644            &sftp,
1645            Path::new(&expanded_path),
1646            &target_local_path,
1647            &local_path,
1648            &mut files_transferred,
1649            &mut bytes_transferred,
1650        ) {
1651            close_connections(&mut session, "sftp download failed");
1652            return PathSyncResult {
1653                remote_path: remote_path.to_string(),
1654                local_path,
1655                files_transferred,
1656                bytes_transferred,
1657                success: false,
1658                error: Some(format!("SFTP download failed: {}", e)),
1659                duration_ms: start.elapsed().as_millis() as u64,
1660            };
1661        }
1662
1663        let duration_ms = start.elapsed().as_millis() as u64;
1664
1665        tracing::info!(
1666            host = %host,
1667            remote_path = %expanded_path,
1668            files = files_transferred,
1669            bytes = bytes_transferred,
1670            duration_ms,
1671            "SFTP sync completed"
1672        );
1673
1674        close_connections(&mut session, "sync complete");
1675        PathSyncResult {
1676            remote_path: remote_path.to_string(),
1677            local_path,
1678            files_transferred,
1679            bytes_transferred,
1680            success: true,
1681            error: None,
1682            duration_ms,
1683        }
1684    }
1685
1686    /// Authenticate SSH session using agent or key file.
1687    fn authenticate_ssh(
1688        &self,
1689        session: &Session,
1690        username: &str,
1691        identity_file: Option<&str>,
1692    ) -> Result<(), String> {
1693        // Try SSH agent first
1694        if let Ok(mut agent) = session.agent()
1695            && agent.connect().is_ok()
1696            && agent.list_identities().is_ok()
1697        {
1698            for identity in agent.identities().unwrap_or_default() {
1699                if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1700                    tracing::debug!("Authenticated via SSH agent");
1701                    return Ok(());
1702                }
1703            }
1704        }
1705
1706        // Try key file if specified
1707        if let Some(key_path) = identity_file {
1708            let key_path_expanded = expand_tilde_local(key_path);
1709            let key_path_buf = Path::new(&key_path_expanded);
1710
1711            if key_path_buf.exists()
1712                && session
1713                    .userauth_pubkey_file(username, None, key_path_buf, None)
1714                    .is_ok()
1715                && session.authenticated()
1716            {
1717                tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1718                return Ok(());
1719            }
1720        }
1721
1722        // Try default key locations
1723        if let Some(home) = dirs::home_dir() {
1724            for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1725                let key_path = home.join(".ssh").join(key_name);
1726                if key_path.exists()
1727                    && session
1728                        .userauth_pubkey_file(username, None, &key_path, None)
1729                        .is_ok()
1730                    && session.authenticated()
1731                {
1732                    tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1733                    return Ok(());
1734                }
1735            }
1736        }
1737
1738        Err(format!(
1739            "No valid authentication method found for user '{}'",
1740            username
1741        ))
1742    }
1743
1744    /// Recursively download a remote path via SFTP.
1745    fn sftp_download_recursive(
1746        &self,
1747        sftp: &Sftp,
1748        remote_path: &Path,
1749        local_path: &Path,
1750        local_root: &Path,
1751        files_transferred: &mut u64,
1752        bytes_transferred: &mut u64,
1753    ) -> Result<(), String> {
1754        // Use lstat so a remote symlink is classified as a symlink rather than
1755        // followed to a file or directory outside the configured source root.
1756        let stat = sftp
1757            .lstat(remote_path)
1758            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1759
1760        if sftp_file_stat_is_symlink(&stat) {
1761            tracing::warn!(
1762                path = %remote_path.display(),
1763                "Skipping remote symlink during SFTP sync"
1764            );
1765            return Ok(());
1766        }
1767
1768        if stat.is_dir() {
1769            // Create local directory for this directory item
1770            reject_local_symlink_below_root(local_root, local_path)?;
1771            std::fs::create_dir_all(local_path)
1772                .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1773            reject_local_symlink_below_root(local_root, local_path)?;
1774
1775            // List directory contents
1776            let entries = sftp
1777                .readdir(remote_path)
1778                .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1779
1780            for (entry_path, _entry_stat) in entries {
1781                let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1782                    continue;
1783                };
1784
1785                let entry_stat = sftp
1786                    .lstat(&entry_path)
1787                    .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1788                if sftp_file_stat_is_symlink(&entry_stat) {
1789                    tracing::warn!(
1790                        path = %entry_path.display(),
1791                        "Skipping remote symlink during SFTP sync"
1792                    );
1793                    continue;
1794                }
1795
1796                let local_entry = local_path.join(file_name);
1797
1798                if entry_stat.is_dir() {
1799                    // Recurse into subdirectory
1800                    self.sftp_download_recursive(
1801                        sftp,
1802                        &entry_path,
1803                        &local_entry,
1804                        local_root,
1805                        files_transferred,
1806                        bytes_transferred,
1807                    )?;
1808                } else if entry_stat.is_file() {
1809                    // Download file
1810                    if self.sftp_download_file(
1811                        sftp,
1812                        &entry_path,
1813                        &local_entry,
1814                        local_root,
1815                        bytes_transferred,
1816                    )? {
1817                        *files_transferred += 1;
1818                    }
1819                }
1820                // Skip symlinks and other types for safety
1821            }
1822        } else if stat.is_file() {
1823            // Ensure the parent directory exists
1824            if let Some(parent) = local_path.parent() {
1825                reject_local_symlink_below_root(local_root, parent)?;
1826                std::fs::create_dir_all(parent).map_err(|e| {
1827                    format!("Failed to create local dir {}: {}", parent.display(), e)
1828                })?;
1829                reject_local_symlink_below_root(local_root, parent)?;
1830            }
1831
1832            if self.sftp_download_file(
1833                sftp,
1834                remote_path,
1835                local_path,
1836                local_root,
1837                bytes_transferred,
1838            )? {
1839                *files_transferred += 1;
1840            }
1841        } else {
1842            // Not a regular file or directory (symlink, socket, etc.) - skip with warning
1843            tracing::warn!(
1844                path = %remote_path.display(),
1845                "Skipping remote path: not a regular file or directory"
1846            );
1847        }
1848
1849        Ok(())
1850    }
1851
1852    /// Download a single file via SFTP.
1853    fn sftp_download_file(
1854        &self,
1855        sftp: &Sftp,
1856        remote_path: &Path,
1857        local_path: &Path,
1858        local_root: &Path,
1859        bytes_transferred: &mut u64,
1860    ) -> Result<bool, String> {
1861        let stat = sftp
1862            .lstat(remote_path)
1863            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1864        if sftp_file_stat_is_symlink(&stat) {
1865            tracing::warn!(
1866                path = %remote_path.display(),
1867                "Skipping remote symlink during SFTP sync"
1868            );
1869            return Ok(false);
1870        }
1871        if !stat.is_file() {
1872            tracing::warn!(
1873                path = %remote_path.display(),
1874                "Skipping remote path: not a regular file"
1875            );
1876            return Ok(false);
1877        }
1878
1879        let mut remote_file = sftp
1880            .open(remote_path)
1881            .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
1882
1883        reject_local_symlink_below_root(local_root, local_path)?;
1884
1885        let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
1886        let mut local_file = std::fs::OpenOptions::new()
1887            .write(true)
1888            .create_new(true)
1889            .open(&temp_path)
1890            .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
1891
1892        // Transfer in chunks
1893        let mut buffer = [0u8; 32768]; // 32KB chunks
1894        loop {
1895            let bytes_read = remote_file
1896                .read(&mut buffer)
1897                .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
1898
1899            if bytes_read == 0 {
1900                break;
1901            }
1902
1903            local_file
1904                .write_all(&buffer[..bytes_read])
1905                .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
1906
1907            *bytes_transferred += bytes_read as u64;
1908        }
1909
1910        tracing::trace!(
1911            remote = %remote_path.display(),
1912            local = %local_path.display(),
1913            "downloaded file"
1914        );
1915
1916        local_file
1917            .sync_all()
1918            .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
1919        drop(local_file);
1920        replace_file_from_temp(&temp_path, local_path).map_err(|e| {
1921            format!(
1922                "Failed to publish {} to {}: {}",
1923                temp_path.display(),
1924                local_path.display(),
1925                e
1926            )
1927        })?;
1928
1929        Ok(true)
1930    }
1931}
1932
1933/// Resolve an SFTP entry's basename for local mirroring.
1934fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
1935    let Some(file_name) = entry_path.file_name() else {
1936        tracing::warn!(
1937            parent = %parent_path.display(),
1938            entry = ?entry_path,
1939            "Skipping SFTP entry without a file name"
1940        );
1941        return None;
1942    };
1943
1944    let Some(file_name) = file_name.to_str() else {
1945        tracing::warn!(
1946            parent = %parent_path.display(),
1947            entry = ?entry_path,
1948            "Skipping SFTP entry with non-UTF-8 file name"
1949        );
1950        return None;
1951    };
1952
1953    if file_name.is_empty() {
1954        tracing::warn!(
1955            parent = %parent_path.display(),
1956            entry = ?entry_path,
1957            "Skipping SFTP entry with empty file name"
1958        );
1959        return None;
1960    }
1961
1962    if file_name == "." || file_name == ".." {
1963        return None;
1964    }
1965
1966    Some(file_name)
1967}
1968
1969/// Check whether the `scp` executable exists on this system.
1970///
1971/// Uses a simple PATH search rather than running `scp` (which exits non-zero
1972/// when invoked without arguments on many platforms).
1973fn which_scp_exists() -> bool {
1974    std::env::var_os("PATH")
1975        .map(|path_var| {
1976            std::env::split_paths(&path_var).any(|dir| {
1977                let candidate = dir.join(if cfg!(target_os = "windows") {
1978                    "scp.exe"
1979                } else {
1980                    "scp"
1981                });
1982                candidate.is_file()
1983            })
1984        })
1985        .unwrap_or(false)
1986}
1987
1988/// Convert a Windows absolute path to a WSL-accessible `/mnt/<drive>/…` path.
1989///
1990/// E.g. `C:\Users\george\AppData\Roaming\cass` →
1991///      `/mnt/c/Users/george/AppData/Roaming/cass`
1992///
1993/// If the path does not look like a Windows drive path it is returned unchanged.
1994fn windows_path_to_wsl(path: &str) -> String {
1995    // Match "C:\..." or "C:/..."
1996    if path.len() >= 3 {
1997        let bytes = path.as_bytes();
1998        if bytes[1] == b':' && (bytes[2] == b'\\' || bytes[2] == b'/') {
1999            let drive = (bytes[0] as char).to_lowercase().next().unwrap_or('c');
2000            let rest = path[3..].replace('\\', "/");
2001            return format!("/mnt/{}/{}", drive, rest);
2002        }
2003    }
2004    path.to_string()
2005}
2006
2007/// Parse SSH host string into (optional_user, host).
2008///
2009/// Examples:
2010/// - "myserver" -> (None, "myserver")
2011/// - "user@myserver" -> (Some("user"), "myserver")
2012fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
2013    if let Some(at_pos) = host.find('@') {
2014        let user = &host[..at_pos];
2015        let hostname = &host[at_pos + 1..];
2016        (Some(user), hostname)
2017    } else {
2018        (None, host)
2019    }
2020}
2021
2022fn first_nonblank_username<'a>(
2023    candidates: impl IntoIterator<Item = Option<&'a str>>,
2024) -> Option<String> {
2025    candidates.into_iter().find_map(|candidate| {
2026        let trimmed = candidate?.trim();
2027        if trimmed.is_empty() {
2028            None
2029        } else {
2030            Some(trimmed.to_string())
2031        }
2032    })
2033}
2034
2035fn env_username(key: &str) -> Option<String> {
2036    dotenvy::var(key)
2037        .ok()
2038        .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2039}
2040
2041/// Expand tilde in local paths.
2042fn expand_tilde_local(path: &str) -> String {
2043    if let Some(stripped) = path.strip_prefix("~/")
2044        && let Some(home) = dirs::home_dir()
2045    {
2046        return format!("{}/{}", home.display(), stripped);
2047    } else if path == "~"
2048        && let Some(home) = dirs::home_dir()
2049    {
2050        return home.display().to_string();
2051    }
2052    path.to_string()
2053}
2054
2055/// Convert a remote path to a safe directory name.
2056///
2057/// Sanitizes path by:
2058/// - Removing leading `~` and `/`
2059/// - Replacing path separators and spaces with underscores
2060/// - Removing parent directory references (`..`) to prevent traversal attacks
2061/// - Removing current directory references (`.`)
2062/// - Appending a stable hash to prevent collisions (e.g., "foo/bar" vs "foo_bar")
2063pub fn path_to_safe_dirname(path: &str) -> String {
2064    use std::path::{Component, Path};
2065
2066    let path_obj = Path::new(path);
2067    let mut parts: Vec<&str> = Vec::new();
2068
2069    for component in path_obj.components() {
2070        match component {
2071            Component::Normal(name) => {
2072                if let Some(s) = name.to_str() {
2073                    // Skip "~" (home directory marker) and empty/dot-only components
2074                    if !s.is_empty() && s != "." && s != "~" {
2075                        parts.push(s);
2076                    }
2077                }
2078            }
2079            // Skip all traversal components for security
2080            Component::ParentDir
2081            | Component::CurDir
2082            | Component::RootDir
2083            | Component::Prefix(_) => {}
2084        }
2085    }
2086
2087    let cleaned = parts.join("_").replace([' ', '\\'], "_");
2088
2089    // Append stable hash to prevent collisions
2090    let hash = fnv1a_hash(path);
2091    let hash_suffix = format!("{:08x}", hash);
2092
2093    if cleaned.is_empty() {
2094        format!("root_{}", hash_suffix)
2095    } else {
2096        format!("{}_{}", cleaned, hash_suffix)
2097    }
2098}
2099
2100fn fnv1a_hash(text: &str) -> u64 {
2101    let mut hash: u64 = 0xcbf29ce484222325;
2102    for byte in text.bytes() {
2103        hash ^= u64::from(byte);
2104        hash = hash.wrapping_mul(0x100000001b3);
2105    }
2106    hash
2107}
2108
2109/// Parse transfer statistics from rsync --stats output.
2110fn parse_rsync_stats(output: &str) -> RsyncStats {
2111    let mut stats = RsyncStats::default();
2112
2113    for line in output.lines() {
2114        let line = line.trim();
2115
2116        // Parse "Number of regular files transferred: N"
2117        if line.starts_with("Number of regular files transferred:")
2118            && let Some(num_str) = line.split(':').nth(1)
2119        {
2120            stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2121        }
2122
2123        // Parse "Total transferred file size: N bytes"
2124        if line.starts_with("Total transferred file size:")
2125            && let Some(size_part) = line.split(':').nth(1)
2126        {
2127            // Handle formats like "1,234 bytes" or "1234"
2128            let size_str = size_part
2129                .split_whitespace()
2130                .next()
2131                .unwrap_or("0")
2132                .replace(',', "");
2133            stats.bytes_transferred = size_str.parse().unwrap_or(0);
2134        }
2135    }
2136
2137    stats
2138}
2139
2140// =============================================================================
2141// Sync Status Persistence
2142// =============================================================================
2143
2144/// Result of a sync operation for a source.
2145#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2146#[serde(rename_all = "snake_case")]
2147pub enum SyncResult {
2148    /// Sync completed successfully.
2149    Success,
2150    /// Some paths synced, some failed.
2151    PartialFailure(String),
2152    /// Sync failed completely.
2153    Failed(String),
2154    /// Sync was skipped (e.g., dry run).
2155    #[default]
2156    Skipped,
2157}
2158
2159impl SyncResult {
2160    /// Short display label for the result.
2161    pub fn label(&self) -> &'static str {
2162        match self {
2163            Self::Success => "success",
2164            Self::PartialFailure(_) => "partial",
2165            Self::Failed(_) => "failed",
2166            Self::Skipped => "never",
2167        }
2168    }
2169
2170    /// Error text for partial/full failures.
2171    pub fn error_message(&self) -> Option<&str> {
2172        match self {
2173            Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2174            Self::Success | Self::Skipped => None,
2175        }
2176    }
2177}
2178
2179/// Scheduler action for a remote source.
2180#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2181#[serde(rename_all = "snake_case")]
2182pub enum SourceSyncAction {
2183    /// The source is eligible to sync now.
2184    Sync,
2185    /// The source is healthy enough but not due under its configured schedule.
2186    Skip,
2187    /// The source is temporarily or operationally unsafe to sync automatically.
2188    Defer,
2189}
2190
2191impl SourceSyncAction {
2192    pub fn as_str(self) -> &'static str {
2193        match self {
2194            Self::Sync => "sync",
2195            Self::Skip => "skip",
2196            Self::Defer => "defer",
2197        }
2198    }
2199}
2200
2201/// Health class used by the adaptive source scheduler.
2202#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2203#[serde(rename_all = "snake_case")]
2204pub enum SourceHealthKind {
2205    NeverSynced,
2206    Healthy,
2207    Stale,
2208    HighLatency,
2209    Flapping,
2210    AuthFailed,
2211    BackingOff,
2212}
2213
2214impl SourceHealthKind {
2215    pub fn as_str(self) -> &'static str {
2216        match self {
2217            Self::NeverSynced => "never_synced",
2218            Self::Healthy => "healthy",
2219            Self::Stale => "stale",
2220            Self::HighLatency => "high_latency",
2221            Self::Flapping => "flapping",
2222            Self::AuthFailed => "auth_failed",
2223            Self::BackingOff => "backing_off",
2224        }
2225    }
2226}
2227
2228/// Evidence-backed scheduling decision for one source.
2229#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2230pub struct SourceSyncDecision {
2231    /// Decision action the scheduler would take.
2232    pub action: SourceSyncAction,
2233    /// Current health class inferred from durable sync state.
2234    pub health: SourceHealthKind,
2235    /// Coarse 0..=100 health score for sorting/explanations.
2236    pub health_score: u8,
2237    /// Age of the last sync attempt, capped at zero when clocks move backward.
2238    pub staleness_ms: Option<i64>,
2239    /// Coarse 0..=100 estimate of value from refreshing stale remote data.
2240    pub stale_value_score: u8,
2241    /// Whether an explicit operator request is overriding automatic scheduling.
2242    pub manual_override: bool,
2243    /// Whether the decision is using the conservative fallback path.
2244    pub fallback_active: bool,
2245    /// Next time this source is eligible under its configured schedule.
2246    pub next_eligible_sync_ms: Option<i64>,
2247    /// End of transient failure backoff when applicable.
2248    pub backoff_until_ms: Option<i64>,
2249    /// Human-readable evidence terms, stable enough for robot consumers.
2250    pub reasons: Vec<String>,
2251}
2252
2253impl SourceSyncDecision {
2254    fn evaluate(
2255        source: &SourceDefinition,
2256        info: Option<&SourceSyncInfo>,
2257        now_ms: i64,
2258        manual_override: bool,
2259    ) -> Self {
2260        let period_ms = sync_schedule_period_ms(source.sync_schedule);
2261        let next_eligible_sync_ms = info
2262            .and_then(|info| info.last_sync)
2263            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
2264        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
2265        let staleness_ms = info.and_then(|info| {
2266            info.last_sync
2267                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
2268        });
2269        let stale_value_score =
2270            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
2271        let mut reasons = Vec::new();
2272
2273        let health = match info {
2274            None => {
2275                reasons.push("no durable sync status exists for this source".to_string());
2276                SourceHealthKind::NeverSynced
2277            }
2278            Some(info) if info.last_sync.is_none() => {
2279                reasons.push("source has never completed or attempted a sync".to_string());
2280                SourceHealthKind::NeverSynced
2281            }
2282            Some(info) if sync_result_auth_failure(&info.last_result) => {
2283                reasons
2284                    .push("last sync failed with an authentication or host-key error".to_string());
2285                SourceHealthKind::AuthFailed
2286            }
2287            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
2288                reasons.push("last sync partially succeeded and partially failed".to_string());
2289                SourceHealthKind::Flapping
2290            }
2291            Some(info)
2292                if info.consecutive_failures > 0
2293                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
2294            {
2295                reasons.push(format!(
2296                    "{} consecutive failure(s) are inside retry backoff",
2297                    info.consecutive_failures
2298                ));
2299                SourceHealthKind::BackingOff
2300            }
2301            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
2302                let error = info.last_result.error_message().unwrap_or("unknown error");
2303                reasons.push(format!(
2304                    "last sync failed completely ({error}); local fallback remains active"
2305                ));
2306                SourceHealthKind::Flapping
2307            }
2308            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
2309                reasons.push(format!(
2310                    "last sync took {}ms, above {}ms high-latency guard",
2311                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
2312                ));
2313                SourceHealthKind::HighLatency
2314            }
2315            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
2316                reasons.push("configured sync schedule is due".to_string());
2317                SourceHealthKind::Stale
2318            }
2319            Some(_) => SourceHealthKind::Healthy,
2320        };
2321
2322        let fallback_active = matches!(
2323            health,
2324            SourceHealthKind::AuthFailed
2325                | SourceHealthKind::BackingOff
2326                | SourceHealthKind::Flapping
2327                | SourceHealthKind::HighLatency
2328        );
2329
2330        let mut action = if manual_override {
2331            reasons.push("explicit sync command overrides automatic scheduling".to_string());
2332            SourceSyncAction::Sync
2333        } else {
2334            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
2335        };
2336
2337        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
2338            action = SourceSyncAction::Defer;
2339        }
2340
2341        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
2342            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
2343        }
2344
2345        if !manual_override
2346            && matches!(action, SourceSyncAction::Skip)
2347            && let Some(next_ms) = next_eligible_sync_ms
2348        {
2349            reasons.push(format!(
2350                "next scheduled sync is eligible at unix_ms={next_ms}"
2351            ));
2352        }
2353
2354        if reasons.is_empty() {
2355            reasons.push("source is healthy and within schedule".to_string());
2356        }
2357
2358        Self {
2359            action,
2360            health,
2361            health_score: health_score_for_source(health),
2362            staleness_ms,
2363            stale_value_score,
2364            manual_override,
2365            fallback_active,
2366            next_eligible_sync_ms,
2367            backoff_until_ms,
2368            reasons,
2369        }
2370    }
2371}
2372
2373/// Sync information for a single source.
2374#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2375pub struct SourceSyncInfo {
2376    /// Timestamp of last sync attempt.
2377    pub last_sync: Option<i64>,
2378    /// Result of last sync.
2379    pub last_result: SyncResult,
2380    /// Number of files synced in last sync.
2381    pub files_synced: u64,
2382    /// Number of bytes transferred in last sync.
2383    pub bytes_transferred: u64,
2384    /// Duration of last sync in milliseconds.
2385    pub duration_ms: u64,
2386    /// Consecutive failed sync attempts, reset to zero by a fully successful sync.
2387    #[serde(default)]
2388    pub consecutive_failures: u32,
2389}
2390
2391impl SourceSyncInfo {
2392    /// Build sync info from a sync report using the current wall clock time.
2393    pub fn from_report(report: &SyncReport) -> Self {
2394        let last_result = report.sync_result();
2395        Self {
2396            last_sync: Some(current_unix_ms()),
2397            consecutive_failures: u32::from(!report.all_succeeded),
2398            last_result,
2399            files_synced: report.total_files(),
2400            bytes_transferred: report.total_bytes(),
2401            duration_ms: report.total_duration_ms,
2402        }
2403    }
2404}
2405
2406/// Persistent sync status for all sources.
2407#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2408pub struct SyncStatus {
2409    /// Sync info per source (keyed by source name).
2410    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
2411}
2412
2413impl SyncStatus {
2414    /// Load sync status from disk.
2415    pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2416        let path = Self::status_path(data_dir);
2417        match std::fs::read_to_string(&path) {
2418            Ok(content) => serde_json::from_str(&content)
2419                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2420            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2421            Err(e) => Err(e),
2422        }
2423    }
2424
2425    /// Save sync status to disk.
2426    ///
2427    /// Uses an atomic rename on Unix. On Windows, falls back to remove-then-rename
2428    /// because replacing an existing destination with `std::fs::rename` fails.
2429    pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2430        let path = Self::status_path(data_dir);
2431        if let Some(parent) = path.parent() {
2432            std::fs::create_dir_all(parent)?;
2433        }
2434        let content = serde_json::to_string_pretty(self)?;
2435        let tmp_path = write_sync_status_temp_file(&path, content.as_bytes())?;
2436        replace_file_from_temp(&tmp_path, &path)
2437    }
2438
2439    /// Update status for a source from a sync report.
2440    pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2441        let previous_failures = self
2442            .get(source_name)
2443            .map(|info| info.consecutive_failures)
2444            .unwrap_or_default();
2445        let mut info = SourceSyncInfo::from_report(report);
2446        if report.all_succeeded {
2447            info.consecutive_failures = 0;
2448        } else {
2449            info.consecutive_failures = previous_failures.saturating_add(1);
2450        }
2451        self.set_info(source_name, info);
2452    }
2453
2454    /// Set status for a source from precomputed sync info.
2455    pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2456        self.sources.insert(source_name.to_string(), info);
2457    }
2458
2459    /// Drop sync status entries for sources that no longer exist.
2460    ///
2461    /// Returns `true` when at least one stale entry was removed.
2462    pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2463        let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2464        let previous_len = self.sources.len();
2465        self.sources
2466            .retain(|source_name, _| allowed.contains(source_name.as_str()));
2467        self.sources.len() != previous_len
2468    }
2469
2470    /// Get sync info for a source.
2471    pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2472        self.sources.get(source_name)
2473    }
2474
2475    /// Evaluate automatic scheduling for one source at a deterministic timestamp.
2476    pub fn decision_for_source_at(
2477        &self,
2478        source: &SourceDefinition,
2479        now_ms: i64,
2480        manual_override: bool,
2481    ) -> SourceSyncDecision {
2482        SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2483    }
2484
2485    /// Get the path to the status file.
2486    fn status_path(data_dir: &Path) -> PathBuf {
2487        data_dir.join("sync_status.json")
2488    }
2489}
2490
2491const SOURCE_HIGH_LATENCY_MS: u64 = 60_000;
2492const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1000;
2493const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1000;
2494
2495pub(crate) fn current_unix_ms() -> i64 {
2496    let now = std::time::SystemTime::now()
2497        .duration_since(std::time::UNIX_EPOCH)
2498        .unwrap_or_default()
2499        .as_millis();
2500    i64::try_from(now).unwrap_or(i64::MAX)
2501}
2502
2503fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2504    match schedule {
2505        SyncSchedule::Manual => None,
2506        SyncSchedule::Hourly => Some(60 * 60 * 1000),
2507        SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2508    }
2509}
2510
2511fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
2512    match (last_sync, period_ms) {
2513        (None, _) => true,
2514        (Some(_), None) => false,
2515        (Some(last_sync), Some(period_ms)) => last_sync.saturating_add(period_ms) <= now_ms,
2516    }
2517}
2518
2519fn automatic_source_sync_action(
2520    schedule: SyncSchedule,
2521    health: SourceHealthKind,
2522    info: Option<&SourceSyncInfo>,
2523    now_ms: i64,
2524) -> SourceSyncAction {
2525    match health {
2526        SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2527        _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2528        SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2529        SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2530            if sync_schedule_due(
2531                info.and_then(|info| info.last_sync),
2532                sync_schedule_period_ms(schedule),
2533                now_ms,
2534            ) {
2535                SourceSyncAction::Sync
2536            } else {
2537                SourceSyncAction::Skip
2538            }
2539        }
2540        SourceHealthKind::Healthy => {
2541            if sync_schedule_due(
2542                info.and_then(|info| info.last_sync),
2543                sync_schedule_period_ms(schedule),
2544                now_ms,
2545            ) {
2546                SourceSyncAction::Sync
2547            } else {
2548                SourceSyncAction::Skip
2549            }
2550        }
2551    }
2552}
2553
2554fn health_score_for_source(health: SourceHealthKind) -> u8 {
2555    match health {
2556        SourceHealthKind::Healthy => 100,
2557        SourceHealthKind::Stale => 75,
2558        SourceHealthKind::NeverSynced => 65,
2559        SourceHealthKind::HighLatency => 55,
2560        SourceHealthKind::Flapping => 40,
2561        SourceHealthKind::BackingOff => 25,
2562        SourceHealthKind::AuthFailed => 10,
2563    }
2564}
2565
2566fn stale_value_score_for_source(
2567    schedule: SyncSchedule,
2568    staleness_ms: Option<i64>,
2569    info: Option<&SourceSyncInfo>,
2570) -> u8 {
2571    let Some(info) = info else {
2572        return 100;
2573    };
2574    if info.last_sync.is_none() {
2575        return 100;
2576    }
2577
2578    let Some(staleness_ms) = staleness_ms else {
2579        return 100;
2580    };
2581
2582    let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2583        return 0;
2584    };
2585
2586    let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2587    u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2588}
2589
2590fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2591    if info.consecutive_failures == 0 {
2592        return None;
2593    }
2594    let last_sync = info.last_sync?;
2595    let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2596    let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2597    let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2598        .saturating_mul(multiplier)
2599        .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2600    Some(last_sync.saturating_add(backoff_ms))
2601}
2602
2603fn sync_result_auth_failure(result: &SyncResult) -> bool {
2604    let Some(error) = result.error_message() else {
2605        return false;
2606    };
2607    let error = error.to_ascii_lowercase();
2608    error.contains("permission denied")
2609        || error.contains("authentication")
2610        || error.contains("host key verification failed")
2611        || error.contains("known_hosts")
2612        || error.contains("no valid authentication")
2613}
2614
2615fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2616    unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2617}
2618
2619fn write_sync_status_temp_file(
2620    final_path: &Path,
2621    content: &[u8],
2622) -> Result<PathBuf, std::io::Error> {
2623    for _ in 0..100 {
2624        let tmp_path = unique_atomic_temp_path(final_path);
2625        match write_sync_status_temp_file_at(&tmp_path, content) {
2626            Ok(()) => return Ok(tmp_path),
2627            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
2628            Err(err) => return Err(err),
2629        }
2630    }
2631
2632    Err(std::io::Error::new(
2633        std::io::ErrorKind::AlreadyExists,
2634        format!(
2635            "failed to allocate unique sync status temp path for {}",
2636            final_path.display()
2637        ),
2638    ))
2639}
2640
2641fn write_sync_status_temp_file_at(path: &Path, content: &[u8]) -> Result<(), std::io::Error> {
2642    let mut file = std::fs::OpenOptions::new()
2643        .write(true)
2644        .create_new(true)
2645        .open(path)?;
2646    file.write_all(content)?;
2647    file.sync_all()
2648}
2649
2650fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2651    #[cfg(windows)]
2652    {
2653        match std::fs::rename(temp_path, final_path) {
2654            Ok(()) => sync_parent_directory(final_path),
2655            Err(first_err)
2656                if final_path.exists()
2657                    && matches!(
2658                        first_err.kind(),
2659                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2660                    ) =>
2661            {
2662                let backup_path = unique_replace_backup_path(final_path);
2663                std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2664                    let _ = std::fs::remove_file(temp_path);
2665                    std::io::Error::other(format!(
2666                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2667                        backup_path.display(),
2668                        final_path.display(),
2669                        first_err,
2670                        backup_err
2671                    ))
2672                })?;
2673                match std::fs::rename(temp_path, final_path) {
2674                    Ok(()) => {
2675                        let _ = std::fs::remove_file(&backup_path);
2676                        sync_parent_directory(final_path)
2677                    }
2678                    Err(second_err) => {
2679                        let restore_result = std::fs::rename(&backup_path, final_path);
2680                        match restore_result {
2681                            Ok(()) => {
2682                                let _ = std::fs::remove_file(temp_path);
2683                                sync_parent_directory(final_path).map_err(|sync_err| {
2684                                    std::io::Error::other(format!(
2685                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2686                                        final_path.display(),
2687                                        temp_path.display(),
2688                                        first_err,
2689                                        second_err,
2690                                        sync_err
2691                                    ))
2692                                })?;
2693                                Err(std::io::Error::new(
2694                                    second_err.kind(),
2695                                    format!(
2696                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2697                                        final_path.display(),
2698                                        temp_path.display(),
2699                                        first_err,
2700                                        second_err
2701                                    ),
2702                                ))
2703                            }
2704                            Err(restore_err) => Err(std::io::Error::other(format!(
2705                                "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2706                                final_path.display(),
2707                                temp_path.display(),
2708                                first_err,
2709                                second_err,
2710                                restore_err,
2711                                temp_path.display()
2712                            ))),
2713                        }
2714                    }
2715                }
2716            }
2717            Err(rename_err) => Err(rename_err),
2718        }
2719    }
2720
2721    #[cfg(not(windows))]
2722    {
2723        std::fs::rename(temp_path, final_path)?;
2724        sync_parent_directory(final_path)
2725    }
2726}
2727
2728fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
2729    std::fs::File::open(path)?.sync_all()
2730}
2731
2732#[cfg(not(windows))]
2733fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
2734    let Some(parent) = path.parent() else {
2735        return Ok(());
2736    };
2737    std::fs::File::open(parent)?.sync_all()
2738}
2739
2740#[cfg(windows)]
2741fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
2742    Ok(())
2743}
2744
2745#[cfg(windows)]
2746fn unique_replace_backup_path(path: &Path) -> PathBuf {
2747    unique_atomic_sidecar_path(path, "bak", "sync_status.json")
2748}
2749
2750fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
2751    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2752
2753    let timestamp = std::time::SystemTime::now()
2754        .duration_since(std::time::UNIX_EPOCH)
2755        .unwrap_or_default()
2756        .as_nanos();
2757    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2758    let file_name = path
2759        .file_name()
2760        .and_then(|name| name.to_str())
2761        .unwrap_or(fallback_name);
2762
2763    path.with_file_name(format!(
2764        ".{file_name}.{suffix}.{}.{}.{}",
2765        std::process::id(),
2766        timestamp,
2767        nonce
2768    ))
2769}
2770
2771#[cfg(test)]
2772mod tests {
2773    use super::*;
2774    use tempfile::TempDir;
2775
2776    #[test]
2777    fn test_path_to_safe_dirname() {
2778        let res = path_to_safe_dirname("~/.claude/projects");
2779        assert!(res.starts_with(".claude_projects_"));
2780
2781        let res = path_to_safe_dirname("/home/user/data");
2782        assert!(res.starts_with("home_user_data_"));
2783
2784        let res = path_to_safe_dirname("~/");
2785        assert!(res.starts_with("root_"));
2786
2787        let res = path_to_safe_dirname("");
2788        assert!(res.starts_with("root_"));
2789    }
2790
2791    #[test]
2792    fn test_path_to_safe_dirname_empty() {
2793        let res = path_to_safe_dirname("~");
2794        assert!(res.starts_with("root_"));
2795
2796        let res = path_to_safe_dirname("/");
2797        assert!(res.starts_with("root_"));
2798    }
2799
2800    #[test]
2801    fn test_path_to_safe_dirname_strips_traversal_components() {
2802        let res = path_to_safe_dirname("../../etc/passwd");
2803
2804        assert!(res.starts_with("etc_passwd_"));
2805        assert!(!res.contains(".."));
2806        assert!(!res.contains('/'));
2807        assert!(!res.contains('\\'));
2808    }
2809
2810    #[test]
2811    fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
2812        let temp = TempDir::new().unwrap();
2813        let engine = SyncEngine::new(temp.path());
2814
2815        for host in [
2816            "work-mac;touch /tmp/cass-owned",
2817            "work mac",
2818            "work-mac\nhostname",
2819            "work-mac`hostname`",
2820            "work-mac/../../secret",
2821            "-oProxyCommand=evil",
2822            "",
2823            "@host",
2824            "user@",
2825            "user@host@extra",
2826        ] {
2827            let err = engine.get_remote_home(host).unwrap_err();
2828            assert!(
2829                matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
2830                "expected invalid-host rejection for {host:?}, got {err}"
2831            );
2832        }
2833    }
2834
2835    #[test]
2836    fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
2837        let temp = TempDir::new().unwrap();
2838        let engine = SyncEngine::new(temp.path());
2839        let mut source = SourceDefinition::ssh("../escape", "user@host");
2840        source.paths = vec!["/tmp/sessions".to_string()];
2841
2842        let err = engine
2843            .sync_source(&source)
2844            .expect_err("invalid source name should fail before local writes");
2845
2846        assert!(
2847            matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
2848            "expected invalid source-name rejection, got {err}"
2849        );
2850        assert!(
2851            !temp.path().join("escape").exists(),
2852            "invalid source name must not escape the remotes mirror layout"
2853        );
2854        assert!(
2855            !temp.path().join("remotes").exists(),
2856            "invalid source name must be rejected before creating mirror roots"
2857        );
2858    }
2859
2860    #[test]
2861    fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
2862        let temp = TempDir::new().unwrap();
2863        let engine = SyncEngine::new(temp.path());
2864        let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
2865        source.paths = vec!["/tmp/sessions".to_string()];
2866
2867        let err = engine
2868            .sync_source(&source)
2869            .expect_err("invalid host should fail before local writes");
2870
2871        assert!(
2872            matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
2873            "expected invalid host rejection, got {err}"
2874        );
2875        assert!(
2876            !temp.path().join("remotes").exists(),
2877            "invalid host must be rejected before creating mirror roots"
2878        );
2879    }
2880
2881    #[test]
2882    fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
2883        let temp = TempDir::new().unwrap();
2884        let engine = SyncEngine::new(temp.path());
2885
2886        for (path, expected) in [
2887            ("", "paths[0] cannot be empty"),
2888            ("   ", "paths[0] cannot be empty"),
2889            (" ~/.claude/projects", "paths[0] cannot have leading"),
2890            ("~/.claude/projects ", "paths[0] cannot have leading"),
2891            ("~/.claude\nprojects", "paths[0] cannot contain control"),
2892        ] {
2893            let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
2894            source.paths = vec![path.to_string()];
2895
2896            let report = engine.sync_source(&source).unwrap();
2897            assert_eq!(report.path_results.len(), 1);
2898            let result = &report.path_results[0];
2899            assert!(!result.success);
2900            assert_eq!(result.remote_path, path);
2901            assert!(
2902                result
2903                    .error
2904                    .as_deref()
2905                    .is_some_and(|message| message.contains(expected)),
2906                "expected invalid path rejection for {path:?}, got {result:?}"
2907            );
2908        }
2909    }
2910
2911    #[test]
2912    fn test_remote_sync_path_validation_allows_internal_spaces() {
2913        assert!(
2914            validate_remote_sync_path_entry(
2915                0,
2916                "~/Library/Application Support/Cursor/User/globalStorage"
2917            )
2918            .is_ok()
2919        );
2920    }
2921
2922    #[test]
2923    fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
2924        let temp = TempDir::new().unwrap();
2925        let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
2926        // Use a validation-safe TEST-NET host so source structure checks pass,
2927        // but remote-home lookup still fails quickly before path result ordering.
2928        let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
2929        source.paths = vec![
2930            "~/.codex/sessions".to_string(),
2931            " ~/.claude/projects".to_string(),
2932            "~/.gemini/tmp".to_string(),
2933        ];
2934
2935        let report = engine.sync_source(&source).unwrap();
2936        let remote_paths = report
2937            .path_results
2938            .iter()
2939            .map(|result| result.remote_path.as_str())
2940            .collect::<Vec<_>>();
2941
2942        assert_eq!(
2943            remote_paths,
2944            vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
2945        );
2946        assert!(
2947            report.path_results[1]
2948                .error
2949                .as_deref()
2950                .is_some_and(|message| message.contains("paths[1] cannot have leading")),
2951            "expected invalid path error in original slot: {:?}",
2952            report.path_results
2953        );
2954    }
2955
2956    #[test]
2957    fn test_remote_find_regular_files_command_uses_physical_traversal() {
2958        assert_eq!(
2959            remote_find_regular_files_command("/tmp/has space"),
2960            "find -P '/tmp/has space' -type f -print0"
2961        );
2962        assert_eq!(
2963            remote_find_regular_files_command("/tmp/that's all"),
2964            "find -P '/tmp/that'\\''s all' -type f -print0"
2965        );
2966    }
2967
2968    #[test]
2969    fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
2970        assert_eq!(
2971            parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
2972            Some("/home/user".to_string())
2973        );
2974        assert_eq!(
2975            parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
2976            Some("/Users/test user".to_string())
2977        );
2978    }
2979
2980    #[test]
2981    fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
2982        assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
2983        assert_eq!(
2984            parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
2985            None
2986        );
2987    }
2988
2989    #[test]
2990    fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
2991        let paths = parse_null_terminated_utf8_paths(
2992            b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
2993        );
2994        assert_eq!(
2995            paths,
2996            vec![
2997                "/remote/sessions/a.jsonl".to_string(),
2998                "/remote/sessions/b.jsonl".to_string()
2999            ]
3000        );
3001    }
3002
3003    #[test]
3004    fn test_remote_file_to_safe_local_path_rejects_outside_root() {
3005        let root = Path::new("/remote/sessions");
3006        let local = Path::new("/mirror/root");
3007
3008        assert_eq!(
3009            remote_file_to_safe_local_path(
3010                root,
3011                Path::new("/remote/sessions/a/b.jsonl"),
3012                local,
3013                "sessions"
3014            ),
3015            Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
3016        );
3017        assert_eq!(
3018            remote_file_to_safe_local_path(
3019                Path::new("/remote/session.jsonl"),
3020                Path::new("/remote/session.jsonl"),
3021                local,
3022                "session.jsonl"
3023            ),
3024            Some(PathBuf::from("/mirror/root/session.jsonl"))
3025        );
3026        assert_eq!(
3027            remote_file_to_safe_local_path(
3028                root,
3029                Path::new("/remote/sessions/../secret.txt"),
3030                local,
3031                "sessions"
3032            ),
3033            None
3034        );
3035        assert_eq!(
3036            remote_file_to_safe_local_path(
3037                root,
3038                Path::new("/remote/other/secret.txt"),
3039                local,
3040                "sessions"
3041            ),
3042            None
3043        );
3044    }
3045
3046    #[test]
3047    fn test_local_symlink_guard_allows_regular_paths() {
3048        let temp = TempDir::new().expect("tempdir");
3049        let root = temp.path().join("mirror");
3050        let target = root.join("sessions/session.jsonl");
3051
3052        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3053
3054        std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3055        std::fs::write(&target, "{}").expect("write target");
3056
3057        assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3058    }
3059
3060    #[cfg(unix)]
3061    #[test]
3062    fn test_local_symlink_guard_rejects_nested_symlink() {
3063        use std::os::unix::fs::symlink;
3064
3065        let temp = TempDir::new().expect("tempdir");
3066        let root = temp.path().join("mirror");
3067        let outside = temp.path().join("outside");
3068        std::fs::create_dir_all(&root).expect("create root");
3069        std::fs::create_dir_all(&outside).expect("create outside");
3070        symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3071
3072        let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3073            .expect_err("nested symlink should be rejected");
3074
3075        assert!(err.contains("Refusing to write"));
3076        assert!(err.contains("sessions"));
3077    }
3078
3079    #[cfg(unix)]
3080    #[test]
3081    fn test_local_symlink_guard_rejects_root_symlink() {
3082        use std::os::unix::fs::symlink;
3083
3084        let temp = TempDir::new().expect("tempdir");
3085        let outside = temp.path().join("outside");
3086        let root = temp.path().join("mirror-link");
3087        std::fs::create_dir_all(&outside).expect("create outside");
3088        symlink(&outside, &root).expect("symlink root");
3089
3090        let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3091            .expect_err("root symlink should be rejected");
3092
3093        assert!(err.contains("Refusing to write"));
3094        assert!(err.contains("mirror-link"));
3095    }
3096
3097    #[test]
3098    fn test_prepare_local_sync_container_creates_regular_container() {
3099        let temp = TempDir::new().expect("tempdir");
3100        let root = temp.path().join("mirror");
3101        let target = root.join("sessions");
3102
3103        prepare_local_sync_container(&root, &target).expect("regular container should be created");
3104
3105        assert!(target.is_dir());
3106    }
3107
3108    #[cfg(unix)]
3109    #[test]
3110    fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3111        use std::os::unix::fs::symlink;
3112
3113        let temp = TempDir::new().expect("tempdir");
3114        let root = temp.path().join("mirror");
3115        let outside = temp.path().join("outside");
3116        let target = root.join("sessions");
3117        std::fs::create_dir_all(&root).expect("create root");
3118        std::fs::create_dir_all(&outside).expect("create outside");
3119        symlink(&outside, &target).expect("symlink target");
3120
3121        let err = prepare_local_sync_container(&root, &target)
3122            .expect_err("sync container symlink should be rejected");
3123
3124        assert!(err.contains("Refusing to write"));
3125        assert!(err.contains("sessions"));
3126    }
3127
3128    #[cfg(unix)]
3129    #[test]
3130    fn test_prepare_local_sync_container_rejects_root_symlink() {
3131        use std::os::unix::fs::symlink;
3132
3133        let temp = TempDir::new().expect("tempdir");
3134        let outside = temp.path().join("outside");
3135        let root = temp.path().join("mirror-link");
3136        let target = root.join("sessions");
3137        std::fs::create_dir_all(&outside).expect("create outside");
3138        symlink(&outside, &root).expect("symlink root");
3139
3140        let err = prepare_local_sync_container(&root, &target)
3141            .expect_err("sync root symlink should be rejected");
3142
3143        assert!(err.contains("Refusing to write"));
3144        assert!(err.contains("mirror-link"));
3145    }
3146
3147    #[cfg(unix)]
3148    #[test]
3149    fn test_prepare_local_sync_root_rejects_symlinked_source_parent() {
3150        use std::os::unix::fs::symlink;
3151
3152        let temp = TempDir::new().expect("tempdir");
3153        let local_store = temp.path().join("data");
3154        let remotes = local_store.join("remotes");
3155        let outside = temp.path().join("outside");
3156        let source_link = remotes.join("laptop");
3157        let mirror_dir = source_link.join("mirror");
3158
3159        std::fs::create_dir_all(&remotes).expect("create remotes");
3160        std::fs::create_dir_all(&outside).expect("create outside");
3161        symlink(&outside, &source_link).expect("symlink source parent");
3162
3163        let err = prepare_local_sync_root(&local_store, &mirror_dir)
3164            .expect_err("symlinked source parent should be rejected before mkdir");
3165
3166        assert!(err.contains("Refusing to write"));
3167        assert!(err.contains("laptop"));
3168        assert!(
3169            !outside.join("mirror").exists(),
3170            "sync root preparation must not create directories through source parent symlinks"
3171        );
3172    }
3173
3174    #[test]
3175    fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3176        let symlink = FileStat {
3177            size: None,
3178            uid: None,
3179            gid: None,
3180            perm: Some(0o120000 | 0o777),
3181            atime: None,
3182            mtime: None,
3183        };
3184        let regular = FileStat {
3185            size: None,
3186            uid: None,
3187            gid: None,
3188            perm: Some(0o100000 | 0o644),
3189            atime: None,
3190            mtime: None,
3191        };
3192
3193        assert!(sftp_file_stat_is_symlink(&symlink));
3194        assert!(!sftp_file_stat_is_symlink(&regular));
3195    }
3196
3197    #[test]
3198    fn test_sftp_entry_file_name_accepts_regular_names() {
3199        let parent = Path::new("/remote");
3200        let entry = parent.join("session.jsonl");
3201
3202        assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3203    }
3204
3205    #[test]
3206    fn test_sftp_entry_file_name_skips_dot_entries() {
3207        let parent = Path::new("/remote");
3208
3209        assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3210        assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3211    }
3212
3213    #[cfg(unix)]
3214    #[test]
3215    fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3216        use std::ffi::OsStr;
3217        use std::os::unix::ffi::OsStrExt;
3218
3219        let parent = Path::new("/remote");
3220        let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3221        let entry = parent.join(bad_component);
3222
3223        assert_eq!(sftp_entry_file_name(&entry, parent), None);
3224    }
3225
3226    #[test]
3227    fn test_parse_rsync_stats() {
3228        let output = r#"
3229Number of files: 42
3230Number of regular files transferred: 10
3231Total transferred file size: 1,234 bytes
3232        "#;
3233
3234        let stats = parse_rsync_stats(output);
3235        assert_eq!(stats.files_transferred, 10);
3236        assert_eq!(stats.bytes_transferred, 1234);
3237    }
3238
3239    #[test]
3240    fn test_parse_rsync_stats_empty() {
3241        let stats = parse_rsync_stats("");
3242        assert_eq!(stats.files_transferred, 0);
3243        assert_eq!(stats.bytes_transferred, 0);
3244    }
3245
3246    #[test]
3247    fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3248        assert_eq!(
3249            quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3250            "'/Users/me/Library/Application Support/Cursor'"
3251        );
3252        assert_eq!(
3253            quote_remote_shell_path("/tmp/that's all"),
3254            "'/tmp/that'\\''s all'"
3255        );
3256    }
3257
3258    #[test]
3259    fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3260        assert_eq!(
3261            remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3262            "work-mac:/tmp/has space"
3263        );
3264        assert_eq!(
3265            remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3266            "work-mac:/tmp/that's all"
3267        );
3268        assert_eq!(
3269            remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3270            "work-mac:'/tmp/has space'"
3271        );
3272    }
3273
3274    #[test]
3275    fn rsync_arg_protection_enum_maps_flags_correctly() {
3276        // Regression for #191: Homebrew rsync 3.4.1 renamed the flag to
3277        // --secluded-args; earlier 3.0–3.3 use --protect-args. The caller
3278        // must pass the name the installed rsync actually accepts in its
3279        // own --help listing.
3280        assert_eq!(
3281            RsyncArgProtection::ProtectArgs.flag(),
3282            Some("--protect-args")
3283        );
3284        assert_eq!(
3285            RsyncArgProtection::SecludedArgs.flag(),
3286            Some("--secluded-args")
3287        );
3288        assert_eq!(RsyncArgProtection::None.flag(), None);
3289        assert!(RsyncArgProtection::ProtectArgs.is_supported());
3290        assert!(RsyncArgProtection::SecludedArgs.is_supported());
3291        assert!(!RsyncArgProtection::None.is_supported());
3292    }
3293
3294    #[test]
3295    fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3296        assert_eq!(
3297            remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3298            "work-mac:'/tmp/has space'"
3299        );
3300    }
3301
3302    #[test]
3303    fn test_remote_spec_for_scp_always_quotes_remote_path() {
3304        assert_eq!(
3305            remote_spec_for_scp("work-mac", "/tmp/that's all"),
3306            "work-mac:'/tmp/that'\\''s all'"
3307        );
3308    }
3309
3310    #[test]
3311    fn test_sync_report_totals() {
3312        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3313        report.add_path_result(PathSyncResult {
3314            files_transferred: 5,
3315            bytes_transferred: 100,
3316            success: true,
3317            ..Default::default()
3318        });
3319        report.add_path_result(PathSyncResult {
3320            files_transferred: 3,
3321            bytes_transferred: 50,
3322            success: true,
3323            ..Default::default()
3324        });
3325
3326        assert_eq!(report.total_files(), 8);
3327        assert_eq!(report.total_bytes(), 150);
3328        assert!(report.all_succeeded);
3329    }
3330
3331    #[test]
3332    fn test_sync_report_with_failure() {
3333        let mut report = SyncReport::new("test", SyncMethod::Rsync);
3334        report.add_path_result(PathSyncResult {
3335            success: true,
3336            ..Default::default()
3337        });
3338        report.add_path_result(PathSyncResult {
3339            success: false,
3340            error: Some("Connection refused".into()),
3341            ..Default::default()
3342        });
3343
3344        assert!(!report.all_succeeded);
3345        assert_eq!(report.successful_paths(), 1);
3346        assert_eq!(report.failed_paths(), 1);
3347    }
3348
3349    #[test]
3350    fn test_detect_sync_method() {
3351        // This test is platform-dependent but should at least not panic
3352        let method = SyncEngine::detect_sync_method();
3353        assert!(matches!(
3354            method,
3355            SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3356        ));
3357    }
3358
3359    #[test]
3360    fn test_sync_engine_mirror_dir() {
3361        let engine = SyncEngine::new(Path::new("/data/cass"));
3362        let mirror = engine.mirror_dir("laptop");
3363        assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3364    }
3365
3366    #[test]
3367    fn test_sync_method_display() {
3368        for (method, expected) in [
3369            (SyncMethod::Rsync, "rsync"),
3370            (SyncMethod::WslRsync, "wsl-rsync"),
3371            (SyncMethod::Scp, "scp"),
3372            (SyncMethod::Sftp, "sftp"),
3373        ] {
3374            assert_eq!(method.as_str(), expected);
3375            assert_eq!(method.to_string(), expected);
3376        }
3377    }
3378
3379    #[test]
3380    fn test_windows_path_to_wsl_drive() {
3381        assert_eq!(
3382            windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass"),
3383            "/mnt/c/Users/george/AppData/Roaming/cass"
3384        );
3385    }
3386
3387    #[test]
3388    fn test_windows_path_to_wsl_forward_slash() {
3389        assert_eq!(
3390            windows_path_to_wsl("C:/Users/george/data"),
3391            "/mnt/c/Users/george/data"
3392        );
3393    }
3394
3395    #[test]
3396    fn test_windows_path_to_wsl_non_windows_path_unchanged() {
3397        // A Unix absolute path should pass through unchanged.
3398        assert_eq!(
3399            windows_path_to_wsl("/home/george/data"),
3400            "/home/george/data"
3401        );
3402    }
3403
3404    #[test]
3405    fn test_expand_tilde_with_home() {
3406        // No tilde - returns unchanged
3407        assert_eq!(
3408            SyncEngine::expand_tilde_with_home("/home/user/projects", Some("/home/user")),
3409            "/home/user/projects"
3410        );
3411
3412        // Tilde with home provided
3413        assert_eq!(
3414            SyncEngine::expand_tilde_with_home("~/.claude/projects", Some("/home/user")),
3415            "/home/user/.claude/projects"
3416        );
3417
3418        // Just tilde
3419        assert_eq!(
3420            SyncEngine::expand_tilde_with_home("~", Some("/home/user")),
3421            "/home/user"
3422        );
3423
3424        // Tilde without home - returns unchanged
3425        assert_eq!(
3426            SyncEngine::expand_tilde_with_home("~/.claude/projects", None),
3427            "~/.claude/projects"
3428        );
3429
3430        // ~otheruser/path case - not expanded
3431        assert_eq!(
3432            SyncEngine::expand_tilde_with_home("~otheruser/projects", Some("/home/user")),
3433            "~otheruser/projects"
3434        );
3435    }
3436
3437    #[test]
3438    fn test_sync_report_failed() {
3439        let report = SyncReport::failed("test-source", SyncError::NoHost);
3440        assert_eq!(report.source_name, "test-source");
3441        assert!(!report.all_succeeded);
3442        assert_eq!(report.path_results.len(), 1);
3443        assert!(!report.path_results[0].success);
3444        assert!(report.path_results[0].error.is_some());
3445    }
3446
3447    #[test]
3448    fn test_sync_result_default() {
3449        let result = SyncResult::default();
3450        assert!(matches!(result, SyncResult::Skipped));
3451        assert_eq!(result.label(), "never");
3452    }
3453
3454    #[test]
3455    fn test_source_sync_info_default() {
3456        let info = SourceSyncInfo::default();
3457        assert!(info.last_sync.is_none());
3458        assert_eq!(info.files_synced, 0);
3459        assert_eq!(info.bytes_transferred, 0);
3460        assert_eq!(info.duration_ms, 0);
3461    }
3462
3463    #[test]
3464    fn test_sync_status_update() {
3465        let mut status = SyncStatus::default();
3466
3467        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3468        report.add_path_result(PathSyncResult {
3469            files_transferred: 10,
3470            bytes_transferred: 1000,
3471            success: true,
3472            ..Default::default()
3473        });
3474        report.total_duration_ms = 500;
3475
3476        status.update("laptop", &report);
3477
3478        let info = status.get("laptop").unwrap();
3479        assert!(info.last_sync.is_some());
3480        assert!(matches!(info.last_result, SyncResult::Success));
3481        assert_eq!(info.files_synced, 10);
3482        assert_eq!(info.bytes_transferred, 1000);
3483        assert_eq!(info.duration_ms, 500);
3484    }
3485
3486    #[test]
3487    fn test_sync_status_partial_failure() {
3488        let mut status = SyncStatus::default();
3489
3490        let mut report = SyncReport::new("server", SyncMethod::Rsync);
3491        report.add_path_result(PathSyncResult {
3492            success: true,
3493            files_transferred: 5,
3494            ..Default::default()
3495        });
3496        report.add_path_result(PathSyncResult {
3497            success: false,
3498            error: Some("Connection refused".into()),
3499            ..Default::default()
3500        });
3501
3502        status.update("server", &report);
3503
3504        let info = status.get("server").unwrap();
3505        assert!(matches!(info.last_result, SyncResult::PartialFailure(_)));
3506    }
3507
3508    #[test]
3509    fn test_sync_status_full_failure() {
3510        let mut status = SyncStatus::default();
3511
3512        let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
3513        report.add_path_result(PathSyncResult {
3514            success: false,
3515            error: Some("Host unreachable".into()),
3516            ..Default::default()
3517        });
3518
3519        status.update("dead-host", &report);
3520
3521        let info = status.get("dead-host").unwrap();
3522        assert!(matches!(info.last_result, SyncResult::Failed(_)));
3523    }
3524
3525    #[test]
3526    fn test_sync_status_save_round_trips() {
3527        let temp = TempDir::new().expect("tempdir");
3528        let mut status = SyncStatus::default();
3529        let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3530        report.add_path_result(PathSyncResult {
3531            files_transferred: 3,
3532            bytes_transferred: 42,
3533            success: true,
3534            ..Default::default()
3535        });
3536        status.update("laptop", &report);
3537
3538        status.save(temp.path()).expect("save status");
3539        let loaded = SyncStatus::load(temp.path()).expect("load status");
3540
3541        let info = loaded.get("laptop").expect("round-tripped source");
3542        assert_eq!(info.files_synced, 3);
3543        assert_eq!(info.bytes_transferred, 42);
3544        assert!(matches!(info.last_result, SyncResult::Success));
3545    }
3546
3547    #[cfg(unix)]
3548    #[test]
3549    fn test_sync_status_temp_write_refuses_existing_symlink() {
3550        use std::os::unix::fs::symlink;
3551
3552        let temp = TempDir::new().expect("tempdir");
3553        let protected = temp.path().join("protected.json");
3554        let temp_path = temp.path().join(".sync_status.json.tmp");
3555
3556        std::fs::write(&protected, b"protected").expect("write protected target");
3557        symlink(&protected, &temp_path).expect("create temp symlink");
3558
3559        let err = write_sync_status_temp_file_at(&temp_path, br#"{"sources":{}}"#)
3560            .expect_err("existing temp symlink must be rejected");
3561
3562        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
3563        assert_eq!(
3564            std::fs::read(&protected).expect("read protected target"),
3565            b"protected"
3566        );
3567        assert!(
3568            std::fs::symlink_metadata(&temp_path)
3569                .expect("temp path metadata")
3570                .file_type()
3571                .is_symlink(),
3572            "failed temp write should leave the existing symlink untouched"
3573        );
3574    }
3575
3576    #[test]
3577    fn test_sync_status_retain_sources_prunes_removed_entries() {
3578        let mut status = SyncStatus::default();
3579        status.sources.insert(
3580            "laptop".into(),
3581            SourceSyncInfo {
3582                files_synced: 3,
3583                ..Default::default()
3584            },
3585        );
3586        status.sources.insert(
3587            "desktop".into(),
3588            SourceSyncInfo {
3589                files_synced: 5,
3590                ..Default::default()
3591            },
3592        );
3593
3594        let removed_any = status.retain_sources(["laptop"]);
3595
3596        assert!(removed_any);
3597        assert!(status.get("laptop").is_some());
3598        assert!(status.get("desktop").is_none());
3599    }
3600
3601    fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3602        let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3603        source.sync_schedule = schedule;
3604        source.paths = vec!["~/.claude/projects".to_string()];
3605        source
3606    }
3607
3608    fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3609        let mut status = SyncStatus::default();
3610        status.set_info("laptop", info);
3611        status
3612    }
3613
3614    #[test]
3615    fn source_sync_decision_skips_healthy_source_until_schedule_due() {
3616        let now_ms = 1_700_000_000_000;
3617        let source = source_with_schedule(SyncSchedule::Hourly);
3618        let status = status_with_info(SourceSyncInfo {
3619            last_sync: Some(now_ms - 10 * 60 * 1000),
3620            last_result: SyncResult::Success,
3621            duration_ms: 250,
3622            ..Default::default()
3623        });
3624
3625        let decision = status.decision_for_source_at(&source, now_ms, false);
3626
3627        assert_eq!(decision.action, SourceSyncAction::Skip);
3628        assert_eq!(decision.health, SourceHealthKind::Healthy);
3629        assert!(!decision.fallback_active);
3630        assert_eq!(
3631            decision.next_eligible_sync_ms,
3632            Some(now_ms + 50 * 60 * 1000)
3633        );
3634        assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
3635        assert_eq!(decision.stale_value_score, 16);
3636    }
3637
3638    #[test]
3639    fn source_sync_decision_syncs_stale_scheduled_source() {
3640        let now_ms = 1_700_000_000_000;
3641        let source = source_with_schedule(SyncSchedule::Hourly);
3642        let status = status_with_info(SourceSyncInfo {
3643            last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
3644            last_result: SyncResult::Success,
3645            duration_ms: 250,
3646            ..Default::default()
3647        });
3648
3649        let decision = status.decision_for_source_at(&source, now_ms, false);
3650
3651        assert_eq!(decision.action, SourceSyncAction::Sync);
3652        assert_eq!(decision.health, SourceHealthKind::Stale);
3653        assert_eq!(decision.stale_value_score, 100);
3654        assert!(
3655            decision
3656                .reasons
3657                .iter()
3658                .any(|reason| reason.contains("schedule is due"))
3659        );
3660    }
3661
3662    #[test]
3663    fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
3664        let now_ms = 1_700_000_000_000;
3665        let source = source_with_schedule(SyncSchedule::Hourly);
3666        let status = status_with_info(SourceSyncInfo {
3667            last_sync: Some(now_ms - 10 * 60 * 1000),
3668            last_result: SyncResult::Failed("Permission denied (publickey)".into()),
3669            duration_ms: 800,
3670            consecutive_failures: 1,
3671            ..Default::default()
3672        });
3673
3674        let decision = status.decision_for_source_at(&source, now_ms, false);
3675
3676        assert_eq!(decision.action, SourceSyncAction::Defer);
3677        assert_eq!(decision.health, SourceHealthKind::AuthFailed);
3678        assert!(decision.fallback_active);
3679        assert_eq!(decision.health_score, 10);
3680    }
3681
3682    #[test]
3683    fn source_sync_decision_marks_partial_success_as_flapping() {
3684        let now_ms = 1_700_000_000_000;
3685        let source = source_with_schedule(SyncSchedule::Hourly);
3686        let status = status_with_info(SourceSyncInfo {
3687            last_sync: Some(now_ms - 10 * 60 * 1000),
3688            last_result: SyncResult::PartialFailure("one path failed".into()),
3689            files_synced: 7,
3690            duration_ms: 900,
3691            consecutive_failures: 1,
3692            ..Default::default()
3693        });
3694
3695        let decision = status.decision_for_source_at(&source, now_ms, false);
3696
3697        assert_eq!(decision.action, SourceSyncAction::Skip);
3698        assert_eq!(decision.health, SourceHealthKind::Flapping);
3699        assert!(decision.fallback_active);
3700    }
3701
3702    #[test]
3703    fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
3704        let now_ms = 1_700_000_000_000;
3705        let source = source_with_schedule(SyncSchedule::Hourly);
3706        let last_sync = now_ms - 10 * 60 * 1000;
3707        let status = status_with_info(SourceSyncInfo {
3708            last_sync: Some(last_sync),
3709            last_result: SyncResult::Failed("Host unreachable".into()),
3710            duration_ms: 900,
3711            consecutive_failures: 1,
3712            ..Default::default()
3713        });
3714
3715        let decision = status.decision_for_source_at(&source, now_ms, false);
3716
3717        assert_eq!(decision.action, SourceSyncAction::Skip);
3718        assert_eq!(decision.health, SourceHealthKind::Flapping);
3719        assert!(decision.fallback_active);
3720        assert_eq!(
3721            decision.backoff_until_ms,
3722            Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS)
3723        );
3724        assert!(
3725            decision
3726                .reasons
3727                .iter()
3728                .any(|reason| reason.contains("local fallback remains active"))
3729        );
3730    }
3731
3732    #[test]
3733    fn source_sync_decision_marks_slow_source_as_high_latency() {
3734        let now_ms = 1_700_000_000_000;
3735        let source = source_with_schedule(SyncSchedule::Hourly);
3736        let status = status_with_info(SourceSyncInfo {
3737            last_sync: Some(now_ms - 10 * 60 * 1000),
3738            last_result: SyncResult::Success,
3739            duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
3740            ..Default::default()
3741        });
3742
3743        let decision = status.decision_for_source_at(&source, now_ms, false);
3744
3745        assert_eq!(decision.action, SourceSyncAction::Skip);
3746        assert_eq!(decision.health, SourceHealthKind::HighLatency);
3747        assert!(decision.fallback_active);
3748    }
3749
3750    #[test]
3751    fn source_sync_decision_manual_override_forces_sync() {
3752        let now_ms = 1_700_000_000_000;
3753        let source = source_with_schedule(SyncSchedule::Manual);
3754        let status = status_with_info(SourceSyncInfo {
3755            last_sync: Some(now_ms),
3756            last_result: SyncResult::Success,
3757            duration_ms: 100,
3758            ..Default::default()
3759        });
3760
3761        let decision = status.decision_for_source_at(&source, now_ms, true);
3762
3763        assert_eq!(decision.action, SourceSyncAction::Sync);
3764        assert!(decision.manual_override);
3765        assert!(
3766            decision
3767                .reasons
3768                .iter()
3769                .any(|reason| reason.contains("overrides automatic scheduling"))
3770        );
3771    }
3772
3773    #[test]
3774    fn test_unique_atomic_temp_path_changes_each_call() {
3775        let final_path = Path::new("/tmp/sync_status.json");
3776        let first = unique_atomic_temp_path(final_path);
3777        let second = unique_atomic_temp_path(final_path);
3778
3779        assert_ne!(first, second);
3780        assert_eq!(first.parent(), final_path.parent());
3781        assert_eq!(second.parent(), final_path.parent());
3782    }
3783
3784    #[test]
3785    fn test_replace_file_from_temp_overwrites_existing_file() {
3786        let temp = TempDir::new().expect("tempdir");
3787        let final_path = temp.path().join("sync_status.json");
3788        let first_tmp = temp.path().join("first.tmp");
3789        let second_tmp = temp.path().join("second.tmp");
3790
3791        std::fs::write(&first_tmp, "{\"first\":true}").expect("write first temp");
3792        replace_file_from_temp(&first_tmp, &final_path).expect("initial replace");
3793        assert_eq!(
3794            std::fs::read_to_string(&final_path).expect("read first final"),
3795            "{\"first\":true}"
3796        );
3797
3798        std::fs::write(&second_tmp, "{\"second\":true}").expect("write second temp");
3799        replace_file_from_temp(&second_tmp, &final_path).expect("overwrite replace");
3800        assert_eq!(
3801            std::fs::read_to_string(&final_path).expect("read second final"),
3802            "{\"second\":true}"
3803        );
3804    }
3805
3806    #[test]
3807    fn test_sync_engine_with_timeouts() {
3808        let engine = SyncEngine::new(Path::new("/data"))
3809            .with_connection_timeout(30)
3810            .with_transfer_timeout(600);
3811
3812        assert_eq!(engine.connection_timeout, 30);
3813        assert_eq!(engine.transfer_timeout, 600);
3814    }
3815
3816    #[test]
3817    fn test_sync_error_display() {
3818        assert_eq!(
3819            SyncError::NoHost.to_string(),
3820            "Source has no host configured"
3821        );
3822        assert_eq!(
3823            SyncError::NoPaths.to_string(),
3824            "Source has no paths configured"
3825        );
3826        assert_eq!(
3827            SyncError::InvalidPath("paths[0] cannot be empty".to_string()).to_string(),
3828            "Invalid source path: paths[0] cannot be empty"
3829        );
3830        assert_eq!(
3831            SyncError::Timeout(30).to_string(),
3832            "Connection timed out after 30 seconds"
3833        );
3834        assert_eq!(SyncError::Cancelled.to_string(), "Sync cancelled");
3835    }
3836
3837    // =========================================================================
3838    // SFTP helper function tests
3839    // =========================================================================
3840
3841    #[test]
3842    fn test_parse_ssh_host_simple() {
3843        let (user, host) = parse_ssh_host("myserver");
3844        assert!(user.is_none());
3845        assert_eq!(host, "myserver");
3846    }
3847
3848    #[test]
3849    fn test_parse_ssh_host_with_user() {
3850        let (user, host) = parse_ssh_host("admin@myserver");
3851        assert_eq!(user, Some("admin"));
3852        assert_eq!(host, "myserver");
3853    }
3854
3855    #[test]
3856    fn test_parse_ssh_host_with_domain() {
3857        let (user, host) = parse_ssh_host("deploy@server.example.com");
3858        assert_eq!(user, Some("deploy"));
3859        assert_eq!(host, "server.example.com");
3860    }
3861
3862    #[test]
3863    fn test_parse_ssh_host_email_like() {
3864        // Edge case: user looks like email prefix
3865        let (user, host) = parse_ssh_host("user@host");
3866        assert_eq!(user, Some("user"));
3867        assert_eq!(host, "host");
3868    }
3869
3870    #[test]
3871    fn test_first_nonblank_username_priority_and_trimming() {
3872        assert_eq!(
3873            first_nonblank_username([Some("  alice  "), Some("bob")]),
3874            Some("alice".to_string())
3875        );
3876        assert_eq!(
3877            first_nonblank_username([Some("  "), None, Some("carol")]),
3878            Some("carol".to_string())
3879        );
3880        assert_eq!(first_nonblank_username([None, Some("\t")]), None);
3881    }
3882
3883    #[test]
3884    fn test_expand_tilde_local_with_tilde_prefix() {
3885        let expanded = expand_tilde_local("~/Documents/file.txt");
3886        // Should start with home directory, not tilde
3887        assert!(!expanded.starts_with('~'));
3888        assert!(expanded.ends_with("/Documents/file.txt"));
3889    }
3890
3891    #[test]
3892    fn test_expand_tilde_local_just_tilde() {
3893        let expanded = expand_tilde_local("~");
3894        // Should be just home directory
3895        assert!(!expanded.starts_with('~'));
3896        assert!(!expanded.is_empty());
3897    }
3898
3899    #[test]
3900    fn test_expand_tilde_local_no_tilde() {
3901        let path = "/absolute/path/to/file";
3902        let expanded = expand_tilde_local(path);
3903        assert_eq!(expanded, path);
3904    }
3905
3906    #[test]
3907    fn test_expand_tilde_local_tilde_in_middle() {
3908        // Tilde in middle should not be expanded
3909        let path = "/path/with/~tilde/inside";
3910        let expanded = expand_tilde_local(path);
3911        assert_eq!(expanded, path);
3912    }
3913}