Skip to main content

coding_agent_search/sources/
sync.rs

1//! Sync engine for pulling agent sessions from remote sources.
2//!
3//! This module provides the core sync functionality using rsync over SSH
4//! for efficient delta transfers, with progress reporting and error recovery.
5//!
6//! # Safety
7//!
8//! **IMPORTANT**: The sync engine uses rsync WITHOUT the `--delete` flag
9//! to ensure safe additive syncs. This prevents accidental data loss if
10//! a remote is misconfigured or temporarily empty.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use coding_agent_search::sources::sync::SyncEngine;
16//! use coding_agent_search::sources::config::SourcesConfig;
17//!
18//! let config = SourcesConfig::load()?;
19//! let engine = SyncEngine::new(&data_dir);
20//!
21//! for source in config.remote_sources() {
22//!     let report = engine.sync_source(source)?;
23//!     println!("Synced {}: {} files", source.name, report.total_files());
24//! }
25//! ```
26
27use std::collections::HashMap;
28use std::path::{Path, PathBuf};
29use std::process::{Command, Stdio};
30use std::sync::{Mutex, OnceLock};
31use std::time::{Duration, Instant};
32
33use thiserror::Error;
34
35use super::{
36    config::{
37        SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
38        ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
39    },
40    configure_child_process_group, host_key_verification_error, is_host_key_verification_failure,
41    strict_ssh_cli_tokens, strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
42};
43use ssh2::{FileStat, Session, Sftp};
44use std::io::{Read as IoRead, Write as IoWrite};
45use std::net::{Shutdown, TcpStream};
46
/// Spelling of rsync's "pass args protected to the remote" option that the
/// local `rsync` binary accepts.
///
/// The option first appeared in rsync 3.0.0 as `--protect-args`; rsync 3.4.0
/// renamed the primary form to `--secluded-args` (`-s`), and current Homebrew
/// `rsync 3.4.1` lists only the new name in `--help`. Probing solely for
/// `--protect-args` therefore mis-classifies such builds as unsupported and
/// sends them down the quoted-path rsync branch, which breaks (#191).
/// openrsync (macOS 15+) supports neither spelling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RsyncArgProtection {
    /// No spelling is accepted; the caller has to quote remote paths for the
    /// remote login shell itself.
    None,
    /// Original flag name (rsync 3.0.0..3.4.0).
    ProtectArgs,
    /// Renamed primary form (rsync 3.4.0+, including Homebrew 3.4.1).
    SecludedArgs,
}

impl RsyncArgProtection {
    /// `true` for every variant except [`RsyncArgProtection::None`].
    fn is_supported(self) -> bool {
        self != Self::None
    }

    /// The command-line flag to hand to rsync for this variant, or `None`
    /// when no protection flag is available.
    fn flag(self) -> Option<&'static str> {
        match self {
            Self::None => None,
            Self::SecludedArgs => Some("--secluded-args"),
            Self::ProtectArgs => Some("--protect-args"),
        }
    }
}
81
82fn detect_rsync_arg_protection() -> RsyncArgProtection {
83    static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
84    *CACHED.get_or_init(|| {
85        let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
86            return RsyncArgProtection::None;
87        };
88        // rsync prints to stdout on GNU/Linux and Homebrew macOS, but some
89        // forks / older builds print help on stderr — check both so we never
90        // misclassify a supported rsync as unsupported.
91        let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
92        combined.push_str(&String::from_utf8_lossy(&out.stderr));
93        // Prefer the newer name when both are listed (forward-compat with a
94        // hypothetical rsync that keeps both as aliases): `--secluded-args`
95        // is what current rsync actually prints in help output, and using
96        // the printed name is the one guaranteed to be accepted.
97        if combined.contains("--secluded-args") {
98            RsyncArgProtection::SecludedArgs
99        } else if combined.contains("--protect-args") {
100            RsyncArgProtection::ProtectArgs
101        } else {
102            RsyncArgProtection::None
103        }
104    })
105}
106
/// Quote `path` as a single POSIX-shell word using single quotes.
///
/// Every embedded `'` is emitted as `'\''` (close the quote, add an escaped
/// quote, reopen the quote), so `foo'bar` becomes `'foo'\''bar'`.
fn quote_remote_shell_path(path: &str) -> String {
    let mut quoted = String::with_capacity(path.len() + 2);
    quoted.push('\'');
    for ch in path.chars() {
        if ch == '\'' {
            quoted.push_str(r"'\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
115
116fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
117    // host itself might contain user@ or be an alias, but we should not quote it
118    // if it's already a single token. However, if it contains spaces or other
119    // weirdness it's already broken for SSH. We focus on the path part.
120    format!("{host}:{}", quote_remote_shell_path(remote_path))
121}
122
123fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
124    if protect_args_supported {
125        // With --protect-args, rsync handles its own escaping over the wire
126        format!("{host}:{remote_path}")
127    } else {
128        // Without it (e.g. openrsync), we must manually quote for the remote shell
129        remote_spec_for_shell_bound_copy(host, remote_path)
130    }
131}
132
133fn run_rsync_command(
134    timeout_str: &str,
135    ssh_opts: &str,
136    remote_spec: &str,
137    local_path_str: &str,
138    arg_protection: RsyncArgProtection,
139) -> std::io::Result<std::process::Output> {
140    let mut cmd = Command::new("rsync");
141    cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
142    if let Some(flag) = arg_protection.flag() {
143        cmd.arg(flag);
144    }
145    cmd.args([
146        "--timeout",
147        timeout_str,
148        "-e",
149        ssh_opts,
150        "--",
151        remote_spec,
152        local_path_str,
153    ]);
154    cmd.output()
155}
156
/// Report whether rsync's stderr shows that the argument-protection flag
/// (`-s` / `--secluded-args` / `--protect-args`) was rejected.
///
/// Matching is ASCII-case-insensitive and substring-based.
fn rsync_arg_protection_remote_rejected(stderr: &str) -> bool {
    const REJECTION_MARKERS: [&str; 7] = [
        // GNU rsync error formats:
        "invalid option -- s",
        "unknown option -- s",
        "unrecognized option '--secluded-args'",
        "unknown option -- secluded-args",
        "unrecognized option '--protect-args'",
        "unknown option -- protect-args",
        // BSD/macOS getopt error format (openrsync uses getopt which prints
        // "illegal option"):
        "illegal option -- s",
    ];

    let haystack = stderr.to_ascii_lowercase();
    REJECTION_MARKERS
        .iter()
        .any(|marker| haystack.contains(*marker))
}
169
/// Decide from `rsync --version` output whether the remote rsync is Apple's
/// openrsync (protocol 29, no --secluded-args support).
///
/// Only the first line is inspected. openrsync identifies itself there with
/// the string "openrsync" (matched case-insensitively), e.g.:
///   "openrsync: protocol version 29"
///
/// Empty input yields `false`.
fn parse_remote_rsync_is_openrsync(version_output: &str) -> bool {
    match version_output.lines().next() {
        Some(first_line) => first_line.to_ascii_lowercase().contains("openrsync"),
        None => false,
    }
}
184
/// Process-wide map from host string to its openrsync probe result.
///
/// The map is created empty on first access. Entries are added lazily by the
/// first sync to a host so later syncs can skip the extra SSH round-trip.
fn remote_openrsync_cache() -> &'static Mutex<HashMap<String, bool>> {
    static PER_HOST_RESULTS: OnceLock<Mutex<HashMap<String, bool>>> = OnceLock::new();
    PER_HOST_RESULTS.get_or_init(Default::default)
}
192
193/// Query whether the remote host's rsync is openrsync, using a cached result
194/// when available.
195///
196/// Runs `rsync --version 2>&1 | head -1` on the remote via SSH.  If the SSH
197/// call fails (e.g. host unreachable) we return `false` rather than propagating
198/// the error — the main sync will surface a better error message shortly after.
199fn probe_remote_rsync_is_openrsync(host: &str, timeout_secs: u64) -> bool {
200    // Fast path: already cached.
201    {
202        let cache = remote_openrsync_cache()
203            .lock()
204            .unwrap_or_else(|e| e.into_inner());
205        if let Some(&cached) = cache.get(host) {
206            return cached;
207        }
208    }
209
210    // Slow path: ask the remote.
211    let is_openrsync = detect_remote_rsync_is_openrsync_via_ssh(host, timeout_secs);
212
213    // Store in cache.
214    {
215        let mut cache = remote_openrsync_cache()
216            .lock()
217            .unwrap_or_else(|e| e.into_inner());
218        cache.insert(host.to_string(), is_openrsync);
219    }
220
221    is_openrsync
222}
223
224/// Inner (non-cached) implementation of the remote openrsync probe.
225fn detect_remote_rsync_is_openrsync_via_ssh(host: &str, timeout_secs: u64) -> bool {
226    let timeout_secs = timeout_secs.clamp(1, 30);
227    let mut cmd = Command::new("ssh");
228    cmd.args(strict_ssh_cli_tokens(timeout_secs))
229        .arg("-o")
230        .arg("LogLevel=ERROR")
231        .arg("--")
232        .arg(host)
233        .arg("rsync --version 2>&1 | head -1")
234        .stdout(Stdio::piped())
235        .stderr(Stdio::piped());
236    configure_child_process_group(&mut cmd);
237
238    let Ok(Some(output)) = cmd.spawn().and_then(|child| {
239        wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
240    }) else {
241        return false;
242    };
243    if !output.status.success() && output.stdout.is_empty() {
244        return false;
245    }
246    let first_line = String::from_utf8_lossy(&output.stdout);
247    let result = parse_remote_rsync_is_openrsync(first_line.trim());
248    tracing::debug!(
249        host = %host,
250        rsync_version_line = %first_line.trim(),
251        is_openrsync = result,
252        "remote rsync version probe"
253    );
254    result
255}
256
257fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
258    // scp still executes a remote shell command for the source operand, so the
259    // path side must be quoted even though we pass it as one local argv token.
260    remote_spec_for_shell_bound_copy(host, remote_path)
261}
262
263fn remote_find_regular_files_command(remote_path: &str) -> String {
264    format!(
265        "find -P {} -type f -print0",
266        quote_remote_shell_path(remote_path)
267    )
268}
269
/// Extract the remote home directory from the SSH probe's stdout.
///
/// Each line is trimmed and checked for the `CASS_HOME_MARKER:` prefix; the
/// first marked value that starts with `/` and contains no NUL byte is
/// returned. Lines without the marker are skipped. Returns `None` when no
/// line qualifies.
fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
    const MARKER: &str = "CASS_HOME_MARKER:";

    let text = String::from_utf8_lossy(stdout);
    text.lines()
        .filter_map(|line| line.trim().strip_prefix(MARKER))
        .find(|candidate| candidate.starts_with('/') && !candidate.contains('\0'))
        .map(str::to_owned)
}
282
/// Split NUL-separated bytes (e.g. `find -print0` output) into strings.
///
/// Empty segments are ignored, and segments that are not valid UTF-8 are
/// dropped rather than reported.
fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
    let mut paths = Vec::new();
    for segment in bytes.split(|&byte| byte == b'\0') {
        if segment.is_empty() {
            continue;
        }
        if let Ok(text) = std::str::from_utf8(segment) {
            paths.push(text.to_owned());
        }
    }
    paths
}
291
292fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
293    match source_path_entry_error(index, path) {
294        Some(message) => Err(SyncError::InvalidPath(message)),
295        None => Ok(()),
296    }
297}
298
299fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
300    PathSyncResult {
301        remote_path: remote_path.to_string(),
302        success: false,
303        error: Some(err.to_string()),
304        ..Default::default()
305    }
306}
307
/// Map a remote file to its local destination under
/// `local_container/leaf_name`.
///
/// * If `remote_file` equals `remote_root`, the destination is exactly
///   `local_container/leaf_name`.
/// * Otherwise the part of `remote_file` below `remote_root` is appended
///   component by component.
///
/// Returns `None` when `remote_file` is not under `remote_root`, or when the
/// relative part contains anything other than normal names and `.` (for
/// example `..`), so the result cannot leave the destination.
fn remote_file_to_safe_local_path(
    remote_root: &Path,
    remote_file: &Path,
    local_container: &Path,
    leaf_name: &str,
) -> Option<PathBuf> {
    use std::path::Component;

    let base = local_container.join(leaf_name);
    if remote_file == remote_root {
        return Some(base);
    }

    remote_file
        .strip_prefix(remote_root)
        .ok()?
        .components()
        .try_fold(base, |mut destination, part| match part {
            Component::Normal(name) => {
                destination.push(name);
                Some(destination)
            }
            Component::CurDir => Some(destination),
            _ => None,
        })
}
330
331fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
332    let rel = path.strip_prefix(root).map_err(|_| {
333        format!(
334            "Local path {} is outside sync root {}",
335            path.display(),
336            root.display()
337        )
338    })?;
339
340    let mut current = root.to_path_buf();
341    if let Some(link) = existing_path_symlink(&current)? {
342        return Ok(Some(link));
343    }
344
345    for component in rel.components() {
346        match component {
347            std::path::Component::Normal(name) => current.push(name),
348            std::path::Component::CurDir => continue,
349            _ => {
350                return Err(format!(
351                    "Local path {} contains unsafe component below sync root {}",
352                    path.display(),
353                    root.display()
354                ));
355            }
356        }
357
358        if let Some(link) = existing_path_symlink(&current)? {
359            return Ok(Some(link));
360        }
361    }
362
363    Ok(None)
364}
365
/// Return `path` if it currently exists and is a symlink.
///
/// The link itself is inspected (`symlink_metadata`, which does not follow
/// links). A missing path is not an error and yields `Ok(None)`; any other
/// I/O failure is returned as a message.
fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
    let metadata = match std::fs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(format!("Failed to inspect {}: {}", path.display(), err)),
    };

    if metadata.file_type().is_symlink() {
        Ok(Some(path.to_path_buf()))
    } else {
        Ok(None)
    }
}
374
375fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
376    if let Some(link) = existing_local_symlink_below_root(root, path)? {
377        return Err(format!(
378            "Refusing to write {} through local symlink {}",
379            path.display(),
380            link.display()
381        ));
382    }
383
384    Ok(())
385}
386
387fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
388    reject_local_symlink_below_root(sync_root, local_path)?;
389    std::fs::create_dir_all(local_path)
390        .map_err(|e| format!("Failed to create directory: {}", e))?;
391    reject_local_symlink_below_root(sync_root, local_path)?;
392    Ok(())
393}
394
395fn prepare_local_sync_root(local_store: &Path, mirror_dir: &Path) -> Result<(), String> {
396    reject_local_symlink_below_root(local_store, mirror_dir)?;
397    std::fs::create_dir_all(mirror_dir)
398        .map_err(|e| format!("Failed to create directory: {}", e))?;
399    reject_local_symlink_below_root(local_store, mirror_dir)?;
400    Ok(())
401}
402
403fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
404    stat.file_type().is_symlink()
405}
406
407/// Errors that can occur during sync operations.
408#[derive(Error, Debug)]
409pub enum SyncError {
410    #[error("Source has no host configured")]
411    NoHost,
412
413    #[error("Source has no paths configured")]
414    NoPaths,
415
416    #[error("Invalid source path: {0}")]
417    InvalidPath(String),
418
419    #[error("Invalid source definition: {0}")]
420    InvalidSource(String),
421
422    #[error("rsync command failed: {0}")]
423    RsyncFailed(String),
424
425    #[error("Failed to create local directory: {0}")]
426    CreateDirFailed(#[from] std::io::Error),
427
428    #[error("SSH connection failed: {0}")]
429    SshFailed(String),
430
431    #[error("Connection timed out after {0} seconds")]
432    Timeout(u64),
433
434    #[error("Sync cancelled")]
435    Cancelled,
436}
437
/// Transport used to pull files from a remote source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMethod {
    /// rsync over SSH; preferred because it does delta transfers.
    Rsync,
    /// rsync run through WSL (`wsl rsync`); used on Windows when there is no
    /// native rsync but WSL is installed with rsync available inside it.
    WslRsync,
    /// Transfer via the system `scp` command.
    ///
    /// Chosen on Windows (and other platforms) when rsync is unavailable.
    /// All authentication is delegated to the system `ssh`/`scp` binary, so
    /// the OpenSSH agent, `~/.ssh/` keys and `~/.ssh/config` are honoured –
    /// unlike the `ssh2` library, which does not integrate with the Windows
    /// OpenSSH agent.
    Scp,
    /// SFTP through the `ssh2` crate – last resort only.
    ///
    /// Deprecated in favour of [`SyncMethod::Scp`], which uses the native
    /// system SSH binary. Retained for backward compatibility with callers
    /// that pattern-match on this variant.
    Sftp,
}

impl SyncMethod {
    /// Stable lowercase identifier for this method.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Sftp => "sftp",
            Self::Scp => "scp",
            Self::WslRsync => "wsl-rsync",
            Self::Rsync => "rsync",
        }
    }
}

impl std::fmt::Display for SyncMethod {
    /// Writes exactly the text returned by [`SyncMethod::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
477
/// Outcome of syncing one remote path.
///
/// `Default` produces an empty, unsuccessful result (`success == false`,
/// zero counters, no error text).
#[derive(Debug, Clone, Default)]
pub struct PathSyncResult {
    /// The remote path this result refers to.
    pub remote_path: String,
    /// Local destination for that path.
    pub local_path: PathBuf,
    /// How many files were transferred.
    pub files_transferred: u64,
    /// How many bytes were transferred in total.
    pub bytes_transferred: u64,
    /// `true` when the path synced successfully.
    pub success: bool,
    /// Failure description, present when the sync failed.
    pub error: Option<String>,
    /// Time spent on this path, in milliseconds.
    pub duration_ms: u64,
}
496
497/// Report from syncing an entire source.
498#[derive(Debug, Clone)]
499pub struct SyncReport {
500    /// Name of the source that was synced.
501    pub source_name: String,
502    /// Method used for syncing.
503    pub method: SyncMethod,
504    /// Results for each path.
505    pub path_results: Vec<PathSyncResult>,
506    /// Total duration of the sync.
507    pub total_duration_ms: u64,
508    /// Whether all paths synced successfully.
509    pub all_succeeded: bool,
510}
511
512impl SyncReport {
513    /// Create a new report for a source.
514    pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
515        Self {
516            source_name: source_name.into(),
517            method,
518            path_results: Vec::new(),
519            total_duration_ms: 0,
520            all_succeeded: true,
521        }
522    }
523
524    /// Create a failed report when sync couldn't even start.
525    pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
526        Self {
527            source_name: source_name.into(),
528            method: SyncMethod::Rsync,
529            path_results: vec![PathSyncResult {
530                error: Some(error.to_string()),
531                success: false,
532                ..Default::default()
533            }],
534            total_duration_ms: 0,
535            all_succeeded: false,
536        }
537    }
538
539    /// Add a path result to the report.
540    pub fn add_path_result(&mut self, result: PathSyncResult) {
541        if !result.success {
542            self.all_succeeded = false;
543        }
544        self.path_results.push(result);
545    }
546
547    /// Get total files transferred across all paths.
548    pub fn total_files(&self) -> u64 {
549        self.path_results.iter().map(|r| r.files_transferred).sum()
550    }
551
552    /// Get total bytes transferred across all paths.
553    pub fn total_bytes(&self) -> u64 {
554        self.path_results.iter().map(|r| r.bytes_transferred).sum()
555    }
556
557    /// Get count of successful path syncs.
558    pub fn successful_paths(&self) -> usize {
559        self.path_results.iter().filter(|r| r.success).count()
560    }
561
562    /// Get count of failed path syncs.
563    pub fn failed_paths(&self) -> usize {
564        self.path_results.iter().filter(|r| !r.success).count()
565    }
566
567    /// Summarize the overall sync outcome.
568    pub fn sync_result(&self) -> SyncResult {
569        if self.all_succeeded {
570            SyncResult::Success
571        } else {
572            let errors: Vec<String> = self
573                .path_results
574                .iter()
575                .filter_map(|r| r.error.clone())
576                .collect();
577            if self.successful_paths() > 0 {
578                SyncResult::PartialFailure(errors.join("; "))
579            } else {
580                SyncResult::Failed(errors.join("; "))
581            }
582        }
583    }
584}
585
/// Transfer counters extracted from rsync output.
///
/// Both counters start at zero via `Default`.
#[derive(Debug, Default)]
struct RsyncStats {
    /// Number of files rsync reported as transferred.
    files_transferred: u64,
    /// Number of bytes rsync reported as transferred.
    bytes_transferred: u64,
}
592
/// Engine that pulls agent sessions from remote sources into a local mirror.
pub struct SyncEngine {
    /// Root directory under which synced data is stored.
    /// Layout: `{local_store}/remotes/{source_name}/mirror/`
    local_store: PathBuf,
    /// Connection timeout, in seconds.
    connection_timeout: u64,
    /// Transfer timeout, in seconds (0 = no timeout).
    transfer_timeout: u64,
}
603
604impl SyncEngine {
605    /// Create a new sync engine.
606    ///
607    /// # Arguments
608    /// * `data_dir` - The cass data directory (e.g., ~/.local/share/coding-agent-search)
609    pub fn new(data_dir: &Path) -> Self {
610        Self {
611            local_store: data_dir.to_path_buf(),
612            connection_timeout: 10,
613            transfer_timeout: 300, // 5 minutes
614        }
615    }
616
617    /// Set the connection timeout.
618    pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
619        self.connection_timeout = seconds;
620        self
621    }
622
623    /// Set the transfer timeout.
624    pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
625        self.transfer_timeout = seconds;
626        self
627    }
628
629    /// Get the local mirror directory for a source.
630    pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
631        self.local_store
632            .join("remotes")
633            .join(source_name)
634            .join("mirror")
635    }
636
637    /// Get the remote home directory by SSH-ing to the host and printing `$HOME`.
638    ///
639    /// This is called once per source sync to avoid repeated SSH calls for each path.
640    fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
641        // Validate host doesn't contain shell metacharacters to prevent injection
642        if host.trim().is_empty()
643            || host.starts_with('-')
644            || !ssh_host_has_safe_token_chars(host)
645            || validate_optional_user_host_shape(host).is_err()
646        {
647            return Err(SyncError::SshFailed(format!(
648                "Invalid characters in host: {}",
649                host
650            )));
651        }
652
653        let timeout_secs = self.connection_timeout.max(1);
654        let mut cmd = Command::new("ssh");
655        cmd.args(strict_ssh_cli_tokens(timeout_secs))
656            .arg("--")
657            .arg(host)
658            .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
659            .stdout(Stdio::piped())
660            .stderr(Stdio::piped());
661        configure_child_process_group(&mut cmd);
662
663        let child = cmd
664            .spawn()
665            .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
666        let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
667            .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
668            .ok_or(SyncError::Timeout(timeout_secs))?;
669
670        if !output.status.success() {
671            let stderr = String::from_utf8_lossy(&output.stderr);
672            if is_host_key_verification_failure(&stderr) {
673                return Err(SyncError::SshFailed(host_key_verification_error(host)));
674            }
675            return Err(SyncError::SshFailed(format!(
676                "Failed to get remote home directory: {}",
677                stderr.trim()
678            )));
679        }
680
681        let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
682            SyncError::SshFailed(
683                "Unable to parse remote home directory from SSH output".to_string(),
684            )
685        })?;
686
687        tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
688        Ok(remote_home)
689    }
690
691    /// Expand ~ in a remote path using the provided home directory.
692    ///
693    /// If `remote_home` is None, returns the path unchanged.
694    fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
695        if !path.starts_with('~') {
696            return path.to_string();
697        }
698
699        let Some(home) = remote_home else {
700            return path.to_string();
701        };
702
703        if path == "~" {
704            home.to_string()
705        } else if let Some(rest) = path.strip_prefix("~/") {
706            format!("{}/{}", home, rest)
707        } else {
708            // ~user/path case - not supported, return as-is
709            path.to_string()
710        }
711    }
712
713    /// Detect the available sync method.
714    ///
715    /// Detection order:
716    /// 1. Native `rsync` → [`SyncMethod::Rsync`]
717    /// 2. `wsl rsync` (Windows only) → [`SyncMethod::WslRsync`]
718    /// 3. System `scp` available → [`SyncMethod::Scp`]
719    /// 4. Last resort → [`SyncMethod::Sftp`] (ssh2-based, no native-agent integration)
720    ///
721    /// On Windows the `ssh2` SFTP path is intentionally avoided whenever possible
722    /// because it bypasses the Windows OpenSSH agent and `~/.ssh/config`, leading to
723    /// "No valid authentication method found" errors even when SSH keys are properly
724    /// configured. Using the system `scp` binary instead lets OpenSSH handle auth the
725    /// same way `ssh` and `cass sources doctor` do.
726    pub fn detect_sync_method() -> SyncMethod {
727        // 1. Native rsync
728        if Command::new("rsync")
729            .arg("--version")
730            .output()
731            .map(|o| o.status.success())
732            .unwrap_or(false)
733        {
734            return SyncMethod::Rsync;
735        }
736
737        // 2. WSL rsync (Windows-only: rsync inside WSL invoked via `wsl rsync`)
738        #[cfg(target_os = "windows")]
739        if Command::new("wsl")
740            .args(["rsync", "--version"])
741            .output()
742            .map(|o| o.status.success())
743            .unwrap_or(false)
744        {
745            return SyncMethod::WslRsync;
746        }
747
748        // 3. System scp – preferred over ssh2/SFTP because it inherits the native
749        //    OpenSSH agent and ~/.ssh/config on all platforms (especially Windows).
750        if Command::new("scp")
751            .arg("-S")
752            .arg("ssh")
753            .arg("--")
754            // pass a harmless flag; scp prints usage and exits non-zero, but if the
755            // binary exists the spawn itself succeeds which is all we need to check.
756            .output()
757            .is_ok()
758        {
759            // Confirm scp is a real binary by checking for the executable
760            if which_scp_exists() {
761                return SyncMethod::Scp;
762            }
763        }
764
765        // 4. Last resort: ssh2-based SFTP
766        SyncMethod::Sftp
767    }
768
769    /// Sync a single source.
770    ///
771    /// Syncs all configured paths from the source to the local mirror directory.
772    /// Individual path failures don't abort the entire sync.
773    pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
774        if !source.is_remote() {
775            return Err(SyncError::NoHost);
776        }
777
778        let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
779
780        if source.paths.is_empty() {
781            return Err(SyncError::NoPaths);
782        }
783
784        source
785            .validate_structure()
786            .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
787
788        let method = Self::detect_sync_method();
789        let mut report = SyncReport::new(&source.name, method);
790        let overall_start = Instant::now();
791
792        // Create the mirror directory
793        let mirror_dir = self.mirror_dir(&source.name);
794        prepare_local_sync_root(&self.local_store, &mirror_dir)
795            .map_err(|e| SyncError::CreateDirFailed(std::io::Error::other(e)))?;
796
797        // Pre-fetch remote home directory if any paths use tilde (avoids multiple SSH calls)
798        let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
799            path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
800        }) {
801            match self.get_remote_home(host) {
802                Ok(home) => Some(home),
803                Err(e) => {
804                    tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
805                    None
806                }
807            }
808        } else {
809            None
810        };
811
812        // Proactively detect whether the remote runs openrsync (Apple's rsync,
813        // protocol 29) which rejects --secluded-args / -s.  We do this once per
814        // source (before the path loop) so every path benefits without additional
815        // SSH round-trips.  If the probe itself fails we fall back gracefully to
816        // the existing retry-on-rejection logic.
817        let remote_is_openrsync = if matches!(method, SyncMethod::Rsync | SyncMethod::WslRsync) {
818            probe_remote_rsync_is_openrsync(host, self.connection_timeout)
819        } else {
820            false
821        };
822        if remote_is_openrsync {
823            tracing::info!(
824                host = %host,
825                "remote rsync identified as openrsync; will omit --secluded-args / -s"
826            );
827        }
828
829        for (index, remote_path) in source.paths.iter().enumerate() {
830            if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
831                report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
832                continue;
833            }
834
835            let result = match method {
836                SyncMethod::Rsync => self.sync_path_rsync(
837                    host,
838                    remote_path,
839                    &mirror_dir,
840                    remote_home.as_deref(),
841                    remote_is_openrsync,
842                ),
843                SyncMethod::WslRsync => {
844                    self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
845                }
846                SyncMethod::Scp => {
847                    self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
848                }
849                SyncMethod::Sftp => {
850                    self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
851                }
852            };
853            report.add_path_result(result);
854        }
855
856        report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
857        Ok(report)
858    }
859
860    /// Sync all remote sources from a config.
861    ///
862    /// Continues even if individual sources fail.
863    pub fn sync_all(
864        &self,
865        sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
866    ) -> Vec<SyncReport> {
867        sources
868            .map(|source| {
869                let source = source.borrow();
870                self.sync_source(source)
871                    .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
872            })
873            .collect()
874    }
875
876    /// Sync a single path using rsync.
877    ///
878    /// **IMPORTANT**: Uses rsync WITHOUT --delete for safe additive syncs.
879    ///
880    /// The `remote_home` parameter should be pre-fetched via `get_remote_home()` to avoid
881    /// repeated SSH calls for each path.
882    ///
883    /// `remote_is_openrsync` should be `true` when the remote host runs Apple's
884    /// openrsync (protocol 29).  When set, the `--secluded-args` / `-s` flag is
885    /// omitted entirely rather than relying on the rejection-retry fallback.
886    fn sync_path_rsync(
887        &self,
888        host: &str,
889        remote_path: &str,
890        dest_dir: &Path,
891        remote_home: Option<&str>,
892        remote_is_openrsync: bool,
893    ) -> PathSyncResult {
894        let start = Instant::now();
895        if remote_path.starts_with('~') && remote_home.is_none() {
896            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
897            return PathSyncResult {
898                remote_path: remote_path.to_string(),
899                local_path,
900                success: false,
901                error: Some(
902                    "Cannot expand '~' in remote path; failed to determine remote home directory"
903                        .to_string(),
904                ),
905                duration_ms: start.elapsed().as_millis() as u64,
906                ..Default::default()
907            };
908        }
909
910        // Expand ~ using pre-fetched home directory (no SSH call here)
911        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
912
913        // If tilde expansion failed (no remote_home provided), log a warning
914        if remote_path.starts_with('~') && expanded_path == remote_path {
915            tracing::warn!(
916                remote_path = %remote_path,
917                "Could not expand tilde in path (remote home directory not available)"
918            );
919        }
920
921        // Convert remote path to safe local directory name
922        // Use raw remote_path for stability (independent of home expansion success)
923        let safe_name = path_to_safe_dirname(remote_path);
924        let local_path = dest_dir.join(&safe_name);
925
926        // Create local directory without following any pre-existing mirror symlink.
927        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
928            return PathSyncResult {
929                remote_path: remote_path.to_string(),
930                local_path: local_path.clone(),
931                success: false,
932                error: Some(e),
933                duration_ms: start.elapsed().as_millis() as u64,
934                ..Default::default()
935            };
936        }
937
938        // Build rsync command
939        // NOTE: NO --delete flag! Safe additive sync only.
940        //
941        // When the remote is openrsync (Apple's rsync, protocol 29) we must not
942        // pass --secluded-args / -s because openrsync doesn't understand that
943        // protocol extension.  We honour the proactive detection result
944        // (`remote_is_openrsync`) first; the reactive rejection-retry below
945        // remains as a belt-and-suspenders fallback for cases the probe missed.
946        let arg_protection = if remote_is_openrsync {
947            RsyncArgProtection::None
948        } else {
949            detect_rsync_arg_protection()
950        };
951        let protect_args_supported = arg_protection.is_supported();
952        let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
953        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
954
955        let local_path_str = match local_path.to_str() {
956            Some(s) => s,
957            None => {
958                return PathSyncResult {
959                    remote_path: remote_path.to_string(),
960                    local_path,
961                    success: false,
962                    error: Some("Local path contains invalid UTF-8".to_string()),
963                    duration_ms: start.elapsed().as_millis() as u64,
964                    ..Default::default()
965                };
966            }
967        };
968
969        tracing::debug!(
970            host = %host,
971            remote_path = %expanded_path,
972            local_path = %local_path.display(),
973            "starting rsync"
974        );
975
976        let timeout_str = self.transfer_timeout.to_string();
977        let output = match run_rsync_command(
978            &timeout_str,
979            &ssh_opts,
980            &remote_spec,
981            local_path_str,
982            arg_protection,
983        ) {
984            Ok(o) => o,
985            Err(e) => {
986                return PathSyncResult {
987                    remote_path: remote_path.to_string(),
988                    local_path,
989                    success: false,
990                    error: Some(format!("Failed to execute rsync: {}", e)),
991                    duration_ms: start.elapsed().as_millis() as u64,
992                    ..Default::default()
993                };
994            }
995        };
996
997        let mut duration_ms = start.elapsed().as_millis() as u64;
998        let mut status_success = output.status.success();
999        let mut stdout = String::from_utf8_lossy(&output.stdout).into_owned();
1000        let mut stderr = String::from_utf8_lossy(&output.stderr).into_owned();
1001
1002        if !status_success
1003            && arg_protection.is_supported()
1004            && rsync_arg_protection_remote_rejected(&stderr)
1005        {
1006            tracing::warn!(
1007                host = %host,
1008                remote_path = %expanded_path,
1009                protection_flag = ?arg_protection.flag(),
1010                "remote rsync rejected argument-protection flag; retrying with shell-quoted remote path"
1011            );
1012
1013            let fallback_remote_spec = remote_spec_for_rsync(host, &expanded_path, false);
1014            let retry = match run_rsync_command(
1015                &timeout_str,
1016                &ssh_opts,
1017                &fallback_remote_spec,
1018                local_path_str,
1019                RsyncArgProtection::None,
1020            ) {
1021                Ok(o) => o,
1022                Err(e) => {
1023                    return PathSyncResult {
1024                        remote_path: remote_path.to_string(),
1025                        local_path,
1026                        success: false,
1027                        error: Some(format!(
1028                            "Failed to execute rsync fallback without argument protection: {}",
1029                            e
1030                        )),
1031                        duration_ms: start.elapsed().as_millis() as u64,
1032                        ..Default::default()
1033                    };
1034                }
1035            };
1036
1037            duration_ms = start.elapsed().as_millis() as u64;
1038            status_success = retry.status.success();
1039            stdout = String::from_utf8_lossy(&retry.stdout).into_owned();
1040            stderr = String::from_utf8_lossy(&retry.stderr).into_owned();
1041        }
1042
1043        if !status_success {
1044            // Check for specific error types
1045            let error_msg = if stderr.contains("Connection refused")
1046                || stderr.contains("Connection timed out")
1047            {
1048                format!("SSH connection failed: {}", stderr.trim())
1049            } else if is_host_key_verification_failure(&stderr) {
1050                host_key_verification_error(host)
1051            } else if stderr.contains("No such file or directory") {
1052                format!("Remote path not found: {}", expanded_path)
1053            } else if stderr.contains("Permission denied") {
1054                format!("Permission denied: {}", stderr.trim())
1055            } else {
1056                format!("rsync failed: {}", stderr.trim())
1057            };
1058
1059            tracing::warn!(
1060                host = %host,
1061                remote_path = %expanded_path,
1062                error = %error_msg,
1063                "rsync failed"
1064            );
1065
1066            return PathSyncResult {
1067                remote_path: remote_path.to_string(),
1068                local_path,
1069                success: false,
1070                error: Some(error_msg),
1071                duration_ms,
1072                ..Default::default()
1073            };
1074        }
1075
1076        // Parse stats from rsync output
1077        let stats = parse_rsync_stats(&stdout);
1078
1079        tracing::info!(
1080            host = %host,
1081            remote_path = %expanded_path,
1082            files = stats.files_transferred,
1083            bytes = stats.bytes_transferred,
1084            duration_ms,
1085            "rsync completed"
1086        );
1087
1088        PathSyncResult {
1089            remote_path: remote_path.to_string(),
1090            local_path,
1091            files_transferred: stats.files_transferred,
1092            bytes_transferred: stats.bytes_transferred,
1093            success: true,
1094            error: None,
1095            duration_ms,
1096        }
1097    }
1098
1099    /// Sync a single path using rsync invoked through WSL (`wsl rsync …`).
1100    ///
1101    /// Used on Windows when native rsync is absent but WSL with rsync is available.
1102    /// WSL paths use the `\\wsl$\…` UNC convention for the local destination.
1103    fn sync_path_wsl_rsync(
1104        &self,
1105        host: &str,
1106        remote_path: &str,
1107        dest_dir: &Path,
1108        remote_home: Option<&str>,
1109    ) -> PathSyncResult {
1110        let start = Instant::now();
1111
1112        if remote_path.starts_with('~') && remote_home.is_none() {
1113            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1114            return PathSyncResult {
1115                remote_path: remote_path.to_string(),
1116                local_path,
1117                success: false,
1118                error: Some(
1119                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1120                        .to_string(),
1121                ),
1122                duration_ms: start.elapsed().as_millis() as u64,
1123                ..Default::default()
1124            };
1125        }
1126
1127        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1128        let safe_name = path_to_safe_dirname(remote_path);
1129        let local_path = dest_dir.join(&safe_name);
1130
1131        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1132            return PathSyncResult {
1133                remote_path: remote_path.to_string(),
1134                local_path,
1135                success: false,
1136                error: Some(e),
1137                duration_ms: start.elapsed().as_millis() as u64,
1138                ..Default::default()
1139            };
1140        }
1141
1142        let local_path_str = match local_path.to_str() {
1143            Some(s) => s,
1144            None => {
1145                return PathSyncResult {
1146                    remote_path: remote_path.to_string(),
1147                    local_path,
1148                    success: false,
1149                    error: Some("Local path contains invalid UTF-8".to_string()),
1150                    duration_ms: start.elapsed().as_millis() as u64,
1151                    ..Default::default()
1152                };
1153            }
1154        };
1155
1156        // Convert Windows path to a WSL-accessible path.
1157        // WSL can access Windows paths via /mnt/<drive>/... conventions.
1158        // E.g. C:\Users\george\AppData\... → /mnt/c/Users/george/AppData/...
1159        let wsl_dest = windows_path_to_wsl(local_path_str);
1160
1161        let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
1162        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
1163        let timeout_str = self.transfer_timeout.to_string();
1164
1165        let mut cmd = Command::new("wsl");
1166        cmd.args([
1167            "rsync",
1168            "-avz",
1169            "--links",
1170            "--safe-links",
1171            "--stats",
1172            "--partial",
1173        ]);
1174        // WSL rsync is the real rsync (not openrsync), so --protect-args is safe.
1175        cmd.arg("--protect-args");
1176        cmd.args([
1177            "--timeout",
1178            &timeout_str,
1179            "-e",
1180            &ssh_opts,
1181            "--",
1182            &remote_spec,
1183            &wsl_dest,
1184        ]);
1185
1186        tracing::debug!(
1187            host = %host,
1188            remote_path = %expanded_path,
1189            local_path = %local_path.display(),
1190            wsl_dest = %wsl_dest,
1191            "starting wsl rsync"
1192        );
1193
1194        let output = match cmd.output() {
1195            Ok(o) => o,
1196            Err(e) => {
1197                return PathSyncResult {
1198                    remote_path: remote_path.to_string(),
1199                    local_path,
1200                    success: false,
1201                    error: Some(format!("Failed to execute wsl rsync: {}", e)),
1202                    duration_ms: start.elapsed().as_millis() as u64,
1203                    ..Default::default()
1204                };
1205            }
1206        };
1207
1208        let duration_ms = start.elapsed().as_millis() as u64;
1209        let stdout = String::from_utf8_lossy(&output.stdout);
1210        let stderr = String::from_utf8_lossy(&output.stderr);
1211
1212        if !output.status.success() {
1213            let error_msg = if stderr.contains("Connection refused")
1214                || stderr.contains("Connection timed out")
1215            {
1216                format!("SSH connection failed: {}", stderr.trim())
1217            } else if is_host_key_verification_failure(&stderr) {
1218                host_key_verification_error(host)
1219            } else if stderr.contains("No such file or directory") {
1220                format!("Remote path not found: {}", expanded_path)
1221            } else if stderr.contains("Permission denied") {
1222                format!("Permission denied: {}", stderr.trim())
1223            } else {
1224                format!("wsl rsync failed: {}", stderr.trim())
1225            };
1226
1227            tracing::warn!(
1228                host = %host,
1229                remote_path = %expanded_path,
1230                error = %error_msg,
1231                "wsl rsync failed"
1232            );
1233
1234            return PathSyncResult {
1235                remote_path: remote_path.to_string(),
1236                local_path,
1237                success: false,
1238                error: Some(error_msg),
1239                duration_ms,
1240                ..Default::default()
1241            };
1242        }
1243
1244        let stats = parse_rsync_stats(&stdout);
1245
1246        tracing::info!(
1247            host = %host,
1248            remote_path = %expanded_path,
1249            files = stats.files_transferred,
1250            bytes = stats.bytes_transferred,
1251            duration_ms,
1252            "wsl rsync completed"
1253        );
1254
1255        PathSyncResult {
1256            remote_path: remote_path.to_string(),
1257            local_path,
1258            files_transferred: stats.files_transferred,
1259            bytes_transferred: stats.bytes_transferred,
1260            success: true,
1261            error: None,
1262            duration_ms,
1263        }
1264    }
1265
1266    /// Sync a single path using SCP after a physical `find -P` regular-file listing.
1267    ///
1268    /// This method delegates all authentication to the native system `scp`/`ssh`
1269    /// binary, which correctly reads `~/.ssh/config`, the OpenSSH agent (including
1270    /// the Windows OpenSSH agent on Windows), and all standard key locations.
1271    ///
1272    /// This avoids the "No valid authentication method found" failure that occurs
1273    /// in the `ssh2`-based SFTP path on Windows, where the library does not
1274    /// integrate with the Windows OpenSSH agent (`ssh-agent.exe`).
1275    fn sync_path_scp(
1276        &self,
1277        host: &str,
1278        remote_path: &str,
1279        dest_dir: &Path,
1280        remote_home: Option<&str>,
1281    ) -> PathSyncResult {
1282        let start = Instant::now();
1283
1284        if remote_path.starts_with('~') && remote_home.is_none() {
1285            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1286            return PathSyncResult {
1287                remote_path: remote_path.to_string(),
1288                local_path,
1289                success: false,
1290                error: Some(
1291                    "Cannot expand '~' in remote path; failed to determine remote home directory"
1292                        .to_string(),
1293                ),
1294                duration_ms: start.elapsed().as_millis() as u64,
1295                ..Default::default()
1296            };
1297        }
1298
1299        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1300        let safe_name = path_to_safe_dirname(remote_path);
1301        let local_path = dest_dir.join(&safe_name);
1302
1303        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1304            return PathSyncResult {
1305                remote_path: remote_path.to_string(),
1306                local_path,
1307                success: false,
1308                error: Some(e),
1309                duration_ms: start.elapsed().as_millis() as u64,
1310                ..Default::default()
1311            };
1312        }
1313
1314        // `scp -r` follows symlinks on some OpenSSH paths. Enumerate only regular
1315        // files with physical traversal first, then copy those files individually.
1316        let connect_timeout = self.connection_timeout.to_string();
1317        let find_command = remote_find_regular_files_command(&expanded_path);
1318
1319        tracing::debug!(
1320            host = %host,
1321            remote_path = %expanded_path,
1322            local_path = %local_path.display(),
1323            "listing regular files for scp sync"
1324        );
1325
1326        let timeout_secs = self.connection_timeout.max(1);
1327        let mut cmd = Command::new("ssh");
1328        cmd.args(strict_ssh_cli_tokens(timeout_secs))
1329            .arg("--")
1330            .arg(host)
1331            .arg(&find_command)
1332            .stdout(Stdio::piped())
1333            .stderr(Stdio::piped());
1334        configure_child_process_group(&mut cmd);
1335
1336        let output = match cmd.spawn().and_then(|child| {
1337            wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1338        }) {
1339            Ok(Some(o)) => o,
1340            Ok(None) => {
1341                return PathSyncResult {
1342                    remote_path: remote_path.to_string(),
1343                    local_path,
1344                    success: false,
1345                    error: Some(format!(
1346                        "SSH file listing timed out after {timeout_secs} seconds"
1347                    )),
1348                    duration_ms: start.elapsed().as_millis() as u64,
1349                    ..Default::default()
1350                };
1351            }
1352            Err(e) => {
1353                return PathSyncResult {
1354                    remote_path: remote_path.to_string(),
1355                    local_path,
1356                    success: false,
1357                    error: Some(format!("Failed to execute ssh file listing: {}", e)),
1358                    duration_ms: start.elapsed().as_millis() as u64,
1359                    ..Default::default()
1360                };
1361            }
1362        };
1363
1364        let stderr = String::from_utf8_lossy(&output.stderr);
1365        if !output.status.success() {
1366            let error_msg = if stderr.contains("Connection refused")
1367                || stderr.contains("Connection timed out")
1368            {
1369                format!("SSH connection failed: {}", stderr.trim())
1370            } else if is_host_key_verification_failure(&stderr) {
1371                host_key_verification_error(host)
1372            } else if stderr.contains("No such file or directory") {
1373                format!("Remote path not found: {}", expanded_path)
1374            } else if stderr.contains("Permission denied") {
1375                format!("Permission denied: {}", stderr.trim())
1376            } else {
1377                format!("Remote file listing failed: {}", stderr.trim())
1378            };
1379
1380            tracing::warn!(
1381                host = %host,
1382                remote_path = %expanded_path,
1383                error = %error_msg,
1384                "scp file listing failed"
1385            );
1386
1387            return PathSyncResult {
1388                remote_path: remote_path.to_string(),
1389                local_path,
1390                success: false,
1391                error: Some(error_msg),
1392                duration_ms: start.elapsed().as_millis() as u64,
1393                ..Default::default()
1394            };
1395        }
1396
1397        let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1398        let remote_root = Path::new(&expanded_path);
1399        let leaf_name = Path::new(remote_path)
1400            .file_name()
1401            .and_then(|n| n.to_str())
1402            .unwrap_or("remote");
1403        let mut files_transferred = 0u64;
1404        let mut bytes_transferred = 0u64;
1405
1406        for remote_file in remote_files {
1407            let remote_file_path = Path::new(&remote_file);
1408            let Some(local_file) = remote_file_to_safe_local_path(
1409                remote_root,
1410                remote_file_path,
1411                &local_path,
1412                leaf_name,
1413            ) else {
1414                tracing::warn!(
1415                    remote_path = %remote_file,
1416                    root = %expanded_path,
1417                    "skipping scp file outside listed root"
1418                );
1419                continue;
1420            };
1421
1422            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1423                return PathSyncResult {
1424                    remote_path: remote_path.to_string(),
1425                    local_path,
1426                    success: false,
1427                    error: Some(e),
1428                    duration_ms: start.elapsed().as_millis() as u64,
1429                    ..Default::default()
1430                };
1431            }
1432
1433            if let Some(parent) = local_file.parent() {
1434                if let Err(e) = std::fs::create_dir_all(parent) {
1435                    return PathSyncResult {
1436                        remote_path: remote_path.to_string(),
1437                        local_path,
1438                        success: false,
1439                        error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1440                        duration_ms: start.elapsed().as_millis() as u64,
1441                        ..Default::default()
1442                    };
1443                }
1444
1445                if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1446                    return PathSyncResult {
1447                        remote_path: remote_path.to_string(),
1448                        local_path,
1449                        success: false,
1450                        error: Some(e),
1451                        duration_ms: start.elapsed().as_millis() as u64,
1452                        ..Default::default()
1453                    };
1454                }
1455            }
1456
1457            if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1458                return PathSyncResult {
1459                    remote_path: remote_path.to_string(),
1460                    local_path,
1461                    success: false,
1462                    error: Some(e),
1463                    duration_ms: start.elapsed().as_millis() as u64,
1464                    ..Default::default()
1465                };
1466            }
1467
1468            let temp_path =
1469                unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1470            let Some(temp_path_str) = temp_path.to_str() else {
1471                return PathSyncResult {
1472                    remote_path: remote_path.to_string(),
1473                    local_path,
1474                    success: false,
1475                    error: Some("Local path contains invalid UTF-8".to_string()),
1476                    duration_ms: start.elapsed().as_millis() as u64,
1477                    ..Default::default()
1478                };
1479            };
1480            if let Err(e) = std::fs::OpenOptions::new()
1481                .write(true)
1482                .create_new(true)
1483                .open(&temp_path)
1484                .and_then(|file| file.sync_all())
1485            {
1486                return PathSyncResult {
1487                    remote_path: remote_path.to_string(),
1488                    local_path,
1489                    success: false,
1490                    error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1491                    duration_ms: start.elapsed().as_millis() as u64,
1492                    ..Default::default()
1493                };
1494            }
1495
1496            let remote_spec = remote_spec_for_scp(host, &remote_file);
1497            let mut cmd = Command::new("scp");
1498            cmd.args([
1499                "-B",
1500                "-o",
1501                &format!("ConnectTimeout={}", connect_timeout),
1502                "-o",
1503                "ServerAliveInterval=15",
1504                "-o",
1505                "ServerAliveCountMax=3",
1506                "-o",
1507                "StrictHostKeyChecking=yes",
1508                "--",
1509                &remote_spec,
1510                temp_path_str,
1511            ]);
1512
1513            let output = match cmd.output() {
1514                Ok(o) => o,
1515                Err(e) => {
1516                    return PathSyncResult {
1517                        remote_path: remote_path.to_string(),
1518                        local_path,
1519                        success: false,
1520                        error: Some(format!("Failed to execute scp: {}", e)),
1521                        duration_ms: start.elapsed().as_millis() as u64,
1522                        ..Default::default()
1523                    };
1524                }
1525            };
1526
1527            if !output.status.success() {
1528                let _ = std::fs::remove_file(&temp_path);
1529                let stderr = String::from_utf8_lossy(&output.stderr);
1530                let error_msg = if is_host_key_verification_failure(&stderr) {
1531                    host_key_verification_error(host)
1532                } else if stderr.contains("Permission denied") {
1533                    format!("Permission denied: {}", stderr.trim())
1534                } else {
1535                    format!("scp failed: {}", stderr.trim())
1536                };
1537
1538                tracing::warn!(
1539                    host = %host,
1540                    remote_path = %remote_file,
1541                    error = %error_msg,
1542                    "scp file transfer failed"
1543                );
1544
1545                return PathSyncResult {
1546                    remote_path: remote_path.to_string(),
1547                    local_path,
1548                    success: false,
1549                    error: Some(error_msg),
1550                    duration_ms: start.elapsed().as_millis() as u64,
1551                    ..Default::default()
1552                };
1553            }
1554
1555            files_transferred += 1;
1556            if let Err(e) = sync_file_path(&temp_path) {
1557                return PathSyncResult {
1558                    remote_path: remote_path.to_string(),
1559                    local_path,
1560                    success: false,
1561                    error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1562                    duration_ms: start.elapsed().as_millis() as u64,
1563                    ..Default::default()
1564                };
1565            }
1566            if let Ok(metadata) = std::fs::metadata(&temp_path) {
1567                bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1568            }
1569            if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1570                return PathSyncResult {
1571                    remote_path: remote_path.to_string(),
1572                    local_path,
1573                    success: false,
1574                    error: Some(format!(
1575                        "Failed to publish {} to {}: {}",
1576                        temp_path.display(),
1577                        local_file.display(),
1578                        e
1579                    )),
1580                    duration_ms: start.elapsed().as_millis() as u64,
1581                    ..Default::default()
1582                };
1583            }
1584        }
1585
1586        let duration_ms = start.elapsed().as_millis() as u64;
1587
1588        tracing::info!(
1589            host = %host,
1590            remote_path = %expanded_path,
1591            files = files_transferred,
1592            bytes = bytes_transferred,
1593            duration_ms,
1594            "scp sync completed"
1595        );
1596
1597        PathSyncResult {
1598            remote_path: remote_path.to_string(),
1599            local_path,
1600            files_transferred,
1601            bytes_transferred,
1602            success: true,
1603            error: None,
1604            duration_ms,
1605        }
1606    }
1607
    /// Sync a single path using SFTP (fallback when rsync unavailable).
    ///
    /// Uses the ssh2 crate for SFTP transfers. Authenticates via SSH agent
    /// or key file from SSH config (see `authenticate_ssh`).
    ///
    /// # Arguments
    ///
    /// * `host` - Remote host, optionally in `user@host` form. The host part is
    ///   matched against discovered SSH config entries, first by alias and then
    ///   by configured hostname.
    /// * `remote_path` - Remote file or directory to mirror; may start with `~`.
    /// * `dest_dir` - Local directory under which the per-path container
    ///   directory is created.
    /// * `remote_home` - Remote home directory used to expand a leading `~`.
    ///   When `remote_path` starts with `~` and this is `None`, the sync fails
    ///   before any connection is attempted.
    ///
    /// # Returns
    ///
    /// A `PathSyncResult` describing the outcome. Failures are never returned
    /// as `Err`; they are reported with `success: false` and a populated
    /// `error`. When the recursive download fails part-way, the file and byte
    /// counters accumulated so far are still reported.
    fn sync_path_sftp(
        &self,
        host: &str,
        remote_path: &str,
        dest_dir: &Path,
        remote_home: Option<&str>,
    ) -> PathSyncResult {
        let start = Instant::now();
        // A leading '~' can only be expanded with a known remote home; bail out
        // early rather than syncing a literal "~" path.
        if remote_path.starts_with('~') && remote_home.is_none() {
            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(
                    "Cannot expand '~' in remote path; failed to determine remote home directory"
                        .to_string(),
                ),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }
        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
        // Use raw remote_path for stability (independent of home expansion success)
        let local_path = dest_dir.join(path_to_safe_dirname(remote_path));

        // Create local directory without following any pre-existing mirror symlink.
        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(e),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        // Parse host to extract user if present (user@host format)
        let (ssh_user, ssh_host) = parse_ssh_host(host);

        // Look up host in SSH config for connection details
        // First try matching by SSH config alias (Host line), then by actual hostname
        let ssh_config = discover_ssh_hosts();
        let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
            ssh_config
                .iter()
                .find(|h| h.hostname.as_deref() == Some(ssh_host))
        });

        // Determine connection parameters: SSH config values win, otherwise the
        // host string as given and the default SSH port 22.
        let hostname = host_config
            .and_then(|h| h.hostname.as_deref())
            .unwrap_or(ssh_host);
        let port = host_config.and_then(|h| h.port).unwrap_or(22);
        // Resolve username deterministically; never guess with a sentinel value.
        // Precedence: user@host, SSH config user, then the USER and LOGNAME
        // environment variables. Blank candidates are ignored.
        let username = match first_nonblank_username([
            ssh_user,
            host_config.and_then(|h| h.user.as_deref()),
        ])
        .or_else(|| env_username("USER"))
        .or_else(|| env_username("LOGNAME"))
        {
            Some(user) => user,
            None => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!(
                        "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
                        host
                    )),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };
        let identity_file = host_config.and_then(|h| h.identity_file.as_deref());

        tracing::debug!(
            hostname = %hostname,
            port,
            username = %username,
            identity_file = ?identity_file,
            remote_path = %expanded_path,
            "SFTP connection parameters"
        );

        // Connect via TCP with connection timeout
        let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
        let addr = format!("{}:{}", hostname, port);
        // Try the literal "host:port" as a socket address first; on parse
        // failure fall back to name resolution and use the first address only.
        let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
            // Resolve hostname to socket address
            use std::net::ToSocketAddrs;
            (hostname, port)
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .ok_or(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "cannot resolve hostname",
                ))
        }) {
            Ok(a) => a,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };
        let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
            Ok(t) => t,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!(
                        "TCP connection failed to {}:{}: {}",
                        hostname, port, e
                    )),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        // Set TCP read/write timeout (use transfer_timeout, not connection_timeout).
        // Failing to set a timeout is logged and otherwise ignored.
        let timeout = std::time::Duration::from_secs(self.transfer_timeout);
        if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
            tracing::warn!("Failed to set TCP read timeout: {}", e);
        }
        if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
            tracing::warn!("Failed to set TCP write timeout: {}", e);
        }
        // Keep a second handle to the socket so it can still be shut down
        // explicitly after `tcp` itself is moved into the session below. If the
        // clone fails, the explicit shutdown is simply skipped.
        let tcp_shutdown = tcp.try_clone().ok();

        // Create SSH session
        let mut session = match Session::new() {
            Ok(s) => s,
            Err(e) => {
                let _ = tcp.shutdown(Shutdown::Both);
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to create SSH session: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        session.set_tcp_stream(tcp);
        // Best-effort teardown used on every exit path from here on: disconnect
        // the SSH session, then shut the socket down; errors are ignored.
        let close_connections = |session: &mut Session, reason: &str| {
            let _ = session.disconnect(None, reason, None);
            if let Some(stream) = tcp_shutdown.as_ref() {
                let _ = stream.shutdown(Shutdown::Both);
            }
        };

        // NOTE(review): no known_hosts / host-key check is visible in this
        // function between the handshake and authentication - confirm that the
        // remote host's identity is verified elsewhere before this fallback runs.
        if let Err(e) = session.handshake() {
            close_connections(&mut session, "handshake failed");
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(format!("SSH handshake failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        // Authenticate - try agent first, then key file
        if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
            close_connections(&mut session, "authentication failed");
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(format!("SSH authentication failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        // Open SFTP session
        let sftp = match session.sftp() {
            Ok(s) => s,
            Err(e) => {
                close_connections(&mut session, "sftp open failed");
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to open SFTP session: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            local_path = %local_path.display(),
            "starting SFTP sync"
        );

        // Recursively download the remote path
        let mut files_transferred = 0u64;
        let mut bytes_transferred = 0u64;

        // For consistency with rsync and scp, we should create a subdirectory
        // with the remote path's leaf name inside the container directory.
        // The leaf comes from the raw (unexpanded) `remote_path`; "remote" is
        // used when the path has no final component.
        let leaf_name = Path::new(remote_path)
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("remote");
        let target_local_path = local_path.join(leaf_name);

        // `local_path` (the container) is passed as the root below which local
        // symlinks are rejected during the download.
        if let Err(e) = self.sftp_download_recursive(
            &sftp,
            Path::new(&expanded_path),
            &target_local_path,
            &local_path,
            &mut files_transferred,
            &mut bytes_transferred,
        ) {
            close_connections(&mut session, "sftp download failed");
            // Report the partial progress counters alongside the failure.
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                files_transferred,
                bytes_transferred,
                success: false,
                error: Some(format!("SFTP download failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
            };
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            files = files_transferred,
            bytes = bytes_transferred,
            duration_ms,
            "SFTP sync completed"
        );

        close_connections(&mut session, "sync complete");
        PathSyncResult {
            remote_path: remote_path.to_string(),
            local_path,
            files_transferred,
            bytes_transferred,
            success: true,
            error: None,
            duration_ms,
        }
    }
1881
1882    /// Authenticate SSH session using agent or key file.
1883    fn authenticate_ssh(
1884        &self,
1885        session: &Session,
1886        username: &str,
1887        identity_file: Option<&str>,
1888    ) -> Result<(), String> {
1889        // Try SSH agent first
1890        if let Ok(mut agent) = session.agent()
1891            && agent.connect().is_ok()
1892            && agent.list_identities().is_ok()
1893        {
1894            for identity in agent.identities().unwrap_or_default() {
1895                if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1896                    tracing::debug!("Authenticated via SSH agent");
1897                    return Ok(());
1898                }
1899            }
1900        }
1901
1902        // Try key file if specified
1903        if let Some(key_path) = identity_file {
1904            let key_path_expanded = expand_tilde_local(key_path);
1905            let key_path_buf = Path::new(&key_path_expanded);
1906
1907            if key_path_buf.exists()
1908                && session
1909                    .userauth_pubkey_file(username, None, key_path_buf, None)
1910                    .is_ok()
1911                && session.authenticated()
1912            {
1913                tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1914                return Ok(());
1915            }
1916        }
1917
1918        // Try default key locations
1919        if let Some(home) = dirs::home_dir() {
1920            for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1921                let key_path = home.join(".ssh").join(key_name);
1922                if key_path.exists()
1923                    && session
1924                        .userauth_pubkey_file(username, None, &key_path, None)
1925                        .is_ok()
1926                    && session.authenticated()
1927                {
1928                    tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1929                    return Ok(());
1930                }
1931            }
1932        }
1933
1934        Err(format!(
1935            "No valid authentication method found for user '{}'",
1936            username
1937        ))
1938    }
1939
1940    /// Recursively download a remote path via SFTP.
1941    fn sftp_download_recursive(
1942        &self,
1943        sftp: &Sftp,
1944        remote_path: &Path,
1945        local_path: &Path,
1946        local_root: &Path,
1947        files_transferred: &mut u64,
1948        bytes_transferred: &mut u64,
1949    ) -> Result<(), String> {
1950        // Use lstat so a remote symlink is classified as a symlink rather than
1951        // followed to a file or directory outside the configured source root.
1952        let stat = sftp
1953            .lstat(remote_path)
1954            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1955
1956        if sftp_file_stat_is_symlink(&stat) {
1957            tracing::warn!(
1958                path = %remote_path.display(),
1959                "Skipping remote symlink during SFTP sync"
1960            );
1961            return Ok(());
1962        }
1963
1964        if stat.is_dir() {
1965            // Create local directory for this directory item
1966            reject_local_symlink_below_root(local_root, local_path)?;
1967            std::fs::create_dir_all(local_path)
1968                .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1969            reject_local_symlink_below_root(local_root, local_path)?;
1970
1971            // List directory contents
1972            let entries = sftp
1973                .readdir(remote_path)
1974                .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1975
1976            for (entry_path, _entry_stat) in entries {
1977                let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1978                    continue;
1979                };
1980
1981                let entry_stat = sftp
1982                    .lstat(&entry_path)
1983                    .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1984                if sftp_file_stat_is_symlink(&entry_stat) {
1985                    tracing::warn!(
1986                        path = %entry_path.display(),
1987                        "Skipping remote symlink during SFTP sync"
1988                    );
1989                    continue;
1990                }
1991
1992                let local_entry = local_path.join(file_name);
1993
1994                if entry_stat.is_dir() {
1995                    // Recurse into subdirectory
1996                    self.sftp_download_recursive(
1997                        sftp,
1998                        &entry_path,
1999                        &local_entry,
2000                        local_root,
2001                        files_transferred,
2002                        bytes_transferred,
2003                    )?;
2004                } else if entry_stat.is_file() {
2005                    // Download file
2006                    if self.sftp_download_file(
2007                        sftp,
2008                        &entry_path,
2009                        &local_entry,
2010                        local_root,
2011                        bytes_transferred,
2012                    )? {
2013                        *files_transferred += 1;
2014                    }
2015                }
2016                // Skip symlinks and other types for safety
2017            }
2018        } else if stat.is_file() {
2019            // Ensure the parent directory exists
2020            if let Some(parent) = local_path.parent() {
2021                reject_local_symlink_below_root(local_root, parent)?;
2022                std::fs::create_dir_all(parent).map_err(|e| {
2023                    format!("Failed to create local dir {}: {}", parent.display(), e)
2024                })?;
2025                reject_local_symlink_below_root(local_root, parent)?;
2026            }
2027
2028            if self.sftp_download_file(
2029                sftp,
2030                remote_path,
2031                local_path,
2032                local_root,
2033                bytes_transferred,
2034            )? {
2035                *files_transferred += 1;
2036            }
2037        } else {
2038            // Not a regular file or directory (symlink, socket, etc.) - skip with warning
2039            tracing::warn!(
2040                path = %remote_path.display(),
2041                "Skipping remote path: not a regular file or directory"
2042            );
2043        }
2044
2045        Ok(())
2046    }
2047
2048    /// Download a single file via SFTP.
2049    fn sftp_download_file(
2050        &self,
2051        sftp: &Sftp,
2052        remote_path: &Path,
2053        local_path: &Path,
2054        local_root: &Path,
2055        bytes_transferred: &mut u64,
2056    ) -> Result<bool, String> {
2057        let stat = sftp
2058            .lstat(remote_path)
2059            .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
2060        if sftp_file_stat_is_symlink(&stat) {
2061            tracing::warn!(
2062                path = %remote_path.display(),
2063                "Skipping remote symlink during SFTP sync"
2064            );
2065            return Ok(false);
2066        }
2067        if !stat.is_file() {
2068            tracing::warn!(
2069                path = %remote_path.display(),
2070                "Skipping remote path: not a regular file"
2071            );
2072            return Ok(false);
2073        }
2074
2075        let mut remote_file = sftp
2076            .open(remote_path)
2077            .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
2078
2079        reject_local_symlink_below_root(local_root, local_path)?;
2080
2081        let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
2082        let mut local_file = std::fs::OpenOptions::new()
2083            .write(true)
2084            .create_new(true)
2085            .open(&temp_path)
2086            .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
2087
2088        // Transfer in chunks
2089        let mut buffer = [0u8; 32768]; // 32KB chunks
2090        loop {
2091            let bytes_read = remote_file
2092                .read(&mut buffer)
2093                .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
2094
2095            if bytes_read == 0 {
2096                break;
2097            }
2098
2099            local_file
2100                .write_all(&buffer[..bytes_read])
2101                .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
2102
2103            *bytes_transferred += bytes_read as u64;
2104        }
2105
2106        tracing::trace!(
2107            remote = %remote_path.display(),
2108            local = %local_path.display(),
2109            "downloaded file"
2110        );
2111
2112        local_file
2113            .sync_all()
2114            .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
2115        drop(local_file);
2116        replace_file_from_temp(&temp_path, local_path).map_err(|e| {
2117            format!(
2118                "Failed to publish {} to {}: {}",
2119                temp_path.display(),
2120                local_path.display(),
2121                e
2122            )
2123        })?;
2124
2125        Ok(true)
2126    }
2127}
2128
2129/// Resolve an SFTP entry's basename for local mirroring.
2130fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
2131    let Some(file_name) = entry_path.file_name() else {
2132        tracing::warn!(
2133            parent = %parent_path.display(),
2134            entry = ?entry_path,
2135            "Skipping SFTP entry without a file name"
2136        );
2137        return None;
2138    };
2139
2140    let Some(file_name) = file_name.to_str() else {
2141        tracing::warn!(
2142            parent = %parent_path.display(),
2143            entry = ?entry_path,
2144            "Skipping SFTP entry with non-UTF-8 file name"
2145        );
2146        return None;
2147    };
2148
2149    if file_name.is_empty() {
2150        tracing::warn!(
2151            parent = %parent_path.display(),
2152            entry = ?entry_path,
2153            "Skipping SFTP entry with empty file name"
2154        );
2155        return None;
2156    }
2157
2158    if file_name == "." || file_name == ".." {
2159        return None;
2160    }
2161
2162    Some(file_name)
2163}
2164
/// Report whether an `scp` executable can be found on `PATH`.
///
/// Each `PATH` directory is probed for a regular file named `scp` (`scp.exe`
/// on Windows). The binary is deliberately not executed, since `scp` exits
/// non-zero when invoked without arguments on many platforms. Returns `false`
/// when `PATH` is unset.
fn which_scp_exists() -> bool {
    let executable = if cfg!(target_os = "windows") {
        "scp.exe"
    } else {
        "scp"
    };

    match std::env::var_os("PATH") {
        Some(search_path) => {
            std::env::split_paths(&search_path).any(|dir| dir.join(executable).is_file())
        }
        None => false,
    }
}
2183
/// Convert a Windows absolute path to a WSL-accessible `/mnt/<drive>/…` path.
///
/// E.g. `C:\Users\george\AppData\Roaming\cass` →
///      `/mnt/c/Users/george/AppData/Roaming/cass`
///
/// A path is treated as a Windows drive path only when it starts with an ASCII
/// letter, a colon, and a `\` or `/` separator. The drive letter is lowercased
/// and every backslash in the remainder becomes a forward slash.
///
/// If the path does not look like a Windows drive path (including inputs such
/// as `1:/x` whose first character is not a letter) it is returned unchanged.
fn windows_path_to_wsl(path: &str) -> String {
    match path.as_bytes() {
        // Match "C:\..." or "C:/..." - the drive must be a real letter.
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic() => {
            // The first three bytes are ASCII, so index 3 is a char boundary.
            let rest = path[3..].replace('\\', "/");
            format!("/mnt/{}/{}", char::from(drive.to_ascii_lowercase()), rest)
        }
        _ => path.to_string(),
    }
}
2202
/// Split an SSH host string into `(optional_user, host)`.
///
/// The split happens at the first `@`; everything before it is the user and
/// everything after it is the host. Neither part is trimmed or validated, so
/// either may be empty.
///
/// Examples:
/// - "myserver" -> (None, "myserver")
/// - "user@myserver" -> (Some("user"), "myserver")
fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
    match host.split_once('@') {
        Some((user, hostname)) => (Some(user), hostname),
        None => (None, host),
    }
}
2217
/// Return the first candidate that is present and non-blank, trimmed.
///
/// Candidates are examined in order; `None` entries and entries that are empty
/// or whitespace-only are skipped. Returns `None` when nothing qualifies.
fn first_nonblank_username<'a>(
    candidates: impl IntoIterator<Item = Option<&'a str>>,
) -> Option<String> {
    candidates
        .into_iter()
        .flatten()
        .map(str::trim)
        .find(|name| !name.is_empty())
        .map(String::from)
}
2230
2231fn env_username(key: &str) -> Option<String> {
2232    dotenvy::var(key)
2233        .ok()
2234        .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2235}
2236
2237/// Expand tilde in local paths.
2238fn expand_tilde_local(path: &str) -> String {
2239    if let Some(stripped) = path.strip_prefix("~/")
2240        && let Some(home) = dirs::home_dir()
2241    {
2242        return format!("{}/{}", home.display(), stripped);
2243    } else if path == "~"
2244        && let Some(home) = dirs::home_dir()
2245    {
2246        return home.display().to_string();
2247    }
2248    path.to_string()
2249}
2250
2251/// Convert a remote path to a safe directory name.
2252///
2253/// Sanitizes path by:
2254/// - Removing leading `~` and `/`
2255/// - Replacing path separators and spaces with underscores
2256/// - Removing parent directory references (`..`) to prevent traversal attacks
2257/// - Removing current directory references (`.`)
2258/// - Appending a stable hash to prevent collisions (e.g., "foo/bar" vs "foo_bar")
2259pub fn path_to_safe_dirname(path: &str) -> String {
2260    use std::path::{Component, Path};
2261
2262    let path_obj = Path::new(path);
2263    let mut parts: Vec<&str> = Vec::new();
2264
2265    for component in path_obj.components() {
2266        match component {
2267            Component::Normal(name) => {
2268                if let Some(s) = name.to_str() {
2269                    // Skip "~" (home directory marker) and empty/dot-only components
2270                    if !s.is_empty() && s != "." && s != "~" {
2271                        parts.push(s);
2272                    }
2273                }
2274            }
2275            // Skip all traversal components for security
2276            Component::ParentDir
2277            | Component::CurDir
2278            | Component::RootDir
2279            | Component::Prefix(_) => {}
2280        }
2281    }
2282
2283    let cleaned = parts.join("_").replace([' ', '\\'], "_");
2284
2285    // Append stable hash to prevent collisions
2286    let hash = fnv1a_hash(path);
2287    let hash_suffix = format!("{:08x}", hash);
2288
2289    if cleaned.is_empty() {
2290        format!("root_{}", hash_suffix)
2291    } else {
2292        format!("{}_{}", cleaned, hash_suffix)
2293    }
2294}
2295
/// 64-bit FNV-1a hash of the UTF-8 bytes of `text`.
///
/// For each byte: XOR it into the hash, then multiply by the FNV prime with
/// wrapping arithmetic. The result depends only on the input bytes.
fn fnv1a_hash(text: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    text.bytes().fold(OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
2304
2305/// Parse transfer statistics from rsync --stats output.
2306fn parse_rsync_stats(output: &str) -> RsyncStats {
2307    let mut stats = RsyncStats::default();
2308
2309    for line in output.lines() {
2310        let line = line.trim();
2311
2312        // Parse "Number of regular files transferred: N"
2313        if line.starts_with("Number of regular files transferred:")
2314            && let Some(num_str) = line.split(':').nth(1)
2315        {
2316            stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2317        }
2318
2319        // Parse "Total transferred file size: N bytes"
2320        if line.starts_with("Total transferred file size:")
2321            && let Some(size_part) = line.split(':').nth(1)
2322        {
2323            // Handle formats like "1,234 bytes" or "1234"
2324            let size_str = size_part
2325                .split_whitespace()
2326                .next()
2327                .unwrap_or("0")
2328                .replace(',', "");
2329            stats.bytes_transferred = size_str.parse().unwrap_or(0);
2330        }
2331    }
2332
2333    stats
2334}
2335
2336// =============================================================================
2337// Sync Status Persistence
2338// =============================================================================
2339
/// Result of a sync operation for a source.
///
/// Serialized with snake_case variant names. The default value is
/// [`SyncResult::Skipped`].
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncResult {
    /// Sync completed successfully.
    Success,
    /// Some paths synced, some failed. Carries the error text.
    PartialFailure(String),
    /// Sync failed completely. Carries the error text.
    Failed(String),
    /// Sync was skipped (e.g., dry run). This is also the default value, and
    /// its display label is `"never"`.
    #[default]
    Skipped,
}
2354
2355impl SyncResult {
2356    /// Short display label for the result.
2357    pub fn label(&self) -> &'static str {
2358        match self {
2359            Self::Success => "success",
2360            Self::PartialFailure(_) => "partial",
2361            Self::Failed(_) => "failed",
2362            Self::Skipped => "never",
2363        }
2364    }
2365
2366    /// Error text for partial/full failures.
2367    pub fn error_message(&self) -> Option<&str> {
2368        match self {
2369            Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2370            Self::Success | Self::Skipped => None,
2371        }
2372    }
2373}
2374
/// Scheduler action for a remote source.
///
/// Serialized with snake_case variant names (`sync`, `skip`, `defer`), the
/// same labels `as_str` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceSyncAction {
    /// The source is eligible to sync now.
    Sync,
    /// The source is healthy enough but not due under its configured schedule.
    Skip,
    /// The source is temporarily or operationally unsafe to sync automatically.
    Defer,
}
2386
2387impl SourceSyncAction {
2388    pub fn as_str(self) -> &'static str {
2389        match self {
2390            Self::Sync => "sync",
2391            Self::Skip => "skip",
2392            Self::Defer => "defer",
2393        }
2394    }
2395}
2396
/// Health class used by the adaptive source scheduler.
///
/// Serialized with snake_case variant names, the same labels `as_str` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceHealthKind {
    /// No durable sync status exists for the source, or the recorded status
    /// has no last-sync timestamp. Label: `never_synced`.
    NeverSynced,
    /// Label: `healthy`.
    Healthy,
    /// Label: `stale`.
    Stale,
    /// Label: `high_latency`.
    HighLatency,
    /// The last sync partially succeeded and partially failed.
    /// Label: `flapping`.
    Flapping,
    /// The last sync failed with an authentication or host-key error.
    /// Label: `auth_failed`.
    AuthFailed,
    /// Label: `backing_off`.
    /// NOTE(review): presumably assigned while consecutive failures are
    /// recorded and the failure backoff window has not yet elapsed - confirm
    /// against `SourceSyncDecision::evaluate`.
    BackingOff,
}
2409
2410impl SourceHealthKind {
2411    pub fn as_str(self) -> &'static str {
2412        match self {
2413            Self::NeverSynced => "never_synced",
2414            Self::Healthy => "healthy",
2415            Self::Stale => "stale",
2416            Self::HighLatency => "high_latency",
2417            Self::Flapping => "flapping",
2418            Self::AuthFailed => "auth_failed",
2419            Self::BackingOff => "backing_off",
2420        }
2421    }
2422}
2423
/// Evidence-backed scheduling decision for one source.
///
/// All `*_ms` fields are in milliseconds and share the time base of the
/// `now_ms` value passed to `evaluate`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncDecision {
    /// Decision action the scheduler would take.
    pub action: SourceSyncAction,
    /// Current health class inferred from durable sync state.
    pub health: SourceHealthKind,
    /// Coarse 0..=100 health score for sorting/explanations.
    pub health_score: u8,
    /// Age of the last sync attempt (`now - last_sync`), clamped to zero when
    /// clocks move backward. `None` when no last sync is recorded.
    pub staleness_ms: Option<i64>,
    /// Coarse 0..=100 estimate of value from refreshing stale remote data.
    pub stale_value_score: u8,
    /// Whether an explicit operator request is overriding automatic scheduling.
    pub manual_override: bool,
    /// Whether the decision is using the conservative fallback path.
    pub fallback_active: bool,
    /// Next time this source is eligible under its configured schedule: the
    /// last sync time plus the schedule period. `None` when there is no
    /// recorded last sync or the schedule has no period.
    pub next_eligible_sync_ms: Option<i64>,
    /// End of transient failure backoff when applicable.
    pub backoff_until_ms: Option<i64>,
    /// Human-readable evidence terms, stable enough for robot consumers.
    pub reasons: Vec<String>,
}
2448
2449impl SourceSyncDecision {
2450    fn evaluate(
2451        source: &SourceDefinition,
2452        info: Option<&SourceSyncInfo>,
2453        now_ms: i64,
2454        manual_override: bool,
2455    ) -> Self {
2456        let period_ms = sync_schedule_period_ms(source.sync_schedule);
2457        let next_eligible_sync_ms = info
2458            .and_then(|info| info.last_sync)
2459            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
2460        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
2461        let staleness_ms = info.and_then(|info| {
2462            info.last_sync
2463                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
2464        });
2465        let stale_value_score =
2466            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
2467        let mut reasons = Vec::new();
2468
2469        let health = match info {
2470            None => {
2471                reasons.push("no durable sync status exists for this source".to_string());
2472                SourceHealthKind::NeverSynced
2473            }
2474            Some(info) if info.last_sync.is_none() => {
2475                reasons.push("source has never completed or attempted a sync".to_string());
2476                SourceHealthKind::NeverSynced
2477            }
2478            Some(info) if sync_result_auth_failure(&info.last_result) => {
2479                reasons
2480                    .push("last sync failed with an authentication or host-key error".to_string());
2481                SourceHealthKind::AuthFailed
2482            }
2483            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
2484                reasons.push("last sync partially succeeded and partially failed".to_string());
2485                SourceHealthKind::Flapping
2486            }
2487            Some(info)
2488                if info.consecutive_failures > 0
2489                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
2490            {
2491                reasons.push(format!(
2492                    "{} consecutive failure(s) are inside retry backoff",
2493                    info.consecutive_failures
2494                ));
2495                SourceHealthKind::BackingOff
2496            }
2497            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
2498                let error = info.last_result.error_message().unwrap_or("unknown error");
2499                reasons.push(format!(
2500                    "last sync failed completely ({error}); local fallback remains active"
2501                ));
2502                SourceHealthKind::Flapping
2503            }
2504            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
2505                reasons.push(format!(
2506                    "last sync took {}ms, above {}ms high-latency guard",
2507                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
2508                ));
2509                SourceHealthKind::HighLatency
2510            }
2511            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
2512                reasons.push("configured sync schedule is due".to_string());
2513                SourceHealthKind::Stale
2514            }
2515            Some(_) => SourceHealthKind::Healthy,
2516        };
2517
2518        let fallback_active = matches!(
2519            health,
2520            SourceHealthKind::AuthFailed
2521                | SourceHealthKind::BackingOff
2522                | SourceHealthKind::Flapping
2523                | SourceHealthKind::HighLatency
2524        );
2525
2526        let mut action = if manual_override {
2527            reasons.push("explicit sync command overrides automatic scheduling".to_string());
2528            SourceSyncAction::Sync
2529        } else {
2530            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
2531        };
2532
2533        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
2534            action = SourceSyncAction::Defer;
2535        }
2536
2537        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
2538            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
2539        }
2540
2541        if !manual_override
2542            && matches!(action, SourceSyncAction::Skip)
2543            && let Some(next_ms) = next_eligible_sync_ms
2544        {
2545            reasons.push(format!(
2546                "next scheduled sync is eligible at unix_ms={next_ms}"
2547            ));
2548        }
2549
2550        if reasons.is_empty() {
2551            reasons.push("source is healthy and within schedule".to_string());
2552        }
2553
2554        Self {
2555            action,
2556            health,
2557            health_score: health_score_for_source(health),
2558            staleness_ms,
2559            stale_value_score,
2560            manual_override,
2561            fallback_active,
2562            next_eligible_sync_ms,
2563            backoff_until_ms,
2564            reasons,
2565        }
2566    }
2567}
2568
2569/// Sync information for a single source.
2570#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2571pub struct SourceSyncInfo {
2572    /// Timestamp of last sync attempt.
2573    pub last_sync: Option<i64>,
2574    /// Result of last sync.
2575    pub last_result: SyncResult,
2576    /// Number of files synced in last sync.
2577    pub files_synced: u64,
2578    /// Number of bytes transferred in last sync.
2579    pub bytes_transferred: u64,
2580    /// Duration of last sync in milliseconds.
2581    pub duration_ms: u64,
2582    /// Consecutive failed sync attempts, reset to zero by a fully successful sync.
2583    #[serde(default)]
2584    pub consecutive_failures: u32,
2585}
2586
2587impl SourceSyncInfo {
2588    /// Build sync info from a sync report using the current wall clock time.
2589    pub fn from_report(report: &SyncReport) -> Self {
2590        let last_result = report.sync_result();
2591        Self {
2592            last_sync: Some(current_unix_ms()),
2593            consecutive_failures: u32::from(!report.all_succeeded),
2594            last_result,
2595            files_synced: report.total_files(),
2596            bytes_transferred: report.total_bytes(),
2597            duration_ms: report.total_duration_ms,
2598        }
2599    }
2600}
2601
2602/// Persistent sync status for all sources.
2603#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2604pub struct SyncStatus {
2605    /// Sync info per source (keyed by source name).
2606    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
2607}
2608
2609impl SyncStatus {
2610    /// Load sync status from disk.
2611    pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2612        let path = Self::status_path(data_dir);
2613        match std::fs::read_to_string(&path) {
2614            Ok(content) => serde_json::from_str(&content)
2615                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2616            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2617            Err(e) => Err(e),
2618        }
2619    }
2620
2621    /// Save sync status to disk.
2622    ///
2623    /// Uses an atomic rename on Unix. On Windows, falls back to remove-then-rename
2624    /// because replacing an existing destination with `std::fs::rename` fails.
2625    pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2626        let path = Self::status_path(data_dir);
2627        if let Some(parent) = path.parent() {
2628            std::fs::create_dir_all(parent)?;
2629        }
2630        let content = serde_json::to_string_pretty(self)?;
2631        let tmp_path = write_sync_status_temp_file(&path, content.as_bytes())?;
2632        replace_file_from_temp(&tmp_path, &path)
2633    }
2634
2635    /// Update status for a source from a sync report.
2636    pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2637        let previous_failures = self
2638            .get(source_name)
2639            .map(|info| info.consecutive_failures)
2640            .unwrap_or_default();
2641        let mut info = SourceSyncInfo::from_report(report);
2642        if report.all_succeeded {
2643            info.consecutive_failures = 0;
2644        } else {
2645            info.consecutive_failures = previous_failures.saturating_add(1);
2646        }
2647        self.set_info(source_name, info);
2648    }
2649
2650    /// Set status for a source from precomputed sync info.
2651    pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2652        self.sources.insert(source_name.to_string(), info);
2653    }
2654
2655    /// Drop sync status entries for sources that no longer exist.
2656    ///
2657    /// Returns `true` when at least one stale entry was removed.
2658    pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2659        let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2660        let previous_len = self.sources.len();
2661        self.sources
2662            .retain(|source_name, _| allowed.contains(source_name.as_str()));
2663        self.sources.len() != previous_len
2664    }
2665
2666    /// Get sync info for a source.
2667    pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2668        self.sources.get(source_name)
2669    }
2670
2671    /// Evaluate automatic scheduling for one source at a deterministic timestamp.
2672    pub fn decision_for_source_at(
2673        &self,
2674        source: &SourceDefinition,
2675        now_ms: i64,
2676        manual_override: bool,
2677    ) -> SourceSyncDecision {
2678        SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2679    }
2680
2681    /// Get the path to the status file.
2682    fn status_path(data_dir: &Path) -> PathBuf {
2683        data_dir.join("sync_status.json")
2684    }
2685}
2686
/// A sync lasting at least this many milliseconds counts as high latency.
const SOURCE_HIGH_LATENCY_MS: u64 = 60 * 1_000;
/// Retry backoff applied after the first consecutive failure (5 minutes).
const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1_000;
/// Upper bound for the retry backoff (1 hour).
const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1_000;
2690
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and
/// saturates at `i64::MAX` if the millisecond count does not fit in an `i64`.
pub(crate) fn current_unix_ms() -> i64 {
    let millis_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis());

    match i64::try_from(millis_since_epoch) {
        Ok(millis) => millis,
        Err(_) => i64::MAX,
    }
}
2698
2699fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2700    match schedule {
2701        SyncSchedule::Manual => None,
2702        SyncSchedule::Hourly => Some(60 * 60 * 1000),
2703        SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2704    }
2705}
2706
/// Whether a sync is due at `now_ms`.
///
/// A source without a recorded sync is always due. A source with a recorded
/// sync but no period (manual schedule) is never due. Otherwise the sync is
/// due once `last_sync + period_ms` (saturating) is at or before `now_ms`.
fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
    let Some(synced_at) = last_sync else {
        return true;
    };

    period_ms.is_some_and(|period| synced_at.saturating_add(period) <= now_ms)
}
2714
2715fn automatic_source_sync_action(
2716    schedule: SyncSchedule,
2717    health: SourceHealthKind,
2718    info: Option<&SourceSyncInfo>,
2719    now_ms: i64,
2720) -> SourceSyncAction {
2721    match health {
2722        SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2723        _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2724        SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2725        SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2726            if sync_schedule_due(
2727                info.and_then(|info| info.last_sync),
2728                sync_schedule_period_ms(schedule),
2729                now_ms,
2730            ) {
2731                SourceSyncAction::Sync
2732            } else {
2733                SourceSyncAction::Skip
2734            }
2735        }
2736        SourceHealthKind::Healthy => {
2737            if sync_schedule_due(
2738                info.and_then(|info| info.last_sync),
2739                sync_schedule_period_ms(schedule),
2740                now_ms,
2741            ) {
2742                SourceSyncAction::Sync
2743            } else {
2744                SourceSyncAction::Skip
2745            }
2746        }
2747    }
2748}
2749
2750fn health_score_for_source(health: SourceHealthKind) -> u8 {
2751    match health {
2752        SourceHealthKind::Healthy => 100,
2753        SourceHealthKind::Stale => 75,
2754        SourceHealthKind::NeverSynced => 65,
2755        SourceHealthKind::HighLatency => 55,
2756        SourceHealthKind::Flapping => 40,
2757        SourceHealthKind::BackingOff => 25,
2758        SourceHealthKind::AuthFailed => 10,
2759    }
2760}
2761
2762fn stale_value_score_for_source(
2763    schedule: SyncSchedule,
2764    staleness_ms: Option<i64>,
2765    info: Option<&SourceSyncInfo>,
2766) -> u8 {
2767    let Some(info) = info else {
2768        return 100;
2769    };
2770    if info.last_sync.is_none() {
2771        return 100;
2772    }
2773
2774    let Some(staleness_ms) = staleness_ms else {
2775        return 100;
2776    };
2777
2778    let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2779        return 0;
2780    };
2781
2782    let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2783    u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2784}
2785
2786fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2787    if info.consecutive_failures == 0 {
2788        return None;
2789    }
2790    let last_sync = info.last_sync?;
2791    let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2792    let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2793    let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2794        .saturating_mul(multiplier)
2795        .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2796    Some(last_sync.saturating_add(backoff_ms))
2797}
2798
2799fn sync_result_auth_failure(result: &SyncResult) -> bool {
2800    let Some(error) = result.error_message() else {
2801        return false;
2802    };
2803    let error = error.to_ascii_lowercase();
2804    error.contains("permission denied")
2805        || error.contains("authentication")
2806        || error.contains("host key verification failed")
2807        || error.contains("known_hosts")
2808        || error.contains("no valid authentication")
2809}
2810
2811fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2812    unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2813}
2814
2815fn write_sync_status_temp_file(
2816    final_path: &Path,
2817    content: &[u8],
2818) -> Result<PathBuf, std::io::Error> {
2819    for _ in 0..100 {
2820        let tmp_path = unique_atomic_temp_path(final_path);
2821        match write_sync_status_temp_file_at(&tmp_path, content) {
2822            Ok(()) => return Ok(tmp_path),
2823            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
2824            Err(err) => return Err(err),
2825        }
2826    }
2827
2828    Err(std::io::Error::new(
2829        std::io::ErrorKind::AlreadyExists,
2830        format!(
2831            "failed to allocate unique sync status temp path for {}",
2832            final_path.display()
2833        ),
2834    ))
2835}
2836
/// Create `path` as a new file, write `content` to it, and flush it to disk.
///
/// The file is opened with `create_new`, so an existing file at `path` is
/// never touched and the call fails with `AlreadyExists` instead.
///
/// If writing or syncing fails after the file was created, the partially
/// written file is removed (best effort) so failed writes do not leave
/// stray temp files behind.
///
/// # Errors
///
/// Returns the error from opening, writing, or syncing the file.
fn write_sync_status_temp_file_at(path: &Path, content: &[u8]) -> Result<(), std::io::Error> {
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;

    let outcome = file.write_all(content).and_then(|()| file.sync_all());
    if outcome.is_err() {
        // Close the handle first so the removal also works on platforms that
        // refuse to delete open files.
        drop(file);
        let _ = std::fs::remove_file(path);
    }
    outcome
}
2845
2846fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2847    #[cfg(windows)]
2848    {
2849        match std::fs::rename(temp_path, final_path) {
2850            Ok(()) => sync_parent_directory(final_path),
2851            Err(first_err)
2852                if final_path.exists()
2853                    && matches!(
2854                        first_err.kind(),
2855                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2856                    ) =>
2857            {
2858                let backup_path = unique_replace_backup_path(final_path);
2859                std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2860                    let _ = std::fs::remove_file(temp_path);
2861                    std::io::Error::other(format!(
2862                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2863                        backup_path.display(),
2864                        final_path.display(),
2865                        first_err,
2866                        backup_err
2867                    ))
2868                })?;
2869                match std::fs::rename(temp_path, final_path) {
2870                    Ok(()) => {
2871                        let _ = std::fs::remove_file(&backup_path);
2872                        sync_parent_directory(final_path)
2873                    }
2874                    Err(second_err) => {
2875                        let restore_result = std::fs::rename(&backup_path, final_path);
2876                        match restore_result {
2877                            Ok(()) => {
2878                                let _ = std::fs::remove_file(temp_path);
2879                                sync_parent_directory(final_path).map_err(|sync_err| {
2880                                    std::io::Error::other(format!(
2881                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2882                                        final_path.display(),
2883                                        temp_path.display(),
2884                                        first_err,
2885                                        second_err,
2886                                        sync_err
2887                                    ))
2888                                })?;
2889                                Err(std::io::Error::new(
2890                                    second_err.kind(),
2891                                    format!(
2892                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2893                                        final_path.display(),
2894                                        temp_path.display(),
2895                                        first_err,
2896                                        second_err
2897                                    ),
2898                                ))
2899                            }
2900                            Err(restore_err) => Err(std::io::Error::other(format!(
2901                                "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2902                                final_path.display(),
2903                                temp_path.display(),
2904                                first_err,
2905                                second_err,
2906                                restore_err,
2907                                temp_path.display()
2908                            ))),
2909                        }
2910                    }
2911                }
2912            }
2913            Err(rename_err) => Err(rename_err),
2914        }
2915    }
2916
2917    #[cfg(not(windows))]
2918    {
2919        std::fs::rename(temp_path, final_path)?;
2920        sync_parent_directory(final_path)
2921    }
2922}
2923
/// Open the file at `path` and flush its data and metadata to disk.
///
/// # Errors
///
/// Returns the error from opening the file or from syncing it.
fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
    let file = std::fs::File::open(path)?;
    file.sync_all()
}
2927
/// Flush the directory containing `path` to disk so a preceding rename is
/// durable.
///
/// A path without a parent is a no-op. A bare file name such as
/// `sync_status.json` has an empty parent, which refers to the current
/// directory; it is opened as `.` because opening an empty path fails.
///
/// # Errors
///
/// Returns the error from opening or syncing the parent directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    std::fs::File::open(directory)?.sync_all()
}

/// Windows variant of the parent-directory sync: performs no work and always
/// succeeds.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
    Ok(())
}
2940
/// Unique `bak` sidecar path next to `path`, used to park the existing file
/// while it is being replaced on Windows.
#[cfg(windows)]
fn unique_replace_backup_path(path: &Path) -> PathBuf {
    let (suffix, fallback_name) = ("bak", "sync_status.json");
    unique_atomic_sidecar_path(path, suffix, fallback_name)
}
2945
/// Build a hidden sibling path of `path` that is unique for this call.
///
/// The file name has the shape `.<name>.<suffix>.<pid>.<nanos>.<nonce>`,
/// where `<name>` is the file name of `path` (or `fallback_name` when `path`
/// has no UTF-8 file name), `<nanos>` is the current time since the Unix
/// epoch, and `<nonce>` is a process-wide counter.
fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => fallback_name,
    };
    // A clock before the epoch degrades to zero; the nonce still keeps names
    // distinct within this process.
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let process_id = std::process::id();

    let sidecar_name = format!(
        ".{file_name}.{suffix}.{}.{}.{}",
        process_id, timestamp, nonce
    );
    path.with_file_name(sidecar_name)
}
2966
2967#[cfg(test)]
2968mod tests {
2969    use super::*;
2970    use tempfile::TempDir;
2971
2972    #[test]
2973    fn test_path_to_safe_dirname() {
2974        let res = path_to_safe_dirname("~/.claude/projects");
2975        assert!(res.starts_with(".claude_projects_"));
2976
2977        let res = path_to_safe_dirname("/home/user/data");
2978        assert!(res.starts_with("home_user_data_"));
2979
2980        let res = path_to_safe_dirname("~/");
2981        assert!(res.starts_with("root_"));
2982
2983        let res = path_to_safe_dirname("");
2984        assert!(res.starts_with("root_"));
2985    }
2986
2987    #[test]
2988    fn test_path_to_safe_dirname_empty() {
2989        let res = path_to_safe_dirname("~");
2990        assert!(res.starts_with("root_"));
2991
2992        let res = path_to_safe_dirname("/");
2993        assert!(res.starts_with("root_"));
2994    }
2995
2996    #[test]
2997    fn test_path_to_safe_dirname_strips_traversal_components() {
2998        let res = path_to_safe_dirname("../../etc/passwd");
2999
3000        assert!(res.starts_with("etc_passwd_"));
3001        assert!(!res.contains(".."));
3002        assert!(!res.contains('/'));
3003        assert!(!res.contains('\\'));
3004    }
3005
3006    #[test]
3007    fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
3008        let temp = TempDir::new().unwrap();
3009        let engine = SyncEngine::new(temp.path());
3010
3011        for host in [
3012            "work-mac;touch /tmp/cass-owned",
3013            "work mac",
3014            "work-mac\nhostname",
3015            "work-mac`hostname`",
3016            "work-mac/../../secret",
3017            "-oProxyCommand=evil",
3018            "",
3019            "@host",
3020            "user@",
3021            "user@host@extra",
3022        ] {
3023            let err = engine.get_remote_home(host).unwrap_err();
3024            assert!(
3025                matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
3026                "expected invalid-host rejection for {host:?}, got {err}"
3027            );
3028        }
3029    }
3030
3031    #[test]
3032    fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
3033        let temp = TempDir::new().unwrap();
3034        let engine = SyncEngine::new(temp.path());
3035        let mut source = SourceDefinition::ssh("../escape", "user@host");
3036        source.paths = vec!["/tmp/sessions".to_string()];
3037
3038        let err = engine
3039            .sync_source(&source)
3040            .expect_err("invalid source name should fail before local writes");
3041
3042        assert!(
3043            matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
3044            "expected invalid source-name rejection, got {err}"
3045        );
3046        assert!(
3047            !temp.path().join("escape").exists(),
3048            "invalid source name must not escape the remotes mirror layout"
3049        );
3050        assert!(
3051            !temp.path().join("remotes").exists(),
3052            "invalid source name must be rejected before creating mirror roots"
3053        );
3054    }
3055
3056    #[test]
3057    fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
3058        let temp = TempDir::new().unwrap();
3059        let engine = SyncEngine::new(temp.path());
3060        let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
3061        source.paths = vec!["/tmp/sessions".to_string()];
3062
3063        let err = engine
3064            .sync_source(&source)
3065            .expect_err("invalid host should fail before local writes");
3066
3067        assert!(
3068            matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
3069            "expected invalid host rejection, got {err}"
3070        );
3071        assert!(
3072            !temp.path().join("remotes").exists(),
3073            "invalid host must be rejected before creating mirror roots"
3074        );
3075    }
3076
3077    #[test]
3078    fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
3079        let temp = TempDir::new().unwrap();
3080        let engine = SyncEngine::new(temp.path());
3081
3082        for (path, expected) in [
3083            ("", "paths[0] cannot be empty"),
3084            ("   ", "paths[0] cannot be empty"),
3085            (" ~/.claude/projects", "paths[0] cannot have leading"),
3086            ("~/.claude/projects ", "paths[0] cannot have leading"),
3087            ("~/.claude\nprojects", "paths[0] cannot contain control"),
3088        ] {
3089            let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3090            source.paths = vec![path.to_string()];
3091
3092            let report = engine.sync_source(&source).unwrap();
3093            assert_eq!(report.path_results.len(), 1);
3094            let result = &report.path_results[0];
3095            assert!(!result.success);
3096            assert_eq!(result.remote_path, path);
3097            assert!(
3098                result
3099                    .error
3100                    .as_deref()
3101                    .is_some_and(|message| message.contains(expected)),
3102                "expected invalid path rejection for {path:?}, got {result:?}"
3103            );
3104        }
3105    }
3106
3107    #[test]
3108    fn test_remote_sync_path_validation_allows_internal_spaces() {
3109        assert!(
3110            validate_remote_sync_path_entry(
3111                0,
3112                "~/Library/Application Support/Cursor/User/globalStorage"
3113            )
3114            .is_ok()
3115        );
3116    }
3117
3118    #[test]
3119    fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
3120        let temp = TempDir::new().unwrap();
3121        let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
3122        // Use a validation-safe TEST-NET host so source structure checks pass,
3123        // but remote-home lookup still fails quickly before path result ordering.
3124        let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
3125        source.paths = vec![
3126            "~/.codex/sessions".to_string(),
3127            " ~/.claude/projects".to_string(),
3128            "~/.gemini/tmp".to_string(),
3129        ];
3130
3131        let report = engine.sync_source(&source).unwrap();
3132        let remote_paths = report
3133            .path_results
3134            .iter()
3135            .map(|result| result.remote_path.as_str())
3136            .collect::<Vec<_>>();
3137
3138        assert_eq!(
3139            remote_paths,
3140            vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
3141        );
3142        assert!(
3143            report.path_results[1]
3144                .error
3145                .as_deref()
3146                .is_some_and(|message| message.contains("paths[1] cannot have leading")),
3147            "expected invalid path error in original slot: {:?}",
3148            report.path_results
3149        );
3150    }
3151
3152    #[test]
3153    fn test_remote_find_regular_files_command_uses_physical_traversal() {
3154        assert_eq!(
3155            remote_find_regular_files_command("/tmp/has space"),
3156            "find -P '/tmp/has space' -type f -print0"
3157        );
3158        assert_eq!(
3159            remote_find_regular_files_command("/tmp/that's all"),
3160            "find -P '/tmp/that'\\''s all' -type f -print0"
3161        );
3162    }
3163
3164    #[test]
3165    fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
3166        assert_eq!(
3167            parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
3168            Some("/home/user".to_string())
3169        );
3170        assert_eq!(
3171            parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
3172            Some("/Users/test user".to_string())
3173        );
3174    }
3175
3176    #[test]
3177    fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
3178        assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
3179        assert_eq!(
3180            parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
3181            None
3182        );
3183    }
3184
3185    #[test]
3186    fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
3187        let paths = parse_null_terminated_utf8_paths(
3188            b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
3189        );
3190        assert_eq!(
3191            paths,
3192            vec![
3193                "/remote/sessions/a.jsonl".to_string(),
3194                "/remote/sessions/b.jsonl".to_string()
3195            ]
3196        );
3197    }
3198
3199    #[test]
3200    fn test_remote_file_to_safe_local_path_rejects_outside_root() {
3201        let root = Path::new("/remote/sessions");
3202        let local = Path::new("/mirror/root");
3203
3204        assert_eq!(
3205            remote_file_to_safe_local_path(
3206                root,
3207                Path::new("/remote/sessions/a/b.jsonl"),
3208                local,
3209                "sessions"
3210            ),
3211            Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
3212        );
3213        assert_eq!(
3214            remote_file_to_safe_local_path(
3215                Path::new("/remote/session.jsonl"),
3216                Path::new("/remote/session.jsonl"),
3217                local,
3218                "session.jsonl"
3219            ),
3220            Some(PathBuf::from("/mirror/root/session.jsonl"))
3221        );
3222        assert_eq!(
3223            remote_file_to_safe_local_path(
3224                root,
3225                Path::new("/remote/sessions/../secret.txt"),
3226                local,
3227                "sessions"
3228            ),
3229            None
3230        );
3231        assert_eq!(
3232            remote_file_to_safe_local_path(
3233                root,
3234                Path::new("/remote/other/secret.txt"),
3235                local,
3236                "sessions"
3237            ),
3238            None
3239        );
3240    }
3241
/// The symlink guard accepts a plain path both before and after it exists on disk.
#[test]
fn test_local_symlink_guard_allows_regular_paths() {
    let scratch = TempDir::new().expect("tempdir");
    let mirror_root = scratch.path().join("mirror");
    let session_file = mirror_root.join("sessions/session.jsonl");

    // Neither the root nor the target has been created yet.
    let before_creation = reject_local_symlink_below_root(&mirror_root, &session_file);
    assert!(before_creation.is_ok());

    let parent_dir = session_file.parent().expect("target parent");
    std::fs::create_dir_all(parent_dir).expect("create parent");
    std::fs::write(&session_file, "{}").expect("write target");

    // Same path, now backed by real directories and a regular file.
    let after_creation = reject_local_symlink_below_root(&mirror_root, &session_file);
    assert!(after_creation.is_ok());
}
3255
/// A symlinked directory between the root and the target must be refused.
#[cfg(unix)]
#[test]
fn test_local_symlink_guard_rejects_nested_symlink() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let mirror_root = scratch.path().join("mirror");
    let escape_dir = scratch.path().join("outside");
    std::fs::create_dir_all(&mirror_root).expect("create root");
    std::fs::create_dir_all(&escape_dir).expect("create outside");

    // `mirror/sessions` points at a directory outside the mirror.
    let nested_link = mirror_root.join("sessions");
    symlink(&escape_dir, &nested_link).expect("symlink nested dir");

    let target = nested_link.join("session.jsonl");
    let message = reject_local_symlink_below_root(&mirror_root, &target)
        .expect_err("nested symlink should be rejected");

    for needle in ["Refusing to write", "sessions"] {
        assert!(message.contains(needle));
    }
}
3274
/// The guard also refuses when the mirror root itself is a symlink.
#[cfg(unix)]
#[test]
fn test_local_symlink_guard_rejects_root_symlink() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let escape_dir = scratch.path().join("outside");
    let linked_root = scratch.path().join("mirror-link");
    std::fs::create_dir_all(&escape_dir).expect("create outside");
    symlink(&escape_dir, &linked_root).expect("symlink root");

    let target = linked_root.join("session.jsonl");
    let message = reject_local_symlink_below_root(&linked_root, &target)
        .expect_err("root symlink should be rejected");

    for needle in ["Refusing to write", "mirror-link"] {
        assert!(message.contains(needle));
    }
}
3292
/// Preparing a container under a fresh root leaves a real directory behind.
#[test]
fn test_prepare_local_sync_container_creates_regular_container() {
    let scratch = TempDir::new().expect("tempdir");
    let mirror_root = scratch.path().join("mirror");
    let container = mirror_root.join("sessions");

    let outcome = prepare_local_sync_container(&mirror_root, &container);
    outcome.expect("regular container should be created");

    assert!(container.is_dir());
}
3303
/// A container path that already exists as a symlink must be refused.
#[cfg(unix)]
#[test]
fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let base = scratch.path();
    let mirror_root = base.join("mirror");
    let escape_dir = base.join("outside");
    let container = mirror_root.join("sessions");
    std::fs::create_dir_all(&mirror_root).expect("create root");
    std::fs::create_dir_all(&escape_dir).expect("create outside");
    symlink(&escape_dir, &container).expect("symlink target");

    let message = prepare_local_sync_container(&mirror_root, &container)
        .expect_err("sync container symlink should be rejected");

    for needle in ["Refusing to write", "sessions"] {
        assert!(message.contains(needle));
    }
}
3323
/// Container preparation is refused when the mirror root is a symlink.
#[cfg(unix)]
#[test]
fn test_prepare_local_sync_container_rejects_root_symlink() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let base = scratch.path();
    let escape_dir = base.join("outside");
    let linked_root = base.join("mirror-link");
    let container = linked_root.join("sessions");
    std::fs::create_dir_all(&escape_dir).expect("create outside");
    symlink(&escape_dir, &linked_root).expect("symlink root");

    let message = prepare_local_sync_container(&linked_root, &container)
        .expect_err("sync root symlink should be rejected");

    for needle in ["Refusing to write", "mirror-link"] {
        assert!(message.contains(needle));
    }
}
3342
/// A symlinked per-source directory must be refused, and nothing may be created through it.
#[cfg(unix)]
#[test]
fn test_prepare_local_sync_root_rejects_symlinked_source_parent() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let base = scratch.path();
    let data_dir = base.join("data");
    let remotes_dir = data_dir.join("remotes");
    let escape_dir = base.join("outside");
    // `data/remotes/laptop` is a symlink pointing outside the data directory.
    let laptop_link = remotes_dir.join("laptop");
    let mirror_under_link = laptop_link.join("mirror");

    std::fs::create_dir_all(&remotes_dir).expect("create remotes");
    std::fs::create_dir_all(&escape_dir).expect("create outside");
    symlink(&escape_dir, &laptop_link).expect("symlink source parent");

    let message = prepare_local_sync_root(&data_dir, &mirror_under_link)
        .expect_err("symlinked source parent should be rejected before mkdir");

    for needle in ["Refusing to write", "laptop"] {
        assert!(message.contains(needle));
    }

    let leaked_dir = escape_dir.join("mirror");
    assert!(
        !leaked_dir.exists(),
        "sync root preparation must not create directories through source parent symlinks"
    );
}
3369
/// Only a stat whose permission bits carry the symlink file type is reported as a link.
#[test]
fn test_sftp_file_stat_is_symlink_detects_link_modes() {
    // Build a stat where only the permission/mode field is populated.
    let stat_with_mode = |mode| FileStat {
        size: None,
        uid: None,
        gid: None,
        perm: Some(mode),
        atime: None,
        mtime: None,
    };

    let link_stat = stat_with_mode(0o120000 | 0o777);
    let file_stat = stat_with_mode(0o100000 | 0o644);

    assert!(sftp_file_stat_is_symlink(&link_stat));
    assert!(!sftp_file_stat_is_symlink(&file_stat));
}
3392
/// An ordinary entry below the listed directory yields its file name.
#[test]
fn test_sftp_entry_file_name_accepts_regular_names() {
    let remote_dir = Path::new("/remote");
    let listed_entry = remote_dir.join("session.jsonl");

    let name = sftp_entry_file_name(&listed_entry, remote_dir);

    assert_eq!(name, Some("session.jsonl"));
}
3400
/// The `.` and `..` directory entries produce no file name.
#[test]
fn test_sftp_entry_file_name_skips_dot_entries() {
    let remote_dir = Path::new("/remote");

    for dot_entry in [".", ".."] {
        assert_eq!(sftp_entry_file_name(Path::new(dot_entry), remote_dir), None);
    }
}
3408
/// An entry whose name is not valid UTF-8 produces no file name.
#[cfg(unix)]
#[test]
fn test_sftp_entry_file_name_rejects_non_utf8_names() {
    use std::ffi::OsStr;
    use std::os::unix::ffi::OsStrExt;

    let remote_dir = Path::new("/remote");
    // 0xff can never appear in well-formed UTF-8.
    let raw_name = OsStr::from_bytes(b"bad-\xff-name");
    let listed_entry = remote_dir.join(raw_name);

    let name = sftp_entry_file_name(&listed_entry, remote_dir);

    assert_eq!(name, None);
}
3421
/// Transferred-file count and comma-grouped byte total are read from rsync's stats output.
#[test]
fn test_parse_rsync_stats() {
    let summary = r#"
Number of files: 42
Number of regular files transferred: 10
Total transferred file size: 1,234 bytes
        "#;

    let parsed = parse_rsync_stats(summary);

    let observed = (parsed.files_transferred, parsed.bytes_transferred);
    assert_eq!(observed, (10, 1234));
}
3434
/// Empty rsync output parses to zero files and zero bytes.
#[test]
fn test_parse_rsync_stats_empty() {
    let parsed = parse_rsync_stats("");

    let observed = (parsed.files_transferred, parsed.bytes_transferred);
    assert_eq!(observed, (0, 0));
}
3441
/// Paths are wrapped in single quotes, with embedded single quotes escaped as `'\''`.
#[test]
fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
    // (raw path, expected shell-quoted form)
    let cases = [
        (
            "/Users/me/Library/Application Support/Cursor",
            "'/Users/me/Library/Application Support/Cursor'",
        ),
        ("/tmp/that's all", "'/tmp/that'\\''s all'"),
    ];

    for (raw_path, quoted) in cases {
        assert_eq!(quote_remote_shell_path(raw_path), quoted);
    }
}
3453
/// The remote path is left raw when the flag is `true` and shell-quoted when it is `false`.
#[test]
fn test_remote_spec_for_rsync_quotes_only_when_needed() {
    let host = "work-mac";
    // (remote path, third argument, expected rsync spec)
    let cases = [
        ("/tmp/has space", true, "work-mac:/tmp/has space"),
        ("/tmp/that's all", true, "work-mac:/tmp/that's all"),
        ("/tmp/has space", false, "work-mac:'/tmp/has space'"),
    ];

    for (remote_path, flag, expected_spec) in cases {
        assert_eq!(remote_spec_for_rsync(host, remote_path, flag), expected_spec);
    }
}
3469
/// Each `RsyncArgProtection` variant maps to the expected flag and support status.
#[test]
fn rsync_arg_protection_enum_maps_flags_correctly() {
    // Regression for #191: Homebrew rsync 3.4.1 renamed the flag to
    // --secluded-args, while 3.0–3.3 use --protect-args. The caller has to
    // pass whichever name the installed rsync lists in its own --help.
    //
    // (variant, expected flag, expected is_supported)
    let expectations = [
        (
            RsyncArgProtection::ProtectArgs,
            Some("--protect-args"),
            true,
        ),
        (
            RsyncArgProtection::SecludedArgs,
            Some("--secluded-args"),
            true,
        ),
        (RsyncArgProtection::None, None, false),
    ];

    for (variant, expected_flag, expected_support) in expectations {
        assert_eq!(variant.flag(), expected_flag);
        assert_eq!(variant.is_supported(), expected_support);
    }
}
3489
/// Remote "option not accepted" messages are recognised; unrelated rsync errors are not.
#[test]
fn rsync_arg_protection_remote_rejected_detects_openrsync_errors() {
    let rejection_messages = [
        // GNU rsync error formats (pre-existing coverage).
        "rsync: invalid option -- s\nrsync error: syntax or usage error",
        "remote rsync: unrecognized option '--secluded-args'",
        "remote rsync: unknown option -- protect-args",
        // BSD/macOS getopt format (regression for cass#266 Bug 5): Apple's
        // getopt(3) prints "illegal option" rather than "invalid option".
        "rsync: illegal option -- s\nrsync error: error in rsync protocol data stream (code 12)",
        // Same wording with different capitalisation.
        "rsync: Illegal option -- s\n",
    ];
    for stderr in rejection_messages {
        assert!(rsync_arg_protection_remote_rejected(stderr));
    }

    // A failure that has nothing to do with option parsing.
    let unrelated_failure = "rsync: change_dir \"/missing\" failed: No such file or directory";
    assert!(!rsync_arg_protection_remote_rejected(unrelated_failure));
}
3515
3516    // -------------------------------------------------------------------------
3517    // openrsync remote version probe (cass#266 Bug 5)
3518    // -------------------------------------------------------------------------
3519
/// Version banners naming openrsync are flagged; GNU rsync banners and empty output are not.
#[test]
fn parse_remote_rsync_is_openrsync_detects_apple_openrsync() {
    let openrsync_banners = [
        // Typical output from Apple's openrsync on macOS 14/15.
        "openrsync: protocol version 29",
        // Matching ignores case.
        "OpenRsync: protocol version 29",
    ];
    for banner in openrsync_banners {
        assert!(parse_remote_rsync_is_openrsync(banner));
    }

    let other_banners = [
        // GNU rsync must NOT be flagged as openrsync.
        "rsync  version 3.4.1  protocol version 31",
        "rsync  version 2.6.9  protocol version 29",
        // Homebrew rsync on macOS is GNU rsync as well.
        "rsync  version 3.3.0  protocol version 31\nCopyright ...",
        // Empty output.
        "",
    ];
    for banner in other_banners {
        assert!(!parse_remote_rsync_is_openrsync(banner));
    }
}
3544
/// The protection level used for openrsync remotes must not produce any rsync flag.
///
/// When the remote is openrsync, rsync is meant to be invoked with
/// `RsyncArgProtection::None` (no `--secluded-args` / `-s`). That selection
/// happens in `sync_path_rsync`, which this test does not call, so the check
/// is made on `flag()` directly to keep the test self-contained.
#[test]
fn openrsync_remote_forces_no_arg_protection_flag() {
    // The value chosen for openrsync remotes. It is written out directly
    // instead of through an `if true { .. } else { .. }`, whose else branch
    // could never run.
    let protection_when_openrsync = RsyncArgProtection::None;

    assert!(
        protection_when_openrsync.flag().is_none(),
        "openrsync remote must not receive --secluded-args or --protect-args; got {:?}",
        protection_when_openrsync.flag()
    );
}
3562
/// A non-openrsync remote must not be forced down to `RsyncArgProtection::None`.
///
/// In production the non-openrsync branch consults `detect_rsync_arg_protection()`,
/// which depends on the rsync binary installed on the machine and so is not
/// reliable in a unit test. The branch is simulated here with a known supported
/// variant.
#[test]
fn gnu_rsync_remote_allows_arg_protection_flag() {
    let remote_is_openrsync = false;

    // Simulates the branch logic from sync_path_rsync.
    let protection = if remote_is_openrsync {
        RsyncArgProtection::None
    } else {
        // Stand-in for detect_rsync_arg_protection().
        RsyncArgProtection::SecludedArgs
    };

    // Assert on the selected value itself, so the test fails if the
    // non-openrsync path ever resolves to "no flag".
    assert!(
        protection.flag().is_some(),
        "non-openrsync remote must keep an arg-protection flag; got {:?}",
        protection.flag()
    );
}
3589
/// The shell-bound copy spec single-quotes a remote path containing a space.
#[test]
fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
    let spec = remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space");

    assert_eq!(spec, "work-mac:'/tmp/has space'");
}
3597
/// The scp spec single-quotes the remote path and escapes an embedded single quote.
#[test]
fn test_remote_spec_for_scp_always_quotes_remote_path() {
    let spec = remote_spec_for_scp("work-mac", "/tmp/that's all");

    assert_eq!(spec, "work-mac:'/tmp/that'\\''s all'");
}
3605
/// File and byte totals are summed across all successful path results.
#[test]
fn test_sync_report_totals() {
    let mut totals = SyncReport::new("test", SyncMethod::Rsync);

    // (files transferred, bytes transferred) for each successful path.
    for (files, bytes) in [(5, 100), (3, 50)] {
        totals.add_path_result(PathSyncResult {
            files_transferred: files,
            bytes_transferred: bytes,
            success: true,
            ..Default::default()
        });
    }

    assert_eq!(totals.total_files(), 8);
    assert_eq!(totals.total_bytes(), 150);
    assert!(totals.all_succeeded);
}
3626
/// One failed path clears `all_succeeded` and is counted separately from successes.
#[test]
fn test_sync_report_with_failure() {
    let succeeded_path = PathSyncResult {
        success: true,
        ..Default::default()
    };
    let failed_path = PathSyncResult {
        success: false,
        error: Some("Connection refused".into()),
        ..Default::default()
    };

    let mut mixed = SyncReport::new("test", SyncMethod::Rsync);
    mixed.add_path_result(succeeded_path);
    mixed.add_path_result(failed_path);

    assert!(!mixed.all_succeeded);
    assert_eq!(mixed.successful_paths(), 1);
    assert_eq!(mixed.failed_paths(), 1);
}
3644
/// Detection depends on the platform, so only check that it returns a known method.
#[test]
fn test_detect_sync_method() {
    let detected = SyncEngine::detect_sync_method();

    let is_known_method = matches!(
        detected,
        SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
    );
    assert!(is_known_method);
}
3654
/// A source's mirror directory is `<data dir>/remotes/<source>/mirror`.
#[test]
fn test_sync_engine_mirror_dir() {
    let data_dir = Path::new("/data/cass");
    let expected = PathBuf::from("/data/cass/remotes/laptop/mirror");

    let actual = SyncEngine::new(data_dir).mirror_dir("laptop");

    assert_eq!(actual, expected);
}
3661
/// `as_str` and `to_string` both yield the same short label for every sync method.
#[test]
fn test_sync_method_display() {
    // (variant, expected label)
    let labels = [
        (SyncMethod::Rsync, "rsync"),
        (SyncMethod::WslRsync, "wsl-rsync"),
        (SyncMethod::Scp, "scp"),
        (SyncMethod::Sftp, "sftp"),
    ];

    for (variant, label) in labels {
        assert_eq!(variant.as_str(), label);
        assert_eq!(variant.to_string(), label);
    }
}
3674
/// A backslash-separated drive path becomes a `/mnt/<drive>/...` path.
#[test]
fn test_windows_path_to_wsl_drive() {
    let converted = windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass");

    assert_eq!(converted, "/mnt/c/Users/george/AppData/Roaming/cass");
}
3682
/// A drive path written with forward slashes is converted the same way.
#[test]
fn test_windows_path_to_wsl_forward_slash() {
    let converted = windows_path_to_wsl("C:/Users/george/data");

    assert_eq!(converted, "/mnt/c/Users/george/data");
}
3690
/// A Unix absolute path passes through the conversion untouched.
#[test]
fn test_windows_path_to_wsl_non_windows_path_unchanged() {
    let unix_path = "/home/george/data";

    assert_eq!(windows_path_to_wsl(unix_path), "/home/george/data");
}
3699
/// Only a leading `~` or `~/` is expanded, and only when a home directory is supplied.
#[test]
fn test_expand_tilde_with_home() {
    let home = Some("/home/user");

    // (input path, home directory, expected result)
    let cases = [
        // No tilde - returned unchanged.
        ("/home/user/projects", home, "/home/user/projects"),
        // Tilde prefix with a home directory available.
        ("~/.claude/projects", home, "/home/user/.claude/projects"),
        // A bare tilde.
        ("~", home, "/home/user"),
        // Tilde but no home directory - returned unchanged.
        ("~/.claude/projects", None, "~/.claude/projects"),
        // `~otheruser/path` form - not expanded.
        ("~otheruser/projects", home, "~otheruser/projects"),
    ];

    for (input, home_dir, expected) in cases {
        assert_eq!(SyncEngine::expand_tilde_with_home(input, home_dir), expected);
    }
}
3732
/// `SyncReport::failed` yields a report with one unsuccessful path result carrying an error.
#[test]
fn test_sync_report_failed() {
    let failure = SyncReport::failed("test-source", SyncError::NoHost);

    assert_eq!(failure.source_name, "test-source");
    assert!(!failure.all_succeeded);
    assert_eq!(failure.path_results.len(), 1);

    let only_result = &failure.path_results[0];
    assert!(!only_result.success);
    assert!(only_result.error.is_some());
}
3742
/// The default sync result is `Skipped`, which is labelled "never".
#[test]
fn test_sync_result_default() {
    let fresh = SyncResult::default();

    let is_skipped = matches!(fresh, SyncResult::Skipped);
    assert!(is_skipped);
    assert_eq!(fresh.label(), "never");
}
3749
/// A default `SourceSyncInfo` has no last-sync time and zeroed counters.
#[test]
fn test_source_sync_info_default() {
    let blank = SourceSyncInfo::default();

    assert!(blank.last_sync.is_none());
    let counters = (
        blank.files_synced,
        blank.bytes_transferred,
        blank.duration_ms,
    );
    assert_eq!(counters, (0, 0, 0));
}
3758
/// Updating the status from a successful report records its counters and duration.
#[test]
fn test_sync_status_update() {
    let transfer = PathSyncResult {
        files_transferred: 10,
        bytes_transferred: 1000,
        success: true,
        ..Default::default()
    };
    let mut outcome = SyncReport::new("laptop", SyncMethod::Rsync);
    outcome.add_path_result(transfer);
    outcome.total_duration_ms = 500;

    let mut tracker = SyncStatus::default();
    tracker.update("laptop", &outcome);

    let recorded = tracker.get("laptop").unwrap();
    assert!(recorded.last_sync.is_some());
    assert!(matches!(recorded.last_result, SyncResult::Success));
    assert_eq!(recorded.files_synced, 10);
    assert_eq!(recorded.bytes_transferred, 1000);
    assert_eq!(recorded.duration_ms, 500);
}
3781
/// A report with both a successful and a failed path is stored as `PartialFailure`.
#[test]
fn test_sync_status_partial_failure() {
    let succeeded_path = PathSyncResult {
        success: true,
        files_transferred: 5,
        ..Default::default()
    };
    let failed_path = PathSyncResult {
        success: false,
        error: Some("Connection refused".into()),
        ..Default::default()
    };
    let mut outcome = SyncReport::new("server", SyncMethod::Rsync);
    outcome.add_path_result(succeeded_path);
    outcome.add_path_result(failed_path);

    let mut tracker = SyncStatus::default();
    tracker.update("server", &outcome);

    let recorded = tracker.get("server").unwrap();
    let is_partial = matches!(recorded.last_result, SyncResult::PartialFailure(_));
    assert!(is_partial);
}
3803
/// A report whose only path failed is stored as `Failed`.
#[test]
fn test_sync_status_full_failure() {
    let failed_path = PathSyncResult {
        success: false,
        error: Some("Host unreachable".into()),
        ..Default::default()
    };
    let mut outcome = SyncReport::new("dead-host", SyncMethod::Rsync);
    outcome.add_path_result(failed_path);

    let mut tracker = SyncStatus::default();
    tracker.update("dead-host", &outcome);

    let recorded = tracker.get("dead-host").unwrap();
    let is_failed = matches!(recorded.last_result, SyncResult::Failed(_));
    assert!(is_failed);
}
3820
/// A saved status can be loaded back with the same per-source counters and result.
#[test]
fn test_sync_status_save_round_trips() {
    let scratch = TempDir::new().expect("tempdir");
    let store_dir = scratch.path();

    let transfer = PathSyncResult {
        files_transferred: 3,
        bytes_transferred: 42,
        success: true,
        ..Default::default()
    };
    let mut outcome = SyncReport::new("laptop", SyncMethod::Rsync);
    outcome.add_path_result(transfer);

    let mut tracker = SyncStatus::default();
    tracker.update("laptop", &outcome);
    tracker.save(store_dir).expect("save status");

    let reloaded = SyncStatus::load(store_dir).expect("load status");
    let recorded = reloaded.get("laptop").expect("round-tripped source");
    assert_eq!(recorded.files_synced, 3);
    assert_eq!(recorded.bytes_transferred, 42);
    assert!(matches!(recorded.last_result, SyncResult::Success));
}
3842
/// Writing the temp status file fails when the temp path is already a symlink,
/// leaving both the link and its target untouched.
#[cfg(unix)]
#[test]
fn test_sync_status_temp_write_refuses_existing_symlink() {
    use std::os::unix::fs::symlink;

    let scratch = TempDir::new().expect("tempdir");
    let guarded_file = scratch.path().join("protected.json");
    let staged_path = scratch.path().join(".sync_status.json.tmp");

    // The temp path is pre-planted as a symlink to a file that must survive.
    std::fs::write(&guarded_file, b"protected").expect("write protected target");
    symlink(&guarded_file, &staged_path).expect("create temp symlink");

    let failure = write_sync_status_temp_file_at(&staged_path, br#"{"sources":{}}"#)
        .expect_err("existing temp symlink must be rejected");
    assert_eq!(failure.kind(), std::io::ErrorKind::AlreadyExists);

    let guarded_bytes = std::fs::read(&guarded_file).expect("read protected target");
    assert_eq!(guarded_bytes, b"protected");

    let staged_meta = std::fs::symlink_metadata(&staged_path).expect("temp path metadata");
    assert!(
        staged_meta.file_type().is_symlink(),
        "failed temp write should leave the existing symlink untouched"
    );
}
3871
/// Retaining a subset of sources drops the others and reports that something was removed.
#[test]
fn test_sync_status_retain_sources_prunes_removed_entries() {
    let mut tracker = SyncStatus::default();

    // (source name, files synced)
    for (name, files) in [("laptop", 3), ("desktop", 5)] {
        let info = SourceSyncInfo {
            files_synced: files,
            ..Default::default()
        };
        tracker.sources.insert(name.into(), info);
    }

    let removed_any = tracker.retain_sources(["laptop"]);

    assert!(removed_any);
    assert!(tracker.get("laptop").is_some());
    assert!(tracker.get("desktop").is_none());
}
3896
3897    fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3898        let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3899        source.sync_schedule = schedule;
3900        source.paths = vec!["~/.claude/projects".to_string()];
3901        source
3902    }
3903
3904    fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3905        let mut status = SyncStatus::default();
3906        status.set_info("laptop", info);
3907        status
3908    }
3909
/// An hourly source that synced successfully ten minutes ago is skipped until the hour is up.
#[test]
fn source_sync_decision_skips_healthy_source_until_schedule_due() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let recent_success = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Success,
        duration_ms: 250,
        ..Default::default()
    };
    let tracker = status_with_info(recent_success);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Skip);
    assert_eq!(verdict.health, SourceHealthKind::Healthy);
    assert!(!verdict.fallback_active);

    // Fifty minutes remain of the hourly interval.
    let expected_next = Some(now_ms + 50 * 60 * 1000);
    assert_eq!(verdict.next_eligible_sync_ms, expected_next);
    assert_eq!(verdict.staleness_ms, Some(10 * 60 * 1000));
    assert_eq!(verdict.stale_value_score, 16);
}
3933
/// An hourly source last synced two hours ago is stale and must be synced.
#[test]
fn source_sync_decision_syncs_stale_scheduled_source() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let old_success = SourceSyncInfo {
        last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
        last_result: SyncResult::Success,
        duration_ms: 250,
        ..Default::default()
    };
    let tracker = status_with_info(old_success);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Sync);
    assert_eq!(verdict.health, SourceHealthKind::Stale);
    assert_eq!(verdict.stale_value_score, 100);

    let mentions_schedule_due = verdict
        .reasons
        .iter()
        .any(|why| why.contains("schedule is due"));
    assert!(mentions_schedule_due);
}
3957
/// A recent "Permission denied" failure defers the sync and marks the source as auth-failed.
#[test]
fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let auth_failure = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Failed("Permission denied (publickey)".into()),
        duration_ms: 800,
        consecutive_failures: 1,
        ..Default::default()
    };
    let tracker = status_with_info(auth_failure);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Defer);
    assert_eq!(verdict.health, SourceHealthKind::AuthFailed);
    assert!(verdict.fallback_active);
    assert_eq!(verdict.health_score, 10);
}
3977
/// A recent partial failure is skipped for now and classified as flapping with fallback active.
#[test]
fn source_sync_decision_marks_partial_success_as_flapping() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let partial_outcome = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::PartialFailure("one path failed".into()),
        files_synced: 7,
        duration_ms: 900,
        consecutive_failures: 1,
        ..Default::default()
    };
    let tracker = status_with_info(partial_outcome);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Skip);
    assert_eq!(verdict.health, SourceHealthKind::Flapping);
    assert!(verdict.fallback_active);
}
3997
/// After a "Host unreachable" failure the decision keeps the local fallback active and
/// reports a backoff deadline of last sync plus the base backoff.
#[test]
fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
    let now_ms = 1_700_000_000_000;
    let failed_at_ms = now_ms - 10 * 60 * 1000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let unreachable = SourceSyncInfo {
        last_sync: Some(failed_at_ms),
        last_result: SyncResult::Failed("Host unreachable".into()),
        duration_ms: 900,
        consecutive_failures: 1,
        ..Default::default()
    };
    let tracker = status_with_info(unreachable);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Skip);
    assert_eq!(verdict.health, SourceHealthKind::Flapping);
    assert!(verdict.fallback_active);

    let expected_backoff = Some(failed_at_ms + SOURCE_FAILURE_BACKOFF_BASE_MS);
    assert_eq!(verdict.backoff_until_ms, expected_backoff);

    let mentions_fallback = verdict
        .reasons
        .iter()
        .any(|why| why.contains("local fallback remains active"));
    assert!(mentions_fallback);
}
4027
/// A successful sync that took longer than the latency threshold is classified as high latency.
#[test]
fn source_sync_decision_marks_slow_source_as_high_latency() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let slow_success = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Success,
        // One millisecond over the threshold.
        duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
        ..Default::default()
    };
    let tracker = status_with_info(slow_success);

    let verdict = tracker.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(verdict.action, SourceSyncAction::Skip);
    assert_eq!(verdict.health, SourceHealthKind::HighLatency);
    assert!(verdict.fallback_active);
}
4045
/// With the manual override set, a manually scheduled source that just synced is synced again.
#[test]
fn source_sync_decision_manual_override_forces_sync() {
    let now_ms = 1_700_000_000_000;
    let manual_source = source_with_schedule(SyncSchedule::Manual);
    let just_synced = SourceSyncInfo {
        last_sync: Some(now_ms),
        last_result: SyncResult::Success,
        duration_ms: 100,
        ..Default::default()
    };
    let tracker = status_with_info(just_synced);

    let verdict = tracker.decision_for_source_at(&manual_source, now_ms, true);

    assert_eq!(verdict.action, SourceSyncAction::Sync);
    assert!(verdict.manual_override);

    let mentions_override = verdict
        .reasons
        .iter()
        .any(|why| why.contains("overrides automatic scheduling"));
    assert!(mentions_override);
}
4068
4069    #[test]
4070    fn test_unique_atomic_temp_path_changes_each_call() {
4071        let final_path = Path::new("/tmp/sync_status.json");
4072        let first = unique_atomic_temp_path(final_path);
4073        let second = unique_atomic_temp_path(final_path);
4074
4075        assert_ne!(first, second);
4076        assert_eq!(first.parent(), final_path.parent());
4077        assert_eq!(second.parent(), final_path.parent());
4078    }
4079
4080    #[test]
4081    fn test_replace_file_from_temp_overwrites_existing_file() {
4082        let temp = TempDir::new().expect("tempdir");
4083        let final_path = temp.path().join("sync_status.json");
4084        let first_tmp = temp.path().join("first.tmp");
4085        let second_tmp = temp.path().join("second.tmp");
4086
4087        std::fs::write(&first_tmp, "{\"first\":true}").expect("write first temp");
4088        replace_file_from_temp(&first_tmp, &final_path).expect("initial replace");
4089        assert_eq!(
4090            std::fs::read_to_string(&final_path).expect("read first final"),
4091            "{\"first\":true}"
4092        );
4093
4094        std::fs::write(&second_tmp, "{\"second\":true}").expect("write second temp");
4095        replace_file_from_temp(&second_tmp, &final_path).expect("overwrite replace");
4096        assert_eq!(
4097            std::fs::read_to_string(&final_path).expect("read second final"),
4098            "{\"second\":true}"
4099        );
4100    }
4101
4102    #[test]
4103    fn test_sync_engine_with_timeouts() {
4104        let engine = SyncEngine::new(Path::new("/data"))
4105            .with_connection_timeout(30)
4106            .with_transfer_timeout(600);
4107
4108        assert_eq!(engine.connection_timeout, 30);
4109        assert_eq!(engine.transfer_timeout, 600);
4110    }
4111
4112    #[test]
4113    fn test_sync_error_display() {
4114        assert_eq!(
4115            SyncError::NoHost.to_string(),
4116            "Source has no host configured"
4117        );
4118        assert_eq!(
4119            SyncError::NoPaths.to_string(),
4120            "Source has no paths configured"
4121        );
4122        assert_eq!(
4123            SyncError::InvalidPath("paths[0] cannot be empty".to_string()).to_string(),
4124            "Invalid source path: paths[0] cannot be empty"
4125        );
4126        assert_eq!(
4127            SyncError::Timeout(30).to_string(),
4128            "Connection timed out after 30 seconds"
4129        );
4130        assert_eq!(SyncError::Cancelled.to_string(), "Sync cancelled");
4131    }
4132
4133    // =========================================================================
4134    // SFTP helper function tests
4135    // =========================================================================
4136
4137    #[test]
4138    fn test_parse_ssh_host_simple() {
4139        let (user, host) = parse_ssh_host("myserver");
4140        assert!(user.is_none());
4141        assert_eq!(host, "myserver");
4142    }
4143
4144    #[test]
4145    fn test_parse_ssh_host_with_user() {
4146        let (user, host) = parse_ssh_host("admin@myserver");
4147        assert_eq!(user, Some("admin"));
4148        assert_eq!(host, "myserver");
4149    }
4150
4151    #[test]
4152    fn test_parse_ssh_host_with_domain() {
4153        let (user, host) = parse_ssh_host("deploy@server.example.com");
4154        assert_eq!(user, Some("deploy"));
4155        assert_eq!(host, "server.example.com");
4156    }
4157
4158    #[test]
4159    fn test_parse_ssh_host_email_like() {
4160        // Edge case: user looks like email prefix
4161        let (user, host) = parse_ssh_host("user@host");
4162        assert_eq!(user, Some("user"));
4163        assert_eq!(host, "host");
4164    }
4165
4166    #[test]
4167    fn test_first_nonblank_username_priority_and_trimming() {
4168        assert_eq!(
4169            first_nonblank_username([Some("  alice  "), Some("bob")]),
4170            Some("alice".to_string())
4171        );
4172        assert_eq!(
4173            first_nonblank_username([Some("  "), None, Some("carol")]),
4174            Some("carol".to_string())
4175        );
4176        assert_eq!(first_nonblank_username([None, Some("\t")]), None);
4177    }
4178
4179    #[test]
4180    fn test_expand_tilde_local_with_tilde_prefix() {
4181        let expanded = expand_tilde_local("~/Documents/file.txt");
4182        // Should start with home directory, not tilde
4183        assert!(!expanded.starts_with('~'));
4184        assert!(expanded.ends_with("/Documents/file.txt"));
4185    }
4186
4187    #[test]
4188    fn test_expand_tilde_local_just_tilde() {
4189        let expanded = expand_tilde_local("~");
4190        // Should be just home directory
4191        assert!(!expanded.starts_with('~'));
4192        assert!(!expanded.is_empty());
4193    }
4194
4195    #[test]
4196    fn test_expand_tilde_local_no_tilde() {
4197        let path = "/absolute/path/to/file";
4198        let expanded = expand_tilde_local(path);
4199        assert_eq!(expanded, path);
4200    }
4201
4202    #[test]
4203    fn test_expand_tilde_local_tilde_in_middle() {
4204        // Tilde in middle should not be expanded
4205        let path = "/path/with/~tilde/inside";
4206        let expanded = expand_tilde_local(path);
4207        assert_eq!(expanded, path);
4208    }
4209}