1use std::collections::HashMap;
28use std::path::{Path, PathBuf};
29use std::process::{Command, Stdio};
30use std::sync::{Mutex, OnceLock};
31use std::time::{Duration, Instant};
32
33use thiserror::Error;
34
35use super::{
36 config::{
37 SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
38 ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
39 },
40 configure_child_process_group, host_key_verification_error, is_host_key_verification_failure,
41 strict_ssh_cli_tokens, strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
42};
43use ssh2::{FileStat, Session, Sftp};
44use std::io::{Read as IoRead, Write as IoWrite};
45use std::net::{Shutdown, TcpStream};
46
/// Which argument-protection flag, if any, is passed to `rsync`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RsyncArgProtection {
    /// No protection flag is passed.
    None,
    /// Pass `--protect-args`.
    ProtectArgs,
    /// Pass `--secluded-args`.
    SecludedArgs,
}

impl RsyncArgProtection {
    /// Returns `true` for every variant that maps to a command-line flag.
    fn is_supported(self) -> bool {
        self.flag().is_some()
    }

    /// Returns the command-line flag for this variant, or `None` when no
    /// flag should be passed.
    fn flag(self) -> Option<&'static str> {
        let name = match self {
            Self::None => return None,
            Self::SecludedArgs => "--secluded-args",
            Self::ProtectArgs => "--protect-args",
        };
        Some(name)
    }
}
81
82fn detect_rsync_arg_protection() -> RsyncArgProtection {
83 static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
84 *CACHED.get_or_init(|| {
85 let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
86 return RsyncArgProtection::None;
87 };
88 let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
92 combined.push_str(&String::from_utf8_lossy(&out.stderr));
93 if combined.contains("--secluded-args") {
98 RsyncArgProtection::SecludedArgs
99 } else if combined.contains("--protect-args") {
100 RsyncArgProtection::ProtectArgs
101 } else {
102 RsyncArgProtection::None
103 }
104 })
105}
106
/// Wraps `path` in single quotes for a POSIX shell, rewriting every embedded
/// `'` as `'\''` so the result is a single shell word.
fn quote_remote_shell_path(path: &str) -> String {
    let mut quoted = String::with_capacity(path.len() + 2);
    quoted.push('\'');
    for ch in path.chars() {
        if ch == '\'' {
            // Close the quote, emit an escaped quote, reopen the quote.
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
115
116fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
117 format!("{host}:{}", quote_remote_shell_path(remote_path))
121}
122
123fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
124 if protect_args_supported {
125 format!("{host}:{remote_path}")
127 } else {
128 remote_spec_for_shell_bound_copy(host, remote_path)
130 }
131}
132
133fn run_rsync_command(
134 timeout_str: &str,
135 ssh_opts: &str,
136 remote_spec: &str,
137 local_path_str: &str,
138 arg_protection: RsyncArgProtection,
139) -> std::io::Result<std::process::Output> {
140 let mut cmd = Command::new("rsync");
141 cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
142 if let Some(flag) = arg_protection.flag() {
143 cmd.arg(flag);
144 }
145 cmd.args([
146 "--timeout",
147 timeout_str,
148 "-e",
149 ssh_opts,
150 "--",
151 remote_spec,
152 local_path_str,
153 ]);
154 cmd.output()
155}
156
/// Reports whether `stderr` contains one of the known messages indicating
/// that an argument-protection flag (`-s`, `--protect-args`,
/// `--secluded-args`) was rejected. Matching is ASCII case-insensitive.
fn rsync_arg_protection_remote_rejected(stderr: &str) -> bool {
    const REJECTION_MARKERS: [&str; 7] = [
        "invalid option -- s",
        "unknown option -- s",
        "unrecognized option '--secluded-args'",
        "unknown option -- secluded-args",
        "unrecognized option '--protect-args'",
        "unknown option -- protect-args",
        "illegal option -- s",
    ];

    let haystack = stderr.to_ascii_lowercase();
    REJECTION_MARKERS
        .iter()
        .any(|&marker| haystack.contains(marker))
}
169
/// Returns `true` when the first line of `version_output` contains
/// `openrsync` (ASCII case-insensitive). Later lines are ignored; empty
/// input yields `false`.
fn parse_remote_rsync_is_openrsync(version_output: &str) -> bool {
    match version_output.lines().next() {
        Some(first_line) => first_line.to_ascii_lowercase().contains("openrsync"),
        None => false,
    }
}
184
/// Returns the process-wide map from host string to a boolean probe result,
/// creating it empty on first use.
fn remote_openrsync_cache() -> &'static Mutex<HashMap<String, bool>> {
    static HOST_FLAVORS: OnceLock<Mutex<HashMap<String, bool>>> = OnceLock::new();
    HOST_FLAVORS.get_or_init(Mutex::default)
}
192
193fn probe_remote_rsync_is_openrsync(host: &str, timeout_secs: u64) -> bool {
200 {
202 let cache = remote_openrsync_cache()
203 .lock()
204 .unwrap_or_else(|e| e.into_inner());
205 if let Some(&cached) = cache.get(host) {
206 return cached;
207 }
208 }
209
210 let is_openrsync = detect_remote_rsync_is_openrsync_via_ssh(host, timeout_secs);
212
213 {
215 let mut cache = remote_openrsync_cache()
216 .lock()
217 .unwrap_or_else(|e| e.into_inner());
218 cache.insert(host.to_string(), is_openrsync);
219 }
220
221 is_openrsync
222}
223
224fn detect_remote_rsync_is_openrsync_via_ssh(host: &str, timeout_secs: u64) -> bool {
226 let timeout_secs = timeout_secs.clamp(1, 30);
227 let mut cmd = Command::new("ssh");
228 cmd.args(strict_ssh_cli_tokens(timeout_secs))
229 .arg("-o")
230 .arg("LogLevel=ERROR")
231 .arg("--")
232 .arg(host)
233 .arg("rsync --version 2>&1 | head -1")
234 .stdout(Stdio::piped())
235 .stderr(Stdio::piped());
236 configure_child_process_group(&mut cmd);
237
238 let Ok(Some(output)) = cmd.spawn().and_then(|child| {
239 wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
240 }) else {
241 return false;
242 };
243 if !output.status.success() && output.stdout.is_empty() {
244 return false;
245 }
246 let first_line = String::from_utf8_lossy(&output.stdout);
247 let result = parse_remote_rsync_is_openrsync(first_line.trim());
248 tracing::debug!(
249 host = %host,
250 rsync_version_line = %first_line.trim(),
251 is_openrsync = result,
252 "remote rsync version probe"
253 );
254 result
255}
256
/// Builds the `host:path` remote spec for scp.
///
/// Delegates to [`remote_spec_for_shell_bound_copy`], so the path part is
/// always single-quoted.
fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
    remote_spec_for_shell_bound_copy(host, remote_path)
}
262
263fn remote_find_regular_files_command(remote_path: &str) -> String {
264 format!(
265 "find -P {} -type f -print0",
266 quote_remote_shell_path(remote_path)
267 )
268}
269
/// Extracts the remote home directory from SSH stdout.
///
/// Returns the text following `CASS_HOME_MARKER:` on the first trimmed line
/// where that text starts with `/` and contains no NUL. Returns `None` if no
/// line qualifies. Invalid UTF-8 is decoded lossily.
fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
    let text = String::from_utf8_lossy(stdout);
    text.lines()
        .filter_map(|line| line.trim().strip_prefix("CASS_HOME_MARKER:"))
        .find(|home| home.starts_with('/') && !home.contains('\0'))
        .map(str::to_owned)
}
282
/// Splits `bytes` on NUL and returns each non-empty, valid-UTF-8 segment as
/// an owned string, in order. Segments that are not valid UTF-8 are dropped.
fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
    let mut paths = Vec::new();
    for segment in bytes.split(|&byte| byte == b'\0') {
        if segment.is_empty() {
            continue;
        }
        if let Ok(text) = std::str::from_utf8(segment) {
            paths.push(text.to_owned());
        }
    }
    paths
}
291
292fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
293 match source_path_entry_error(index, path) {
294 Some(message) => Err(SyncError::InvalidPath(message)),
295 None => Ok(()),
296 }
297}
298
299fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
300 PathSyncResult {
301 remote_path: remote_path.to_string(),
302 success: false,
303 error: Some(err.to_string()),
304 ..Default::default()
305 }
306}
307
/// Maps a remote file below `remote_root` to a local path below
/// `local_container/leaf_name`.
///
/// Returns `local_container/leaf_name` itself when `remote_file` equals
/// `remote_root`. Returns `None` when `remote_file` is not under
/// `remote_root`, or when the relative part contains anything other than
/// normal or `.` components (for example `..`).
fn remote_file_to_safe_local_path(
    remote_root: &Path,
    remote_file: &Path,
    local_container: &Path,
    leaf_name: &str,
) -> Option<PathBuf> {
    use std::path::Component;

    let base = local_container.join(leaf_name);
    if remote_root == remote_file {
        return Some(base);
    }

    remote_file
        .strip_prefix(remote_root)
        .ok()?
        .components()
        .try_fold(base, |mut local, part| match part {
            Component::Normal(name) => {
                local.push(name);
                Some(local)
            }
            Component::CurDir => Some(local),
            _ => None,
        })
}
330
/// Walks from `root` down to `path` and returns the first existing
/// component (including `root` itself) that is a symlink.
///
/// # Errors
///
/// Returns an error message when `path` is not below `root`, when the
/// relative part contains a component other than a normal name or `.`, or
/// when inspecting a component fails for a reason other than "not found".
fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
    let rel = path.strip_prefix(root).map_err(|_| {
        format!(
            "Local path {} is outside sync root {}",
            path.display(),
            root.display()
        )
    })?;

    // Check the root first, then each deeper prefix in turn.
    let mut current = root.to_path_buf();
    if let Some(link) = existing_path_symlink(&current)? {
        return Ok(Some(link));
    }

    for component in rel.components() {
        match component {
            std::path::Component::Normal(name) => current.push(name),
            std::path::Component::CurDir => continue,
            _ => {
                return Err(format!(
                    "Local path {} contains unsafe component below sync root {}",
                    path.display(),
                    root.display()
                ));
            }
        }

        if let Some(link) = existing_path_symlink(&current)? {
            return Ok(Some(link));
        }
    }

    Ok(None)
}

/// Returns `Some(path)` when `path` exists and is itself a symlink (the link
/// is not followed), `None` when it is not a symlink or does not exist.
///
/// # Errors
///
/// Returns an error message for any metadata failure other than
/// `NotFound`.
fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
    match std::fs::symlink_metadata(path) {
        Ok(metadata) if metadata.file_type().is_symlink() => Ok(Some(path.to_path_buf())),
        Ok(_) => Ok(None),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(format!("Failed to inspect {}: {}", path.display(), e)),
    }
}
374
375fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
376 if let Some(link) = existing_local_symlink_below_root(root, path)? {
377 return Err(format!(
378 "Refusing to write {} through local symlink {}",
379 path.display(),
380 link.display()
381 ));
382 }
383
384 Ok(())
385}
386
387fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
388 reject_local_symlink_below_root(sync_root, local_path)?;
389 std::fs::create_dir_all(local_path)
390 .map_err(|e| format!("Failed to create directory: {}", e))?;
391 reject_local_symlink_below_root(sync_root, local_path)?;
392 Ok(())
393}
394
395fn prepare_local_sync_root(local_store: &Path, mirror_dir: &Path) -> Result<(), String> {
396 reject_local_symlink_below_root(local_store, mirror_dir)?;
397 std::fs::create_dir_all(mirror_dir)
398 .map_err(|e| format!("Failed to create directory: {}", e))?;
399 reject_local_symlink_below_root(local_store, mirror_dir)?;
400 Ok(())
401}
402
/// Returns `true` when the file type reported by the SFTP `stat` is a
/// symlink.
fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
    stat.file_type().is_symlink()
}
406
/// Errors produced while syncing a remote source.
#[derive(Error, Debug)]
pub enum SyncError {
    /// The source is not remote or has no host set.
    #[error("Source has no host configured")]
    NoHost,

    /// The source's path list is empty.
    #[error("Source has no paths configured")]
    NoPaths,

    /// A configured path entry failed validation; the payload is the
    /// validation message.
    #[error("Invalid source path: {0}")]
    InvalidPath(String),

    /// The source definition failed structural validation.
    #[error("Invalid source definition: {0}")]
    InvalidSource(String),

    /// An rsync invocation failed; the payload describes the failure.
    #[error("rsync command failed: {0}")]
    RsyncFailed(String),

    /// A local directory could not be prepared. Converts from
    /// `std::io::Error` via `From`.
    #[error("Failed to create local directory: {0}")]
    CreateDirFailed(#[from] std::io::Error),

    /// An SSH command could not be run or reported failure.
    #[error("SSH connection failed: {0}")]
    SshFailed(String),

    /// An operation did not finish within the given number of seconds.
    #[error("Connection timed out after {0} seconds")]
    Timeout(u64),

    /// The sync was cancelled.
    // NOTE(review): no construction site is visible in this part of the
    // file; confirm where cancellation originates.
    #[error("Sync cancelled")]
    Cancelled,
}
437
/// Transport used to copy remote files into the local mirror.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncMethod {
    /// The `rsync` binary on the local machine.
    Rsync,
    /// `rsync` invoked through the `wsl` command.
    WslRsync,
    /// The `scp` binary.
    Scp,
    /// SFTP.
    Sftp,
}

impl SyncMethod {
    /// Returns the stable lowercase label for this method.
    pub fn as_str(self) -> &'static str {
        match self {
            SyncMethod::Sftp => "sftp",
            SyncMethod::Scp => "scp",
            SyncMethod::WslRsync => "wsl-rsync",
            SyncMethod::Rsync => "rsync",
        }
    }
}

impl std::fmt::Display for SyncMethod {
    /// Writes the same label as [`SyncMethod::as_str`]; width and padding
    /// flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
477
/// Outcome of syncing a single remote path.
#[derive(Debug, Clone, Default)]
pub struct PathSyncResult {
    /// Remote path this result refers to.
    pub remote_path: String,
    /// Local destination associated with this path.
    pub local_path: PathBuf,
    /// Number of files transferred.
    pub files_transferred: u64,
    /// Number of bytes transferred.
    pub bytes_transferred: u64,
    /// Whether syncing this path succeeded.
    pub success: bool,
    /// Error message when the path failed.
    pub error: Option<String>,
    /// Elapsed time for this path, in milliseconds.
    pub duration_ms: u64,
}
496
/// Aggregated outcome of syncing one source.
#[derive(Debug, Clone)]
pub struct SyncReport {
    /// Name of the source this report belongs to.
    pub source_name: String,
    /// Transport recorded for the sync.
    pub method: SyncMethod,
    /// One entry per processed path, in processing order.
    pub path_results: Vec<PathSyncResult>,
    /// Total elapsed time, in milliseconds.
    pub total_duration_ms: u64,
    /// `true` until a failed path result is added.
    pub all_succeeded: bool,
}
511
512impl SyncReport {
513 pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
515 Self {
516 source_name: source_name.into(),
517 method,
518 path_results: Vec::new(),
519 total_duration_ms: 0,
520 all_succeeded: true,
521 }
522 }
523
524 pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
526 Self {
527 source_name: source_name.into(),
528 method: SyncMethod::Rsync,
529 path_results: vec![PathSyncResult {
530 error: Some(error.to_string()),
531 success: false,
532 ..Default::default()
533 }],
534 total_duration_ms: 0,
535 all_succeeded: false,
536 }
537 }
538
539 pub fn add_path_result(&mut self, result: PathSyncResult) {
541 if !result.success {
542 self.all_succeeded = false;
543 }
544 self.path_results.push(result);
545 }
546
547 pub fn total_files(&self) -> u64 {
549 self.path_results.iter().map(|r| r.files_transferred).sum()
550 }
551
552 pub fn total_bytes(&self) -> u64 {
554 self.path_results.iter().map(|r| r.bytes_transferred).sum()
555 }
556
557 pub fn successful_paths(&self) -> usize {
559 self.path_results.iter().filter(|r| r.success).count()
560 }
561
562 pub fn failed_paths(&self) -> usize {
564 self.path_results.iter().filter(|r| !r.success).count()
565 }
566
567 pub fn sync_result(&self) -> SyncResult {
569 if self.all_succeeded {
570 SyncResult::Success
571 } else {
572 let errors: Vec<String> = self
573 .path_results
574 .iter()
575 .filter_map(|r| r.error.clone())
576 .collect();
577 if self.successful_paths() > 0 {
578 SyncResult::PartialFailure(errors.join("; "))
579 } else {
580 SyncResult::Failed(errors.join("; "))
581 }
582 }
583 }
584}
585
/// Transfer counters for one rsync run.
// NOTE(review): presumably the return type of `parse_rsync_stats`, which is
// defined outside this part of the file — confirm.
#[derive(Debug, Default)]
struct RsyncStats {
    /// Number of files transferred.
    files_transferred: u64,
    /// Number of bytes transferred.
    bytes_transferred: u64,
}
592
/// Syncs remote sources into a local mirror directory tree.
pub struct SyncEngine {
    /// Base directory under which `remotes/<source>/mirror` is created.
    local_store: PathBuf,
    /// Timeout, in seconds, used for SSH connections and short SSH
    /// commands.
    connection_timeout: u64,
    /// Value, in seconds, passed to rsync's `--timeout` option.
    transfer_timeout: u64,
}
603
604impl SyncEngine {
605 pub fn new(data_dir: &Path) -> Self {
610 Self {
611 local_store: data_dir.to_path_buf(),
612 connection_timeout: 10,
613 transfer_timeout: 300, }
615 }
616
617 pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
619 self.connection_timeout = seconds;
620 self
621 }
622
623 pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
625 self.transfer_timeout = seconds;
626 self
627 }
628
629 pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
631 self.local_store
632 .join("remotes")
633 .join(source_name)
634 .join("mirror")
635 }
636
637 fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
641 if host.trim().is_empty()
643 || host.starts_with('-')
644 || !ssh_host_has_safe_token_chars(host)
645 || validate_optional_user_host_shape(host).is_err()
646 {
647 return Err(SyncError::SshFailed(format!(
648 "Invalid characters in host: {}",
649 host
650 )));
651 }
652
653 let timeout_secs = self.connection_timeout.max(1);
654 let mut cmd = Command::new("ssh");
655 cmd.args(strict_ssh_cli_tokens(timeout_secs))
656 .arg("--")
657 .arg(host)
658 .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
659 .stdout(Stdio::piped())
660 .stderr(Stdio::piped());
661 configure_child_process_group(&mut cmd);
662
663 let child = cmd
664 .spawn()
665 .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
666 let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
667 .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
668 .ok_or(SyncError::Timeout(timeout_secs))?;
669
670 if !output.status.success() {
671 let stderr = String::from_utf8_lossy(&output.stderr);
672 if is_host_key_verification_failure(&stderr) {
673 return Err(SyncError::SshFailed(host_key_verification_error(host)));
674 }
675 return Err(SyncError::SshFailed(format!(
676 "Failed to get remote home directory: {}",
677 stderr.trim()
678 )));
679 }
680
681 let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
682 SyncError::SshFailed(
683 "Unable to parse remote home directory from SSH output".to_string(),
684 )
685 })?;
686
687 tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
688 Ok(remote_home)
689 }
690
691 fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
695 if !path.starts_with('~') {
696 return path.to_string();
697 }
698
699 let Some(home) = remote_home else {
700 return path.to_string();
701 };
702
703 if path == "~" {
704 home.to_string()
705 } else if let Some(rest) = path.strip_prefix("~/") {
706 format!("{}/{}", home, rest)
707 } else {
708 path.to_string()
710 }
711 }
712
713 pub fn detect_sync_method() -> SyncMethod {
727 if Command::new("rsync")
729 .arg("--version")
730 .output()
731 .map(|o| o.status.success())
732 .unwrap_or(false)
733 {
734 return SyncMethod::Rsync;
735 }
736
737 #[cfg(target_os = "windows")]
739 if Command::new("wsl")
740 .args(["rsync", "--version"])
741 .output()
742 .map(|o| o.status.success())
743 .unwrap_or(false)
744 {
745 return SyncMethod::WslRsync;
746 }
747
748 if Command::new("scp")
751 .arg("-S")
752 .arg("ssh")
753 .arg("--")
754 .output()
757 .is_ok()
758 {
759 if which_scp_exists() {
761 return SyncMethod::Scp;
762 }
763 }
764
765 SyncMethod::Sftp
767 }
768
769 pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
774 if !source.is_remote() {
775 return Err(SyncError::NoHost);
776 }
777
778 let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
779
780 if source.paths.is_empty() {
781 return Err(SyncError::NoPaths);
782 }
783
784 source
785 .validate_structure()
786 .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
787
788 let method = Self::detect_sync_method();
789 let mut report = SyncReport::new(&source.name, method);
790 let overall_start = Instant::now();
791
792 let mirror_dir = self.mirror_dir(&source.name);
794 prepare_local_sync_root(&self.local_store, &mirror_dir)
795 .map_err(|e| SyncError::CreateDirFailed(std::io::Error::other(e)))?;
796
797 let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
799 path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
800 }) {
801 match self.get_remote_home(host) {
802 Ok(home) => Some(home),
803 Err(e) => {
804 tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
805 None
806 }
807 }
808 } else {
809 None
810 };
811
812 let remote_is_openrsync = if matches!(method, SyncMethod::Rsync | SyncMethod::WslRsync) {
818 probe_remote_rsync_is_openrsync(host, self.connection_timeout)
819 } else {
820 false
821 };
822 if remote_is_openrsync {
823 tracing::info!(
824 host = %host,
825 "remote rsync identified as openrsync; will omit --secluded-args / -s"
826 );
827 }
828
829 for (index, remote_path) in source.paths.iter().enumerate() {
830 if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
831 report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
832 continue;
833 }
834
835 let result = match method {
836 SyncMethod::Rsync => self.sync_path_rsync(
837 host,
838 remote_path,
839 &mirror_dir,
840 remote_home.as_deref(),
841 remote_is_openrsync,
842 ),
843 SyncMethod::WslRsync => {
844 self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
845 }
846 SyncMethod::Scp => {
847 self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
848 }
849 SyncMethod::Sftp => {
850 self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
851 }
852 };
853 report.add_path_result(result);
854 }
855
856 report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
857 Ok(report)
858 }
859
860 pub fn sync_all(
864 &self,
865 sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
866 ) -> Vec<SyncReport> {
867 sources
868 .map(|source| {
869 let source = source.borrow();
870 self.sync_source(source)
871 .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
872 })
873 .collect()
874 }
875
    /// Syncs one remote path into `dest_dir` using the local `rsync` binary.
    ///
    /// Steps: expand a leading `~` with `remote_home`, prepare the local
    /// container directory, run rsync, and, if the remote rejected the
    /// argument-protection flag, retry once without the flag and with a
    /// shell-quoted remote path.
    ///
    /// Failures are never returned as `Err`; they are reported through
    /// `success: false` and `error` on the returned [`PathSyncResult`].
    /// `remote_path` in the result is always the unexpanded input.
    fn sync_path_rsync(
        &self,
        host: &str,
        remote_path: &str,
        dest_dir: &Path,
        remote_home: Option<&str>,
        remote_is_openrsync: bool,
    ) -> PathSyncResult {
        let start = Instant::now();
        // A tilde path cannot be synced without knowing the remote home.
        if remote_path.starts_with('~') && remote_home.is_none() {
            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(
                    "Cannot expand '~' in remote path; failed to determine remote home directory"
                        .to_string(),
                ),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);

        // NOTE(review): because of the early return above, `remote_home` is
        // always `Some` here for tilde paths, so this branch can only fire
        // for forms `expand_tilde_with_home` leaves untouched (e.g.
        // `~user/...`). The message text may not match that situation.
        if remote_path.starts_with('~') && expanded_path == remote_path {
            tracing::warn!(
                remote_path = %remote_path,
                "Could not expand tilde in path (remote home directory not available)"
            );
        }

        // The local directory name is derived from the unexpanded path.
        let safe_name = path_to_safe_dirname(remote_path);
        let local_path = dest_dir.join(&safe_name);

        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
            // NOTE(review): the other early returns move `local_path`; the
            // clone here looks unnecessary.
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path: local_path.clone(),
                success: false,
                error: Some(e),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        // No protection flag is used when the remote was identified as
        // openrsync; otherwise use whatever the local rsync advertises.
        let arg_protection = if remote_is_openrsync {
            RsyncArgProtection::None
        } else {
            detect_rsync_arg_protection()
        };
        let protect_args_supported = arg_protection.is_supported();
        let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);

        let local_path_str = match local_path.to_str() {
            Some(s) => s,
            None => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some("Local path contains invalid UTF-8".to_string()),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        tracing::debug!(
            host = %host,
            remote_path = %expanded_path,
            local_path = %local_path.display(),
            "starting rsync"
        );

        let timeout_str = self.transfer_timeout.to_string();
        let output = match run_rsync_command(
            &timeout_str,
            &ssh_opts,
            &remote_spec,
            local_path_str,
            arg_protection,
        ) {
            Ok(o) => o,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to execute rsync: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        // These are overwritten below if the fallback attempt runs.
        let mut duration_ms = start.elapsed().as_millis() as u64;
        let mut status_success = output.status.success();
        let mut stdout = String::from_utf8_lossy(&output.stdout).into_owned();
        let mut stderr = String::from_utf8_lossy(&output.stderr).into_owned();

        // Retry once without the protection flag when stderr shows the flag
        // itself was rejected.
        if !status_success
            && arg_protection.is_supported()
            && rsync_arg_protection_remote_rejected(&stderr)
        {
            tracing::warn!(
                host = %host,
                remote_path = %expanded_path,
                protection_flag = ?arg_protection.flag(),
                "remote rsync rejected argument-protection flag; retrying with shell-quoted remote path"
            );

            // Without the flag the remote path must be shell-quoted.
            let fallback_remote_spec = remote_spec_for_rsync(host, &expanded_path, false);
            let retry = match run_rsync_command(
                &timeout_str,
                &ssh_opts,
                &fallback_remote_spec,
                local_path_str,
                RsyncArgProtection::None,
            ) {
                Ok(o) => o,
                Err(e) => {
                    return PathSyncResult {
                        remote_path: remote_path.to_string(),
                        local_path,
                        success: false,
                        error: Some(format!(
                            "Failed to execute rsync fallback without argument protection: {}",
                            e
                        )),
                        duration_ms: start.elapsed().as_millis() as u64,
                        ..Default::default()
                    };
                }
            };

            duration_ms = start.elapsed().as_millis() as u64;
            status_success = retry.status.success();
            stdout = String::from_utf8_lossy(&retry.stdout).into_owned();
            stderr = String::from_utf8_lossy(&retry.stderr).into_owned();
        }

        if !status_success {
            // Classify the failure from stderr; the first matching pattern
            // wins.
            let error_msg = if stderr.contains("Connection refused")
                || stderr.contains("Connection timed out")
            {
                format!("SSH connection failed: {}", stderr.trim())
            } else if is_host_key_verification_failure(&stderr) {
                host_key_verification_error(host)
            } else if stderr.contains("No such file or directory") {
                format!("Remote path not found: {}", expanded_path)
            } else if stderr.contains("Permission denied") {
                format!("Permission denied: {}", stderr.trim())
            } else {
                format!("rsync failed: {}", stderr.trim())
            };

            tracing::warn!(
                host = %host,
                remote_path = %expanded_path,
                error = %error_msg,
                "rsync failed"
            );

            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(error_msg),
                duration_ms,
                ..Default::default()
            };
        }

        // Transfer counters come from rsync's `--stats` output on stdout.
        let stats = parse_rsync_stats(&stdout);

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            files = stats.files_transferred,
            bytes = stats.bytes_transferred,
            duration_ms,
            "rsync completed"
        );

        PathSyncResult {
            remote_path: remote_path.to_string(),
            local_path,
            files_transferred: stats.files_transferred,
            bytes_transferred: stats.bytes_transferred,
            success: true,
            error: None,
            duration_ms,
        }
    }
1098
    /// Syncs one remote path into `dest_dir` by running `rsync` through the
    /// `wsl` command.
    ///
    /// The local destination is converted with `windows_path_to_wsl` before
    /// being handed to rsync. `--protect-args` is always passed and the
    /// remote path is never shell-quoted; there is no fallback attempt.
    ///
    /// Failures are reported through `success: false` and `error` on the
    /// returned [`PathSyncResult`].
    fn sync_path_wsl_rsync(
        &self,
        host: &str,
        remote_path: &str,
        dest_dir: &Path,
        remote_home: Option<&str>,
    ) -> PathSyncResult {
        let start = Instant::now();

        // A tilde path cannot be synced without knowing the remote home.
        if remote_path.starts_with('~') && remote_home.is_none() {
            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(
                    "Cannot expand '~' in remote path; failed to determine remote home directory"
                        .to_string(),
                ),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
        // The local directory name is derived from the unexpanded path.
        let safe_name = path_to_safe_dirname(remote_path);
        let local_path = dest_dir.join(&safe_name);

        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(e),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        let local_path_str = match local_path.to_str() {
            Some(s) => s,
            None => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some("Local path contains invalid UTF-8".to_string()),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        let wsl_dest = windows_path_to_wsl(local_path_str);

        // NOTE(review): unlike `sync_path_rsync`, this path does not take
        // the openrsync probe result into account and always assumes
        // `--protect-args` is accepted — confirm this is intended.
        let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
        let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
        let timeout_str = self.transfer_timeout.to_string();

        let mut cmd = Command::new("wsl");
        cmd.args([
            "rsync",
            "-avz",
            "--links",
            "--safe-links",
            "--stats",
            "--partial",
        ]);
        cmd.arg("--protect-args");
        cmd.args([
            "--timeout",
            &timeout_str,
            "-e",
            &ssh_opts,
            "--",
            &remote_spec,
            &wsl_dest,
        ]);

        tracing::debug!(
            host = %host,
            remote_path = %expanded_path,
            local_path = %local_path.display(),
            wsl_dest = %wsl_dest,
            "starting wsl rsync"
        );

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to execute wsl rsync: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        let duration_ms = start.elapsed().as_millis() as u64;
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);

        if !output.status.success() {
            // Classify the failure from stderr; the first matching pattern
            // wins.
            let error_msg = if stderr.contains("Connection refused")
                || stderr.contains("Connection timed out")
            {
                format!("SSH connection failed: {}", stderr.trim())
            } else if is_host_key_verification_failure(&stderr) {
                host_key_verification_error(host)
            } else if stderr.contains("No such file or directory") {
                format!("Remote path not found: {}", expanded_path)
            } else if stderr.contains("Permission denied") {
                format!("Permission denied: {}", stderr.trim())
            } else {
                format!("wsl rsync failed: {}", stderr.trim())
            };

            tracing::warn!(
                host = %host,
                remote_path = %expanded_path,
                error = %error_msg,
                "wsl rsync failed"
            );

            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(error_msg),
                duration_ms,
                ..Default::default()
            };
        }

        // Transfer counters come from rsync's `--stats` output on stdout.
        let stats = parse_rsync_stats(&stdout);

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            files = stats.files_transferred,
            bytes = stats.bytes_transferred,
            duration_ms,
            "wsl rsync completed"
        );

        PathSyncResult {
            remote_path: remote_path.to_string(),
            local_path,
            files_transferred: stats.files_transferred,
            bytes_transferred: stats.bytes_transferred,
            success: true,
            error: None,
            duration_ms,
        }
    }
1265
1266 fn sync_path_scp(
1276 &self,
1277 host: &str,
1278 remote_path: &str,
1279 dest_dir: &Path,
1280 remote_home: Option<&str>,
1281 ) -> PathSyncResult {
1282 let start = Instant::now();
1283
1284 if remote_path.starts_with('~') && remote_home.is_none() {
1285 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1286 return PathSyncResult {
1287 remote_path: remote_path.to_string(),
1288 local_path,
1289 success: false,
1290 error: Some(
1291 "Cannot expand '~' in remote path; failed to determine remote home directory"
1292 .to_string(),
1293 ),
1294 duration_ms: start.elapsed().as_millis() as u64,
1295 ..Default::default()
1296 };
1297 }
1298
1299 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1300 let safe_name = path_to_safe_dirname(remote_path);
1301 let local_path = dest_dir.join(&safe_name);
1302
1303 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1304 return PathSyncResult {
1305 remote_path: remote_path.to_string(),
1306 local_path,
1307 success: false,
1308 error: Some(e),
1309 duration_ms: start.elapsed().as_millis() as u64,
1310 ..Default::default()
1311 };
1312 }
1313
1314 let connect_timeout = self.connection_timeout.to_string();
1317 let find_command = remote_find_regular_files_command(&expanded_path);
1318
1319 tracing::debug!(
1320 host = %host,
1321 remote_path = %expanded_path,
1322 local_path = %local_path.display(),
1323 "listing regular files for scp sync"
1324 );
1325
1326 let timeout_secs = self.connection_timeout.max(1);
1327 let mut cmd = Command::new("ssh");
1328 cmd.args(strict_ssh_cli_tokens(timeout_secs))
1329 .arg("--")
1330 .arg(host)
1331 .arg(&find_command)
1332 .stdout(Stdio::piped())
1333 .stderr(Stdio::piped());
1334 configure_child_process_group(&mut cmd);
1335
1336 let output = match cmd.spawn().and_then(|child| {
1337 wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1338 }) {
1339 Ok(Some(o)) => o,
1340 Ok(None) => {
1341 return PathSyncResult {
1342 remote_path: remote_path.to_string(),
1343 local_path,
1344 success: false,
1345 error: Some(format!(
1346 "SSH file listing timed out after {timeout_secs} seconds"
1347 )),
1348 duration_ms: start.elapsed().as_millis() as u64,
1349 ..Default::default()
1350 };
1351 }
1352 Err(e) => {
1353 return PathSyncResult {
1354 remote_path: remote_path.to_string(),
1355 local_path,
1356 success: false,
1357 error: Some(format!("Failed to execute ssh file listing: {}", e)),
1358 duration_ms: start.elapsed().as_millis() as u64,
1359 ..Default::default()
1360 };
1361 }
1362 };
1363
1364 let stderr = String::from_utf8_lossy(&output.stderr);
1365 if !output.status.success() {
1366 let error_msg = if stderr.contains("Connection refused")
1367 || stderr.contains("Connection timed out")
1368 {
1369 format!("SSH connection failed: {}", stderr.trim())
1370 } else if is_host_key_verification_failure(&stderr) {
1371 host_key_verification_error(host)
1372 } else if stderr.contains("No such file or directory") {
1373 format!("Remote path not found: {}", expanded_path)
1374 } else if stderr.contains("Permission denied") {
1375 format!("Permission denied: {}", stderr.trim())
1376 } else {
1377 format!("Remote file listing failed: {}", stderr.trim())
1378 };
1379
1380 tracing::warn!(
1381 host = %host,
1382 remote_path = %expanded_path,
1383 error = %error_msg,
1384 "scp file listing failed"
1385 );
1386
1387 return PathSyncResult {
1388 remote_path: remote_path.to_string(),
1389 local_path,
1390 success: false,
1391 error: Some(error_msg),
1392 duration_ms: start.elapsed().as_millis() as u64,
1393 ..Default::default()
1394 };
1395 }
1396
1397 let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1398 let remote_root = Path::new(&expanded_path);
1399 let leaf_name = Path::new(remote_path)
1400 .file_name()
1401 .and_then(|n| n.to_str())
1402 .unwrap_or("remote");
1403 let mut files_transferred = 0u64;
1404 let mut bytes_transferred = 0u64;
1405
1406 for remote_file in remote_files {
1407 let remote_file_path = Path::new(&remote_file);
1408 let Some(local_file) = remote_file_to_safe_local_path(
1409 remote_root,
1410 remote_file_path,
1411 &local_path,
1412 leaf_name,
1413 ) else {
1414 tracing::warn!(
1415 remote_path = %remote_file,
1416 root = %expanded_path,
1417 "skipping scp file outside listed root"
1418 );
1419 continue;
1420 };
1421
1422 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1423 return PathSyncResult {
1424 remote_path: remote_path.to_string(),
1425 local_path,
1426 success: false,
1427 error: Some(e),
1428 duration_ms: start.elapsed().as_millis() as u64,
1429 ..Default::default()
1430 };
1431 }
1432
1433 if let Some(parent) = local_file.parent() {
1434 if let Err(e) = std::fs::create_dir_all(parent) {
1435 return PathSyncResult {
1436 remote_path: remote_path.to_string(),
1437 local_path,
1438 success: false,
1439 error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1440 duration_ms: start.elapsed().as_millis() as u64,
1441 ..Default::default()
1442 };
1443 }
1444
1445 if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1446 return PathSyncResult {
1447 remote_path: remote_path.to_string(),
1448 local_path,
1449 success: false,
1450 error: Some(e),
1451 duration_ms: start.elapsed().as_millis() as u64,
1452 ..Default::default()
1453 };
1454 }
1455 }
1456
1457 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1458 return PathSyncResult {
1459 remote_path: remote_path.to_string(),
1460 local_path,
1461 success: false,
1462 error: Some(e),
1463 duration_ms: start.elapsed().as_millis() as u64,
1464 ..Default::default()
1465 };
1466 }
1467
1468 let temp_path =
1469 unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1470 let Some(temp_path_str) = temp_path.to_str() else {
1471 return PathSyncResult {
1472 remote_path: remote_path.to_string(),
1473 local_path,
1474 success: false,
1475 error: Some("Local path contains invalid UTF-8".to_string()),
1476 duration_ms: start.elapsed().as_millis() as u64,
1477 ..Default::default()
1478 };
1479 };
1480 if let Err(e) = std::fs::OpenOptions::new()
1481 .write(true)
1482 .create_new(true)
1483 .open(&temp_path)
1484 .and_then(|file| file.sync_all())
1485 {
1486 return PathSyncResult {
1487 remote_path: remote_path.to_string(),
1488 local_path,
1489 success: false,
1490 error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1491 duration_ms: start.elapsed().as_millis() as u64,
1492 ..Default::default()
1493 };
1494 }
1495
1496 let remote_spec = remote_spec_for_scp(host, &remote_file);
1497 let mut cmd = Command::new("scp");
1498 cmd.args([
1499 "-B",
1500 "-o",
1501 &format!("ConnectTimeout={}", connect_timeout),
1502 "-o",
1503 "ServerAliveInterval=15",
1504 "-o",
1505 "ServerAliveCountMax=3",
1506 "-o",
1507 "StrictHostKeyChecking=yes",
1508 "--",
1509 &remote_spec,
1510 temp_path_str,
1511 ]);
1512
1513 let output = match cmd.output() {
1514 Ok(o) => o,
1515 Err(e) => {
1516 return PathSyncResult {
1517 remote_path: remote_path.to_string(),
1518 local_path,
1519 success: false,
1520 error: Some(format!("Failed to execute scp: {}", e)),
1521 duration_ms: start.elapsed().as_millis() as u64,
1522 ..Default::default()
1523 };
1524 }
1525 };
1526
1527 if !output.status.success() {
1528 let _ = std::fs::remove_file(&temp_path);
1529 let stderr = String::from_utf8_lossy(&output.stderr);
1530 let error_msg = if is_host_key_verification_failure(&stderr) {
1531 host_key_verification_error(host)
1532 } else if stderr.contains("Permission denied") {
1533 format!("Permission denied: {}", stderr.trim())
1534 } else {
1535 format!("scp failed: {}", stderr.trim())
1536 };
1537
1538 tracing::warn!(
1539 host = %host,
1540 remote_path = %remote_file,
1541 error = %error_msg,
1542 "scp file transfer failed"
1543 );
1544
1545 return PathSyncResult {
1546 remote_path: remote_path.to_string(),
1547 local_path,
1548 success: false,
1549 error: Some(error_msg),
1550 duration_ms: start.elapsed().as_millis() as u64,
1551 ..Default::default()
1552 };
1553 }
1554
1555 files_transferred += 1;
1556 if let Err(e) = sync_file_path(&temp_path) {
1557 return PathSyncResult {
1558 remote_path: remote_path.to_string(),
1559 local_path,
1560 success: false,
1561 error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1562 duration_ms: start.elapsed().as_millis() as u64,
1563 ..Default::default()
1564 };
1565 }
1566 if let Ok(metadata) = std::fs::metadata(&temp_path) {
1567 bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1568 }
1569 if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1570 return PathSyncResult {
1571 remote_path: remote_path.to_string(),
1572 local_path,
1573 success: false,
1574 error: Some(format!(
1575 "Failed to publish {} to {}: {}",
1576 temp_path.display(),
1577 local_file.display(),
1578 e
1579 )),
1580 duration_ms: start.elapsed().as_millis() as u64,
1581 ..Default::default()
1582 };
1583 }
1584 }
1585
1586 let duration_ms = start.elapsed().as_millis() as u64;
1587
1588 tracing::info!(
1589 host = %host,
1590 remote_path = %expanded_path,
1591 files = files_transferred,
1592 bytes = bytes_transferred,
1593 duration_ms,
1594 "scp sync completed"
1595 );
1596
1597 PathSyncResult {
1598 remote_path: remote_path.to_string(),
1599 local_path,
1600 files_transferred,
1601 bytes_transferred,
1602 success: true,
1603 error: None,
1604 duration_ms,
1605 }
1606 }
1607
1608 fn sync_path_sftp(
1613 &self,
1614 host: &str,
1615 remote_path: &str,
1616 dest_dir: &Path,
1617 remote_home: Option<&str>,
1618 ) -> PathSyncResult {
1619 let start = Instant::now();
1620 if remote_path.starts_with('~') && remote_home.is_none() {
1621 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1622 return PathSyncResult {
1623 remote_path: remote_path.to_string(),
1624 local_path,
1625 success: false,
1626 error: Some(
1627 "Cannot expand '~' in remote path; failed to determine remote home directory"
1628 .to_string(),
1629 ),
1630 duration_ms: start.elapsed().as_millis() as u64,
1631 ..Default::default()
1632 };
1633 }
1634 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1635 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1637
1638 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1640 return PathSyncResult {
1641 remote_path: remote_path.to_string(),
1642 local_path,
1643 success: false,
1644 error: Some(e),
1645 duration_ms: start.elapsed().as_millis() as u64,
1646 ..Default::default()
1647 };
1648 }
1649
1650 let (ssh_user, ssh_host) = parse_ssh_host(host);
1652
1653 let ssh_config = discover_ssh_hosts();
1656 let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
1657 ssh_config
1658 .iter()
1659 .find(|h| h.hostname.as_deref() == Some(ssh_host))
1660 });
1661
1662 let hostname = host_config
1664 .and_then(|h| h.hostname.as_deref())
1665 .unwrap_or(ssh_host);
1666 let port = host_config.and_then(|h| h.port).unwrap_or(22);
1667 let username = match first_nonblank_username([
1669 ssh_user,
1670 host_config.and_then(|h| h.user.as_deref()),
1671 ])
1672 .or_else(|| env_username("USER"))
1673 .or_else(|| env_username("LOGNAME"))
1674 {
1675 Some(user) => user,
1676 None => {
1677 return PathSyncResult {
1678 remote_path: remote_path.to_string(),
1679 local_path,
1680 success: false,
1681 error: Some(format!(
1682 "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
1683 host
1684 )),
1685 duration_ms: start.elapsed().as_millis() as u64,
1686 ..Default::default()
1687 };
1688 }
1689 };
1690 let identity_file = host_config.and_then(|h| h.identity_file.as_deref());
1691
1692 tracing::debug!(
1693 hostname = %hostname,
1694 port,
1695 username = %username,
1696 identity_file = ?identity_file,
1697 remote_path = %expanded_path,
1698 "SFTP connection parameters"
1699 );
1700
1701 let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
1703 let addr = format!("{}:{}", hostname, port);
1704 let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
1705 use std::net::ToSocketAddrs;
1707 (hostname, port)
1708 .to_socket_addrs()
1709 .ok()
1710 .and_then(|mut addrs| addrs.next())
1711 .ok_or(std::io::Error::new(
1712 std::io::ErrorKind::InvalidInput,
1713 "cannot resolve hostname",
1714 ))
1715 }) {
1716 Ok(a) => a,
1717 Err(e) => {
1718 return PathSyncResult {
1719 remote_path: remote_path.to_string(),
1720 local_path,
1721 success: false,
1722 error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
1723 duration_ms: start.elapsed().as_millis() as u64,
1724 ..Default::default()
1725 };
1726 }
1727 };
1728 let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
1729 Ok(t) => t,
1730 Err(e) => {
1731 return PathSyncResult {
1732 remote_path: remote_path.to_string(),
1733 local_path,
1734 success: false,
1735 error: Some(format!(
1736 "TCP connection failed to {}:{}: {}",
1737 hostname, port, e
1738 )),
1739 duration_ms: start.elapsed().as_millis() as u64,
1740 ..Default::default()
1741 };
1742 }
1743 };
1744
1745 let timeout = std::time::Duration::from_secs(self.transfer_timeout);
1747 if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
1748 tracing::warn!("Failed to set TCP read timeout: {}", e);
1749 }
1750 if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
1751 tracing::warn!("Failed to set TCP write timeout: {}", e);
1752 }
1753 let tcp_shutdown = tcp.try_clone().ok();
1754
1755 let mut session = match Session::new() {
1757 Ok(s) => s,
1758 Err(e) => {
1759 let _ = tcp.shutdown(Shutdown::Both);
1760 return PathSyncResult {
1761 remote_path: remote_path.to_string(),
1762 local_path,
1763 success: false,
1764 error: Some(format!("Failed to create SSH session: {}", e)),
1765 duration_ms: start.elapsed().as_millis() as u64,
1766 ..Default::default()
1767 };
1768 }
1769 };
1770
1771 session.set_tcp_stream(tcp);
1772 let close_connections = |session: &mut Session, reason: &str| {
1773 let _ = session.disconnect(None, reason, None);
1774 if let Some(stream) = tcp_shutdown.as_ref() {
1775 let _ = stream.shutdown(Shutdown::Both);
1776 }
1777 };
1778
1779 if let Err(e) = session.handshake() {
1780 close_connections(&mut session, "handshake failed");
1781 return PathSyncResult {
1782 remote_path: remote_path.to_string(),
1783 local_path,
1784 success: false,
1785 error: Some(format!("SSH handshake failed: {}", e)),
1786 duration_ms: start.elapsed().as_millis() as u64,
1787 ..Default::default()
1788 };
1789 }
1790
1791 if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
1793 close_connections(&mut session, "authentication failed");
1794 return PathSyncResult {
1795 remote_path: remote_path.to_string(),
1796 local_path,
1797 success: false,
1798 error: Some(format!("SSH authentication failed: {}", e)),
1799 duration_ms: start.elapsed().as_millis() as u64,
1800 ..Default::default()
1801 };
1802 }
1803
1804 let sftp = match session.sftp() {
1806 Ok(s) => s,
1807 Err(e) => {
1808 close_connections(&mut session, "sftp open failed");
1809 return PathSyncResult {
1810 remote_path: remote_path.to_string(),
1811 local_path,
1812 success: false,
1813 error: Some(format!("Failed to open SFTP session: {}", e)),
1814 duration_ms: start.elapsed().as_millis() as u64,
1815 ..Default::default()
1816 };
1817 }
1818 };
1819
1820 tracing::info!(
1821 host = %host,
1822 remote_path = %expanded_path,
1823 local_path = %local_path.display(),
1824 "starting SFTP sync"
1825 );
1826
1827 let mut files_transferred = 0u64;
1829 let mut bytes_transferred = 0u64;
1830
1831 let leaf_name = Path::new(remote_path)
1834 .file_name()
1835 .and_then(|n| n.to_str())
1836 .unwrap_or("remote");
1837 let target_local_path = local_path.join(leaf_name);
1838
1839 if let Err(e) = self.sftp_download_recursive(
1840 &sftp,
1841 Path::new(&expanded_path),
1842 &target_local_path,
1843 &local_path,
1844 &mut files_transferred,
1845 &mut bytes_transferred,
1846 ) {
1847 close_connections(&mut session, "sftp download failed");
1848 return PathSyncResult {
1849 remote_path: remote_path.to_string(),
1850 local_path,
1851 files_transferred,
1852 bytes_transferred,
1853 success: false,
1854 error: Some(format!("SFTP download failed: {}", e)),
1855 duration_ms: start.elapsed().as_millis() as u64,
1856 };
1857 }
1858
1859 let duration_ms = start.elapsed().as_millis() as u64;
1860
1861 tracing::info!(
1862 host = %host,
1863 remote_path = %expanded_path,
1864 files = files_transferred,
1865 bytes = bytes_transferred,
1866 duration_ms,
1867 "SFTP sync completed"
1868 );
1869
1870 close_connections(&mut session, "sync complete");
1871 PathSyncResult {
1872 remote_path: remote_path.to_string(),
1873 local_path,
1874 files_transferred,
1875 bytes_transferred,
1876 success: true,
1877 error: None,
1878 duration_ms,
1879 }
1880 }
1881
1882 fn authenticate_ssh(
1884 &self,
1885 session: &Session,
1886 username: &str,
1887 identity_file: Option<&str>,
1888 ) -> Result<(), String> {
1889 if let Ok(mut agent) = session.agent()
1891 && agent.connect().is_ok()
1892 && agent.list_identities().is_ok()
1893 {
1894 for identity in agent.identities().unwrap_or_default() {
1895 if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1896 tracing::debug!("Authenticated via SSH agent");
1897 return Ok(());
1898 }
1899 }
1900 }
1901
1902 if let Some(key_path) = identity_file {
1904 let key_path_expanded = expand_tilde_local(key_path);
1905 let key_path_buf = Path::new(&key_path_expanded);
1906
1907 if key_path_buf.exists()
1908 && session
1909 .userauth_pubkey_file(username, None, key_path_buf, None)
1910 .is_ok()
1911 && session.authenticated()
1912 {
1913 tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1914 return Ok(());
1915 }
1916 }
1917
1918 if let Some(home) = dirs::home_dir() {
1920 for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1921 let key_path = home.join(".ssh").join(key_name);
1922 if key_path.exists()
1923 && session
1924 .userauth_pubkey_file(username, None, &key_path, None)
1925 .is_ok()
1926 && session.authenticated()
1927 {
1928 tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1929 return Ok(());
1930 }
1931 }
1932 }
1933
1934 Err(format!(
1935 "No valid authentication method found for user '{}'",
1936 username
1937 ))
1938 }
1939
1940 fn sftp_download_recursive(
1942 &self,
1943 sftp: &Sftp,
1944 remote_path: &Path,
1945 local_path: &Path,
1946 local_root: &Path,
1947 files_transferred: &mut u64,
1948 bytes_transferred: &mut u64,
1949 ) -> Result<(), String> {
1950 let stat = sftp
1953 .lstat(remote_path)
1954 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1955
1956 if sftp_file_stat_is_symlink(&stat) {
1957 tracing::warn!(
1958 path = %remote_path.display(),
1959 "Skipping remote symlink during SFTP sync"
1960 );
1961 return Ok(());
1962 }
1963
1964 if stat.is_dir() {
1965 reject_local_symlink_below_root(local_root, local_path)?;
1967 std::fs::create_dir_all(local_path)
1968 .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1969 reject_local_symlink_below_root(local_root, local_path)?;
1970
1971 let entries = sftp
1973 .readdir(remote_path)
1974 .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1975
1976 for (entry_path, _entry_stat) in entries {
1977 let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1978 continue;
1979 };
1980
1981 let entry_stat = sftp
1982 .lstat(&entry_path)
1983 .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1984 if sftp_file_stat_is_symlink(&entry_stat) {
1985 tracing::warn!(
1986 path = %entry_path.display(),
1987 "Skipping remote symlink during SFTP sync"
1988 );
1989 continue;
1990 }
1991
1992 let local_entry = local_path.join(file_name);
1993
1994 if entry_stat.is_dir() {
1995 self.sftp_download_recursive(
1997 sftp,
1998 &entry_path,
1999 &local_entry,
2000 local_root,
2001 files_transferred,
2002 bytes_transferred,
2003 )?;
2004 } else if entry_stat.is_file() {
2005 if self.sftp_download_file(
2007 sftp,
2008 &entry_path,
2009 &local_entry,
2010 local_root,
2011 bytes_transferred,
2012 )? {
2013 *files_transferred += 1;
2014 }
2015 }
2016 }
2018 } else if stat.is_file() {
2019 if let Some(parent) = local_path.parent() {
2021 reject_local_symlink_below_root(local_root, parent)?;
2022 std::fs::create_dir_all(parent).map_err(|e| {
2023 format!("Failed to create local dir {}: {}", parent.display(), e)
2024 })?;
2025 reject_local_symlink_below_root(local_root, parent)?;
2026 }
2027
2028 if self.sftp_download_file(
2029 sftp,
2030 remote_path,
2031 local_path,
2032 local_root,
2033 bytes_transferred,
2034 )? {
2035 *files_transferred += 1;
2036 }
2037 } else {
2038 tracing::warn!(
2040 path = %remote_path.display(),
2041 "Skipping remote path: not a regular file or directory"
2042 );
2043 }
2044
2045 Ok(())
2046 }
2047
2048 fn sftp_download_file(
2050 &self,
2051 sftp: &Sftp,
2052 remote_path: &Path,
2053 local_path: &Path,
2054 local_root: &Path,
2055 bytes_transferred: &mut u64,
2056 ) -> Result<bool, String> {
2057 let stat = sftp
2058 .lstat(remote_path)
2059 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
2060 if sftp_file_stat_is_symlink(&stat) {
2061 tracing::warn!(
2062 path = %remote_path.display(),
2063 "Skipping remote symlink during SFTP sync"
2064 );
2065 return Ok(false);
2066 }
2067 if !stat.is_file() {
2068 tracing::warn!(
2069 path = %remote_path.display(),
2070 "Skipping remote path: not a regular file"
2071 );
2072 return Ok(false);
2073 }
2074
2075 let mut remote_file = sftp
2076 .open(remote_path)
2077 .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
2078
2079 reject_local_symlink_below_root(local_root, local_path)?;
2080
2081 let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
2082 let mut local_file = std::fs::OpenOptions::new()
2083 .write(true)
2084 .create_new(true)
2085 .open(&temp_path)
2086 .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
2087
2088 let mut buffer = [0u8; 32768]; loop {
2091 let bytes_read = remote_file
2092 .read(&mut buffer)
2093 .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
2094
2095 if bytes_read == 0 {
2096 break;
2097 }
2098
2099 local_file
2100 .write_all(&buffer[..bytes_read])
2101 .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
2102
2103 *bytes_transferred += bytes_read as u64;
2104 }
2105
2106 tracing::trace!(
2107 remote = %remote_path.display(),
2108 local = %local_path.display(),
2109 "downloaded file"
2110 );
2111
2112 local_file
2113 .sync_all()
2114 .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
2115 drop(local_file);
2116 replace_file_from_temp(&temp_path, local_path).map_err(|e| {
2117 format!(
2118 "Failed to publish {} to {}: {}",
2119 temp_path.display(),
2120 local_path.display(),
2121 e
2122 )
2123 })?;
2124
2125 Ok(true)
2126 }
2127}
2128
2129fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
2131 let Some(file_name) = entry_path.file_name() else {
2132 tracing::warn!(
2133 parent = %parent_path.display(),
2134 entry = ?entry_path,
2135 "Skipping SFTP entry without a file name"
2136 );
2137 return None;
2138 };
2139
2140 let Some(file_name) = file_name.to_str() else {
2141 tracing::warn!(
2142 parent = %parent_path.display(),
2143 entry = ?entry_path,
2144 "Skipping SFTP entry with non-UTF-8 file name"
2145 );
2146 return None;
2147 };
2148
2149 if file_name.is_empty() {
2150 tracing::warn!(
2151 parent = %parent_path.display(),
2152 entry = ?entry_path,
2153 "Skipping SFTP entry with empty file name"
2154 );
2155 return None;
2156 }
2157
2158 if file_name == "." || file_name == ".." {
2159 return None;
2160 }
2161
2162 Some(file_name)
2163}
2164
/// Report whether an `scp` binary (`scp.exe` on Windows) is present as a
/// regular file in any directory listed in the `PATH` environment variable.
///
/// Returns `false` when `PATH` is unset.
fn which_scp_exists() -> bool {
    let binary_name = if cfg!(target_os = "windows") {
        "scp.exe"
    } else {
        "scp"
    };

    match std::env::var_os("PATH") {
        Some(search_path) => {
            std::env::split_paths(&search_path).any(|dir| dir.join(binary_name).is_file())
        }
        None => false,
    }
}
2183
/// Convert a Windows drive path (`C:\Users\me` or `C:/Users/me`) to its WSL
/// mount form (`/mnt/c/Users/me`).
///
/// Only inputs that start with an ASCII drive letter, a colon and a path
/// separator are converted; the drive letter is lower-cased and backslashes in
/// the remainder become forward slashes. Anything else, including strings
/// whose first byte is not a letter such as `1:/x`, is returned unchanged.
fn windows_path_to_wsl(path: &str) -> String {
    match path.as_bytes() {
        // The three leading bytes are ASCII, so slicing at byte 3 is always on
        // a character boundary.
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic() => {
            let drive = drive.to_ascii_lowercase() as char;
            let rest = path[3..].replace('\\', "/");
            format!("/mnt/{}/{}", drive, rest)
        }
        _ => path.to_string(),
    }
}
2202
/// Split an SSH target of the form `user@host` at the first `@`.
///
/// Returns `(Some(user), host)` when an `@` is present (the user part may be
/// empty) and `(None, host)` otherwise.
fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
    match host.split_once('@') {
        Some((user, hostname)) => (Some(user), hostname),
        None => (None, host),
    }
}
2217
/// Return the first candidate that is present and non-empty after trimming
/// surrounding whitespace, as an owned trimmed string.
///
/// Yields `None` when every candidate is absent or blank.
fn first_nonblank_username<'a>(
    candidates: impl IntoIterator<Item = Option<&'a str>>,
) -> Option<String> {
    candidates
        .into_iter()
        .flatten()
        .map(str::trim)
        .find(|name| !name.is_empty())
        .map(str::to_owned)
}
2230
2231fn env_username(key: &str) -> Option<String> {
2232 dotenvy::var(key)
2233 .ok()
2234 .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2235}
2236
2237fn expand_tilde_local(path: &str) -> String {
2239 if let Some(stripped) = path.strip_prefix("~/")
2240 && let Some(home) = dirs::home_dir()
2241 {
2242 return format!("{}/{}", home.display(), stripped);
2243 } else if path == "~"
2244 && let Some(home) = dirs::home_dir()
2245 {
2246 return home.display().to_string();
2247 }
2248 path.to_string()
2249}
2250
2251pub fn path_to_safe_dirname(path: &str) -> String {
2260 use std::path::{Component, Path};
2261
2262 let path_obj = Path::new(path);
2263 let mut parts: Vec<&str> = Vec::new();
2264
2265 for component in path_obj.components() {
2266 match component {
2267 Component::Normal(name) => {
2268 if let Some(s) = name.to_str() {
2269 if !s.is_empty() && s != "." && s != "~" {
2271 parts.push(s);
2272 }
2273 }
2274 }
2275 Component::ParentDir
2277 | Component::CurDir
2278 | Component::RootDir
2279 | Component::Prefix(_) => {}
2280 }
2281 }
2282
2283 let cleaned = parts.join("_").replace([' ', '\\'], "_");
2284
2285 let hash = fnv1a_hash(path);
2287 let hash_suffix = format!("{:08x}", hash);
2288
2289 if cleaned.is_empty() {
2290 format!("root_{}", hash_suffix)
2291 } else {
2292 format!("{}_{}", cleaned, hash_suffix)
2293 }
2294}
2295
/// 64-bit FNV-1a hash of the UTF-8 bytes of `text`.
///
/// Starts from the offset basis `0xcbf29ce484222325`; for every byte the state
/// is XORed with the byte and then multiplied (wrapping) by the prime
/// `0x100000001b3`.
fn fnv1a_hash(text: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    text.bytes().fold(OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
2304
2305fn parse_rsync_stats(output: &str) -> RsyncStats {
2307 let mut stats = RsyncStats::default();
2308
2309 for line in output.lines() {
2310 let line = line.trim();
2311
2312 if line.starts_with("Number of regular files transferred:")
2314 && let Some(num_str) = line.split(':').nth(1)
2315 {
2316 stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2317 }
2318
2319 if line.starts_with("Total transferred file size:")
2321 && let Some(size_part) = line.split(':').nth(1)
2322 {
2323 let size_str = size_part
2325 .split_whitespace()
2326 .next()
2327 .unwrap_or("0")
2328 .replace(',', "");
2329 stats.bytes_transferred = size_str.parse().unwrap_or(0);
2330 }
2331 }
2332
2333 stats
2334}
2335
/// Outcome recorded for the most recent sync of a source.
///
/// Serialized with snake_case variant names. The default value is
/// [`SyncResult::Skipped`].
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncResult {
    /// The sync succeeded.
    Success,
    /// The sync partially succeeded and partially failed; carries the error message.
    PartialFailure(String),
    /// The sync failed completely; carries the error message.
    Failed(String),
    /// No sync result has been recorded (the default; labelled `"never"`).
    #[default]
    Skipped,
}
2354
2355impl SyncResult {
2356 pub fn label(&self) -> &'static str {
2358 match self {
2359 Self::Success => "success",
2360 Self::PartialFailure(_) => "partial",
2361 Self::Failed(_) => "failed",
2362 Self::Skipped => "never",
2363 }
2364 }
2365
2366 pub fn error_message(&self) -> Option<&str> {
2368 match self {
2369 Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2370 Self::Success | Self::Skipped => None,
2371 }
2372 }
2373}
2374
/// What the scheduler decided to do with a source.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceSyncAction {
    /// Run a sync now (always chosen when an explicit sync command overrides scheduling).
    Sync,
    /// Do not sync now; when a next eligible time is known it is added to the reasons.
    Skip,
    /// Hold the sync back (forced for authentication failures unless manually overridden).
    Defer,
}
2386
2387impl SourceSyncAction {
2388 pub fn as_str(self) -> &'static str {
2389 match self {
2390 Self::Sync => "sync",
2391 Self::Skip => "skip",
2392 Self::Defer => "defer",
2393 }
2394 }
2395}
2396
/// Health classification of a source, derived from its last recorded sync.
///
/// Serialized with snake_case variant names. See
/// `SourceSyncDecision::evaluate` for the exact precedence between variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceHealthKind {
    /// No sync status exists, or no sync timestamp has been recorded.
    NeverSynced,
    /// None of the other conditions apply.
    Healthy,
    /// The configured sync schedule is due.
    Stale,
    /// The last sync took at least `SOURCE_HIGH_LATENCY_MS`.
    HighLatency,
    /// The last sync failed partially, or failed completely outside a backoff window.
    Flapping,
    /// The last sync failed with an authentication or host-key error.
    AuthFailed,
    /// Consecutive failures have put the source inside a retry backoff window.
    BackingOff,
}
2409
2410impl SourceHealthKind {
2411 pub fn as_str(self) -> &'static str {
2412 match self {
2413 Self::NeverSynced => "never_synced",
2414 Self::Healthy => "healthy",
2415 Self::Stale => "stale",
2416 Self::HighLatency => "high_latency",
2417 Self::Flapping => "flapping",
2418 Self::AuthFailed => "auth_failed",
2419 Self::BackingOff => "backing_off",
2420 }
2421 }
2422}
2423
/// Result of evaluating whether a source should be synced, together with the
/// health assessment and the reasons behind the decision.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncDecision {
    /// The chosen action.
    pub action: SourceSyncAction,
    /// Health classification the action was based on.
    pub health: SourceHealthKind,
    /// Numeric score derived from `health` via `health_score_for_source`.
    pub health_score: u8,
    /// Time elapsed since the last sync (never negative); `None` without a last-sync timestamp.
    pub staleness_ms: Option<i64>,
    /// Score computed by `stale_value_score_for_source` from schedule, staleness and sync info.
    pub stale_value_score: u8,
    /// Whether an explicit sync command overrode automatic scheduling.
    pub manual_override: bool,
    /// True when health is `AuthFailed`, `BackingOff`, `Flapping` or `HighLatency`.
    pub fallback_active: bool,
    /// Last-sync time plus the schedule period, when both are known.
    pub next_eligible_sync_ms: Option<i64>,
    /// End of the failure backoff window as computed by `failure_backoff_until_ms`, if any.
    pub backoff_until_ms: Option<i64>,
    /// Human-readable explanations; `evaluate` always produces at least one.
    pub reasons: Vec<String>,
}
2448
impl SourceSyncDecision {
    /// Decide what to do with `source` at time `now_ms`.
    ///
    /// * `info` - the stored sync status for the source, if any.
    /// * `now_ms` - current time; compared against `last_sync` and the backoff
    ///   deadline (presumably Unix epoch milliseconds, matching the
    ///   `unix_ms=` wording in the reasons).
    /// * `manual_override` - true when the user explicitly requested a sync;
    ///   this forces [`SourceSyncAction::Sync`] regardless of health.
    ///
    /// The health classification is the first matching rule in the `match`
    /// below, so the order of its arms is significant.
    fn evaluate(
        source: &SourceDefinition,
        info: Option<&SourceSyncInfo>,
        now_ms: i64,
        manual_override: bool,
    ) -> Self {
        let period_ms = sync_schedule_period_ms(source.sync_schedule);
        // Known only when there is both a last-sync timestamp and a schedule period.
        let next_eligible_sync_ms = info
            .and_then(|info| info.last_sync)
            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
        // Clamp at zero so a last-sync timestamp in the future does not yield a negative age.
        let staleness_ms = info.and_then(|info| {
            info.last_sync
                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
        });
        let stale_value_score =
            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
        let mut reasons = Vec::new();

        // First matching arm wins: never-synced, then auth failure, partial
        // failure, active backoff, complete failure, high latency, stale.
        let health = match info {
            None => {
                reasons.push("no durable sync status exists for this source".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if info.last_sync.is_none() => {
                reasons.push("source has never completed or attempted a sync".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if sync_result_auth_failure(&info.last_result) => {
                reasons
                    .push("last sync failed with an authentication or host-key error".to_string());
                SourceHealthKind::AuthFailed
            }
            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
                reasons.push("last sync partially succeeded and partially failed".to_string());
                SourceHealthKind::Flapping
            }
            // Only counts as backing off while the backoff deadline is still in the future.
            Some(info)
                if info.consecutive_failures > 0
                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
            {
                reasons.push(format!(
                    "{} consecutive failure(s) are inside retry backoff",
                    info.consecutive_failures
                ));
                SourceHealthKind::BackingOff
            }
            // A complete failure that is no longer (or never was) inside a backoff window.
            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
                let error = info.last_result.error_message().unwrap_or("unknown error");
                reasons.push(format!(
                    "last sync failed completely ({error}); local fallback remains active"
                ));
                SourceHealthKind::Flapping
            }
            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
                reasons.push(format!(
                    "last sync took {}ms, above {}ms high-latency guard",
                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
                ));
                SourceHealthKind::HighLatency
            }
            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
                reasons.push("configured sync schedule is due".to_string());
                SourceHealthKind::Stale
            }
            Some(_) => SourceHealthKind::Healthy,
        };

        let fallback_active = matches!(
            health,
            SourceHealthKind::AuthFailed
                | SourceHealthKind::BackingOff
                | SourceHealthKind::Flapping
                | SourceHealthKind::HighLatency
        );

        let mut action = if manual_override {
            reasons.push("explicit sync command overrides automatic scheduling".to_string());
            SourceSyncAction::Sync
        } else {
            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
        };

        // Authentication failures are always deferred unless the user explicitly
        // asked for a sync, whatever the automatic action was.
        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
            action = SourceSyncAction::Defer;
        }

        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
        }

        // Tell the user when a skipped source becomes eligible again.
        if !manual_override
            && matches!(action, SourceSyncAction::Skip)
            && let Some(next_ms) = next_eligible_sync_ms
        {
            reasons.push(format!(
                "next scheduled sync is eligible at unix_ms={next_ms}"
            ));
        }

        // Guarantee at least one reason so consumers never see an empty list.
        if reasons.is_empty() {
            reasons.push("source is healthy and within schedule".to_string());
        }

        Self {
            action,
            health,
            health_score: health_score_for_source(health),
            staleness_ms,
            stale_value_score,
            manual_override,
            fallback_active,
            next_eligible_sync_ms,
            backoff_until_ms,
            reasons,
        }
    }
}
2568
/// Persisted summary of the most recent sync of one source.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncInfo {
    /// Timestamp of the last sync (set from `current_unix_ms()`); `None` if never recorded.
    pub last_sync: Option<i64>,
    /// Outcome of the last sync.
    pub last_result: SyncResult,
    /// Total number of files reported by the last sync.
    pub files_synced: u64,
    /// Total number of bytes reported by the last sync.
    pub bytes_transferred: u64,
    /// Total duration of the last sync in milliseconds.
    pub duration_ms: u64,
    /// Number of failed syncs in a row; falls back to 0 when absent from stored data.
    #[serde(default)]
    pub consecutive_failures: u32,
}
2586
2587impl SourceSyncInfo {
2588 pub fn from_report(report: &SyncReport) -> Self {
2590 let last_result = report.sync_result();
2591 Self {
2592 last_sync: Some(current_unix_ms()),
2593 consecutive_failures: u32::from(!report.all_succeeded),
2594 last_result,
2595 files_synced: report.total_files(),
2596 bytes_transferred: report.total_bytes(),
2597 duration_ms: report.total_duration_ms,
2598 }
2599 }
2600}
2601
/// Per-source sync bookkeeping, persisted as `sync_status.json` inside the data
/// directory (see `SyncStatus::load` / `SyncStatus::save`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SyncStatus {
    /// Latest sync information keyed by source name.
    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
}
2608
2609impl SyncStatus {
2610 pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2612 let path = Self::status_path(data_dir);
2613 match std::fs::read_to_string(&path) {
2614 Ok(content) => serde_json::from_str(&content)
2615 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2616 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2617 Err(e) => Err(e),
2618 }
2619 }
2620
2621 pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2626 let path = Self::status_path(data_dir);
2627 if let Some(parent) = path.parent() {
2628 std::fs::create_dir_all(parent)?;
2629 }
2630 let content = serde_json::to_string_pretty(self)?;
2631 let tmp_path = write_sync_status_temp_file(&path, content.as_bytes())?;
2632 replace_file_from_temp(&tmp_path, &path)
2633 }
2634
2635 pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2637 let previous_failures = self
2638 .get(source_name)
2639 .map(|info| info.consecutive_failures)
2640 .unwrap_or_default();
2641 let mut info = SourceSyncInfo::from_report(report);
2642 if report.all_succeeded {
2643 info.consecutive_failures = 0;
2644 } else {
2645 info.consecutive_failures = previous_failures.saturating_add(1);
2646 }
2647 self.set_info(source_name, info);
2648 }
2649
2650 pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2652 self.sources.insert(source_name.to_string(), info);
2653 }
2654
2655 pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2659 let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2660 let previous_len = self.sources.len();
2661 self.sources
2662 .retain(|source_name, _| allowed.contains(source_name.as_str()));
2663 self.sources.len() != previous_len
2664 }
2665
2666 pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2668 self.sources.get(source_name)
2669 }
2670
2671 pub fn decision_for_source_at(
2673 &self,
2674 source: &SourceDefinition,
2675 now_ms: i64,
2676 manual_override: bool,
2677 ) -> SourceSyncDecision {
2678 SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2679 }
2680
2681 fn status_path(data_dir: &Path) -> PathBuf {
2683 data_dir.join("sync_status.json")
2684 }
2685}
2686
/// Latency threshold of 60 seconds, in milliseconds.
/// NOTE(review): the code that reads this is outside this section; presumably a
/// sync duration above it marks a source as high-latency — confirm at the use site.
const SOURCE_HIGH_LATENCY_MS: u64 = 60_000;
/// Backoff after the first consecutive failure: 5 minutes, in milliseconds.
/// `failure_backoff_until_ms` doubles it for each additional failure.
const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1000;
/// Upper bound for the failure backoff: 60 minutes, in milliseconds.
const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1000;
2690
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0, and a value that does not fit in
/// `i64` is clamped to `i64::MAX`, so the function never fails.
pub(crate) fn current_unix_ms() -> i64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::ZERO,
    };
    match i64::try_from(since_epoch.as_millis()) {
        Ok(millis) => millis,
        Err(_) => i64::MAX,
    }
}
2698
2699fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2700 match schedule {
2701 SyncSchedule::Manual => None,
2702 SyncSchedule::Hourly => Some(60 * 60 * 1000),
2703 SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2704 }
2705}
2706
/// Whether a scheduled sync is due at `now_ms`.
///
/// A source that has never synced is always due. A source that has synced but
/// has no period is never due. Otherwise the sync is due once
/// `last_sync + period_ms` (saturating) is at or before `now_ms`.
fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
    let Some(previous_sync) = last_sync else {
        return true;
    };
    period_ms.is_some_and(|period| previous_sync.saturating_add(period) <= now_ms)
}
2714
2715fn automatic_source_sync_action(
2716 schedule: SyncSchedule,
2717 health: SourceHealthKind,
2718 info: Option<&SourceSyncInfo>,
2719 now_ms: i64,
2720) -> SourceSyncAction {
2721 match health {
2722 SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2723 _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2724 SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2725 SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2726 if sync_schedule_due(
2727 info.and_then(|info| info.last_sync),
2728 sync_schedule_period_ms(schedule),
2729 now_ms,
2730 ) {
2731 SourceSyncAction::Sync
2732 } else {
2733 SourceSyncAction::Skip
2734 }
2735 }
2736 SourceHealthKind::Healthy => {
2737 if sync_schedule_due(
2738 info.and_then(|info| info.last_sync),
2739 sync_schedule_period_ms(schedule),
2740 now_ms,
2741 ) {
2742 SourceSyncAction::Sync
2743 } else {
2744 SourceSyncAction::Skip
2745 }
2746 }
2747 }
2748}
2749
2750fn health_score_for_source(health: SourceHealthKind) -> u8 {
2751 match health {
2752 SourceHealthKind::Healthy => 100,
2753 SourceHealthKind::Stale => 75,
2754 SourceHealthKind::NeverSynced => 65,
2755 SourceHealthKind::HighLatency => 55,
2756 SourceHealthKind::Flapping => 40,
2757 SourceHealthKind::BackingOff => 25,
2758 SourceHealthKind::AuthFailed => 10,
2759 }
2760}
2761
2762fn stale_value_score_for_source(
2763 schedule: SyncSchedule,
2764 staleness_ms: Option<i64>,
2765 info: Option<&SourceSyncInfo>,
2766) -> u8 {
2767 let Some(info) = info else {
2768 return 100;
2769 };
2770 if info.last_sync.is_none() {
2771 return 100;
2772 }
2773
2774 let Some(staleness_ms) = staleness_ms else {
2775 return 100;
2776 };
2777
2778 let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2779 return 0;
2780 };
2781
2782 let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2783 u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2784}
2785
2786fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2787 if info.consecutive_failures == 0 {
2788 return None;
2789 }
2790 let last_sync = info.last_sync?;
2791 let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2792 let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2793 let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2794 .saturating_mul(multiplier)
2795 .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2796 Some(last_sync.saturating_add(backoff_ms))
2797}
2798
2799fn sync_result_auth_failure(result: &SyncResult) -> bool {
2800 let Some(error) = result.error_message() else {
2801 return false;
2802 };
2803 let error = error.to_ascii_lowercase();
2804 error.contains("permission denied")
2805 || error.contains("authentication")
2806 || error.contains("host key verification failed")
2807 || error.contains("known_hosts")
2808 || error.contains("no valid authentication")
2809}
2810
/// Returns a fresh temp-file path next to `path`, using the `tmp` suffix and
/// `sync_status.json` as the fallback name (see `unique_atomic_sidecar_path`).
fn unique_atomic_temp_path(path: &Path) -> PathBuf {
    unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
}
2814
2815fn write_sync_status_temp_file(
2816 final_path: &Path,
2817 content: &[u8],
2818) -> Result<PathBuf, std::io::Error> {
2819 for _ in 0..100 {
2820 let tmp_path = unique_atomic_temp_path(final_path);
2821 match write_sync_status_temp_file_at(&tmp_path, content) {
2822 Ok(()) => return Ok(tmp_path),
2823 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
2824 Err(err) => return Err(err),
2825 }
2826 }
2827
2828 Err(std::io::Error::new(
2829 std::io::ErrorKind::AlreadyExists,
2830 format!(
2831 "failed to allocate unique sync status temp path for {}",
2832 final_path.display()
2833 ),
2834 ))
2835}
2836
/// Creates `path` exclusively, writes `content` to it and syncs it to disk.
///
/// # Errors
///
/// Returns `AlreadyExists` (without touching the existing file) when `path`
/// is already present, or the underlying I/O error when writing or syncing
/// fails. In the latter case the partially written file is removed on a
/// best-effort basis: the caller only learns the path on success, so a
/// leftover file would otherwise accumulate next to the status file.
fn write_sync_status_temp_file_at(path: &Path, content: &[u8]) -> Result<(), std::io::Error> {
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;

    let outcome = match file.write_all(content) {
        Ok(()) => file.sync_all(),
        Err(err) => Err(err),
    };

    if outcome.is_err() {
        // Only reached for a file this call created (create_new succeeded),
        // so the removal can never delete somebody else's file.
        drop(file);
        let _ = std::fs::remove_file(path);
    }
    outcome
}
2845
2846fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2847 #[cfg(windows)]
2848 {
2849 match std::fs::rename(temp_path, final_path) {
2850 Ok(()) => sync_parent_directory(final_path),
2851 Err(first_err)
2852 if final_path.exists()
2853 && matches!(
2854 first_err.kind(),
2855 std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2856 ) =>
2857 {
2858 let backup_path = unique_replace_backup_path(final_path);
2859 std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2860 let _ = std::fs::remove_file(temp_path);
2861 std::io::Error::other(format!(
2862 "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2863 backup_path.display(),
2864 final_path.display(),
2865 first_err,
2866 backup_err
2867 ))
2868 })?;
2869 match std::fs::rename(temp_path, final_path) {
2870 Ok(()) => {
2871 let _ = std::fs::remove_file(&backup_path);
2872 sync_parent_directory(final_path)
2873 }
2874 Err(second_err) => {
2875 let restore_result = std::fs::rename(&backup_path, final_path);
2876 match restore_result {
2877 Ok(()) => {
2878 let _ = std::fs::remove_file(temp_path);
2879 sync_parent_directory(final_path).map_err(|sync_err| {
2880 std::io::Error::other(format!(
2881 "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2882 final_path.display(),
2883 temp_path.display(),
2884 first_err,
2885 second_err,
2886 sync_err
2887 ))
2888 })?;
2889 Err(std::io::Error::new(
2890 second_err.kind(),
2891 format!(
2892 "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2893 final_path.display(),
2894 temp_path.display(),
2895 first_err,
2896 second_err
2897 ),
2898 ))
2899 }
2900 Err(restore_err) => Err(std::io::Error::other(format!(
2901 "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2902 final_path.display(),
2903 temp_path.display(),
2904 first_err,
2905 second_err,
2906 restore_err,
2907 temp_path.display()
2908 ))),
2909 }
2910 }
2911 }
2912 }
2913 Err(rename_err) => Err(rename_err),
2914 }
2915 }
2916
2917 #[cfg(not(windows))]
2918 {
2919 std::fs::rename(temp_path, final_path)?;
2920 sync_parent_directory(final_path)
2921 }
2922}
2923
/// Opens the file at `path` and calls `sync_all` on it, returning any error
/// from either step.
fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
    std::fs::File::open(path)?.sync_all()
}
2927
/// Syncs the directory containing `path`, so that a preceding rename inside
/// it is flushed along with the file contents.
///
/// A path without a parent (for example `/`) is a no-op. A bare relative file
/// name such as `sync_status.json` has the empty path as its parent; that is
/// treated as the current directory instead of failing with `NotFound`.
///
/// # Errors
///
/// Returns the error from opening or syncing the parent directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    // `Path::new("file").parent()` is `Some("")`, which cannot be opened.
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    std::fs::File::open(directory)?.sync_all()
}
2935
/// Windows counterpart of the parent-directory sync: does nothing and always
/// returns `Ok(())`.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
    Ok(())
}
2940
/// Returns a fresh backup-file path next to `path`, using the `bak` suffix and
/// `sync_status.json` as the fallback name (see `unique_atomic_sidecar_path`).
/// Only compiled on Windows, where `replace_file_from_temp` needs it.
#[cfg(windows)]
fn unique_replace_backup_path(path: &Path) -> PathBuf {
    unique_atomic_sidecar_path(path, "bak", "sync_status.json")
}
2945
/// Builds a hidden sibling path of `path` that is unique per call.
///
/// The file name has the form `.{name}.{suffix}.{pid}.{nanos}.{nonce}`, where
/// `name` is the UTF-8 file name of `path` (or `fallback_name` when it has
/// none or is not valid UTF-8), `nanos` is the current time since the Unix
/// epoch and `nonce` is a process-wide counter.
fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let base_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => fallback_name,
    };
    let pid = std::process::id();

    let sidecar_name = format!(".{base_name}.{suffix}.{pid}.{nanos_since_epoch}.{nonce}");
    path.with_file_name(sidecar_name)
}
2966
2967#[cfg(test)]
2968mod tests {
2969 use super::*;
2970 use tempfile::TempDir;
2971
2972 #[test]
2973 fn test_path_to_safe_dirname() {
2974 let res = path_to_safe_dirname("~/.claude/projects");
2975 assert!(res.starts_with(".claude_projects_"));
2976
2977 let res = path_to_safe_dirname("/home/user/data");
2978 assert!(res.starts_with("home_user_data_"));
2979
2980 let res = path_to_safe_dirname("~/");
2981 assert!(res.starts_with("root_"));
2982
2983 let res = path_to_safe_dirname("");
2984 assert!(res.starts_with("root_"));
2985 }
2986
2987 #[test]
2988 fn test_path_to_safe_dirname_empty() {
2989 let res = path_to_safe_dirname("~");
2990 assert!(res.starts_with("root_"));
2991
2992 let res = path_to_safe_dirname("/");
2993 assert!(res.starts_with("root_"));
2994 }
2995
2996 #[test]
2997 fn test_path_to_safe_dirname_strips_traversal_components() {
2998 let res = path_to_safe_dirname("../../etc/passwd");
2999
3000 assert!(res.starts_with("etc_passwd_"));
3001 assert!(!res.contains(".."));
3002 assert!(!res.contains('/'));
3003 assert!(!res.contains('\\'));
3004 }
3005
3006 #[test]
3007 fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
3008 let temp = TempDir::new().unwrap();
3009 let engine = SyncEngine::new(temp.path());
3010
3011 for host in [
3012 "work-mac;touch /tmp/cass-owned",
3013 "work mac",
3014 "work-mac\nhostname",
3015 "work-mac`hostname`",
3016 "work-mac/../../secret",
3017 "-oProxyCommand=evil",
3018 "",
3019 "@host",
3020 "user@",
3021 "user@host@extra",
3022 ] {
3023 let err = engine.get_remote_home(host).unwrap_err();
3024 assert!(
3025 matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
3026 "expected invalid-host rejection for {host:?}, got {err}"
3027 );
3028 }
3029 }
3030
3031 #[test]
3032 fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
3033 let temp = TempDir::new().unwrap();
3034 let engine = SyncEngine::new(temp.path());
3035 let mut source = SourceDefinition::ssh("../escape", "user@host");
3036 source.paths = vec!["/tmp/sessions".to_string()];
3037
3038 let err = engine
3039 .sync_source(&source)
3040 .expect_err("invalid source name should fail before local writes");
3041
3042 assert!(
3043 matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
3044 "expected invalid source-name rejection, got {err}"
3045 );
3046 assert!(
3047 !temp.path().join("escape").exists(),
3048 "invalid source name must not escape the remotes mirror layout"
3049 );
3050 assert!(
3051 !temp.path().join("remotes").exists(),
3052 "invalid source name must be rejected before creating mirror roots"
3053 );
3054 }
3055
3056 #[test]
3057 fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
3058 let temp = TempDir::new().unwrap();
3059 let engine = SyncEngine::new(temp.path());
3060 let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
3061 source.paths = vec!["/tmp/sessions".to_string()];
3062
3063 let err = engine
3064 .sync_source(&source)
3065 .expect_err("invalid host should fail before local writes");
3066
3067 assert!(
3068 matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
3069 "expected invalid host rejection, got {err}"
3070 );
3071 assert!(
3072 !temp.path().join("remotes").exists(),
3073 "invalid host must be rejected before creating mirror roots"
3074 );
3075 }
3076
3077 #[test]
3078 fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
3079 let temp = TempDir::new().unwrap();
3080 let engine = SyncEngine::new(temp.path());
3081
3082 for (path, expected) in [
3083 ("", "paths[0] cannot be empty"),
3084 (" ", "paths[0] cannot be empty"),
3085 (" ~/.claude/projects", "paths[0] cannot have leading"),
3086 ("~/.claude/projects ", "paths[0] cannot have leading"),
3087 ("~/.claude\nprojects", "paths[0] cannot contain control"),
3088 ] {
3089 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3090 source.paths = vec![path.to_string()];
3091
3092 let report = engine.sync_source(&source).unwrap();
3093 assert_eq!(report.path_results.len(), 1);
3094 let result = &report.path_results[0];
3095 assert!(!result.success);
3096 assert_eq!(result.remote_path, path);
3097 assert!(
3098 result
3099 .error
3100 .as_deref()
3101 .is_some_and(|message| message.contains(expected)),
3102 "expected invalid path rejection for {path:?}, got {result:?}"
3103 );
3104 }
3105 }
3106
3107 #[test]
3108 fn test_remote_sync_path_validation_allows_internal_spaces() {
3109 assert!(
3110 validate_remote_sync_path_entry(
3111 0,
3112 "~/Library/Application Support/Cursor/User/globalStorage"
3113 )
3114 .is_ok()
3115 );
3116 }
3117
3118 #[test]
3119 fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
3120 let temp = TempDir::new().unwrap();
3121 let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
3122 let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
3125 source.paths = vec![
3126 "~/.codex/sessions".to_string(),
3127 " ~/.claude/projects".to_string(),
3128 "~/.gemini/tmp".to_string(),
3129 ];
3130
3131 let report = engine.sync_source(&source).unwrap();
3132 let remote_paths = report
3133 .path_results
3134 .iter()
3135 .map(|result| result.remote_path.as_str())
3136 .collect::<Vec<_>>();
3137
3138 assert_eq!(
3139 remote_paths,
3140 vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
3141 );
3142 assert!(
3143 report.path_results[1]
3144 .error
3145 .as_deref()
3146 .is_some_and(|message| message.contains("paths[1] cannot have leading")),
3147 "expected invalid path error in original slot: {:?}",
3148 report.path_results
3149 );
3150 }
3151
3152 #[test]
3153 fn test_remote_find_regular_files_command_uses_physical_traversal() {
3154 assert_eq!(
3155 remote_find_regular_files_command("/tmp/has space"),
3156 "find -P '/tmp/has space' -type f -print0"
3157 );
3158 assert_eq!(
3159 remote_find_regular_files_command("/tmp/that's all"),
3160 "find -P '/tmp/that'\\''s all' -type f -print0"
3161 );
3162 }
3163
3164 #[test]
3165 fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
3166 assert_eq!(
3167 parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
3168 Some("/home/user".to_string())
3169 );
3170 assert_eq!(
3171 parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
3172 Some("/Users/test user".to_string())
3173 );
3174 }
3175
3176 #[test]
3177 fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
3178 assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
3179 assert_eq!(
3180 parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
3181 None
3182 );
3183 }
3184
3185 #[test]
3186 fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
3187 let paths = parse_null_terminated_utf8_paths(
3188 b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
3189 );
3190 assert_eq!(
3191 paths,
3192 vec![
3193 "/remote/sessions/a.jsonl".to_string(),
3194 "/remote/sessions/b.jsonl".to_string()
3195 ]
3196 );
3197 }
3198
3199 #[test]
3200 fn test_remote_file_to_safe_local_path_rejects_outside_root() {
3201 let root = Path::new("/remote/sessions");
3202 let local = Path::new("/mirror/root");
3203
3204 assert_eq!(
3205 remote_file_to_safe_local_path(
3206 root,
3207 Path::new("/remote/sessions/a/b.jsonl"),
3208 local,
3209 "sessions"
3210 ),
3211 Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
3212 );
3213 assert_eq!(
3214 remote_file_to_safe_local_path(
3215 Path::new("/remote/session.jsonl"),
3216 Path::new("/remote/session.jsonl"),
3217 local,
3218 "session.jsonl"
3219 ),
3220 Some(PathBuf::from("/mirror/root/session.jsonl"))
3221 );
3222 assert_eq!(
3223 remote_file_to_safe_local_path(
3224 root,
3225 Path::new("/remote/sessions/../secret.txt"),
3226 local,
3227 "sessions"
3228 ),
3229 None
3230 );
3231 assert_eq!(
3232 remote_file_to_safe_local_path(
3233 root,
3234 Path::new("/remote/other/secret.txt"),
3235 local,
3236 "sessions"
3237 ),
3238 None
3239 );
3240 }
3241
3242 #[test]
3243 fn test_local_symlink_guard_allows_regular_paths() {
3244 let temp = TempDir::new().expect("tempdir");
3245 let root = temp.path().join("mirror");
3246 let target = root.join("sessions/session.jsonl");
3247
3248 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3249
3250 std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3251 std::fs::write(&target, "{}").expect("write target");
3252
3253 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3254 }
3255
3256 #[cfg(unix)]
3257 #[test]
3258 fn test_local_symlink_guard_rejects_nested_symlink() {
3259 use std::os::unix::fs::symlink;
3260
3261 let temp = TempDir::new().expect("tempdir");
3262 let root = temp.path().join("mirror");
3263 let outside = temp.path().join("outside");
3264 std::fs::create_dir_all(&root).expect("create root");
3265 std::fs::create_dir_all(&outside).expect("create outside");
3266 symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3267
3268 let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3269 .expect_err("nested symlink should be rejected");
3270
3271 assert!(err.contains("Refusing to write"));
3272 assert!(err.contains("sessions"));
3273 }
3274
3275 #[cfg(unix)]
3276 #[test]
3277 fn test_local_symlink_guard_rejects_root_symlink() {
3278 use std::os::unix::fs::symlink;
3279
3280 let temp = TempDir::new().expect("tempdir");
3281 let outside = temp.path().join("outside");
3282 let root = temp.path().join("mirror-link");
3283 std::fs::create_dir_all(&outside).expect("create outside");
3284 symlink(&outside, &root).expect("symlink root");
3285
3286 let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3287 .expect_err("root symlink should be rejected");
3288
3289 assert!(err.contains("Refusing to write"));
3290 assert!(err.contains("mirror-link"));
3291 }
3292
3293 #[test]
3294 fn test_prepare_local_sync_container_creates_regular_container() {
3295 let temp = TempDir::new().expect("tempdir");
3296 let root = temp.path().join("mirror");
3297 let target = root.join("sessions");
3298
3299 prepare_local_sync_container(&root, &target).expect("regular container should be created");
3300
3301 assert!(target.is_dir());
3302 }
3303
3304 #[cfg(unix)]
3305 #[test]
3306 fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3307 use std::os::unix::fs::symlink;
3308
3309 let temp = TempDir::new().expect("tempdir");
3310 let root = temp.path().join("mirror");
3311 let outside = temp.path().join("outside");
3312 let target = root.join("sessions");
3313 std::fs::create_dir_all(&root).expect("create root");
3314 std::fs::create_dir_all(&outside).expect("create outside");
3315 symlink(&outside, &target).expect("symlink target");
3316
3317 let err = prepare_local_sync_container(&root, &target)
3318 .expect_err("sync container symlink should be rejected");
3319
3320 assert!(err.contains("Refusing to write"));
3321 assert!(err.contains("sessions"));
3322 }
3323
3324 #[cfg(unix)]
3325 #[test]
3326 fn test_prepare_local_sync_container_rejects_root_symlink() {
3327 use std::os::unix::fs::symlink;
3328
3329 let temp = TempDir::new().expect("tempdir");
3330 let outside = temp.path().join("outside");
3331 let root = temp.path().join("mirror-link");
3332 let target = root.join("sessions");
3333 std::fs::create_dir_all(&outside).expect("create outside");
3334 symlink(&outside, &root).expect("symlink root");
3335
3336 let err = prepare_local_sync_container(&root, &target)
3337 .expect_err("sync root symlink should be rejected");
3338
3339 assert!(err.contains("Refusing to write"));
3340 assert!(err.contains("mirror-link"));
3341 }
3342
3343 #[cfg(unix)]
3344 #[test]
3345 fn test_prepare_local_sync_root_rejects_symlinked_source_parent() {
3346 use std::os::unix::fs::symlink;
3347
3348 let temp = TempDir::new().expect("tempdir");
3349 let local_store = temp.path().join("data");
3350 let remotes = local_store.join("remotes");
3351 let outside = temp.path().join("outside");
3352 let source_link = remotes.join("laptop");
3353 let mirror_dir = source_link.join("mirror");
3354
3355 std::fs::create_dir_all(&remotes).expect("create remotes");
3356 std::fs::create_dir_all(&outside).expect("create outside");
3357 symlink(&outside, &source_link).expect("symlink source parent");
3358
3359 let err = prepare_local_sync_root(&local_store, &mirror_dir)
3360 .expect_err("symlinked source parent should be rejected before mkdir");
3361
3362 assert!(err.contains("Refusing to write"));
3363 assert!(err.contains("laptop"));
3364 assert!(
3365 !outside.join("mirror").exists(),
3366 "sync root preparation must not create directories through source parent symlinks"
3367 );
3368 }
3369
3370 #[test]
3371 fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3372 let symlink = FileStat {
3373 size: None,
3374 uid: None,
3375 gid: None,
3376 perm: Some(0o120000 | 0o777),
3377 atime: None,
3378 mtime: None,
3379 };
3380 let regular = FileStat {
3381 size: None,
3382 uid: None,
3383 gid: None,
3384 perm: Some(0o100000 | 0o644),
3385 atime: None,
3386 mtime: None,
3387 };
3388
3389 assert!(sftp_file_stat_is_symlink(&symlink));
3390 assert!(!sftp_file_stat_is_symlink(®ular));
3391 }
3392
3393 #[test]
3394 fn test_sftp_entry_file_name_accepts_regular_names() {
3395 let parent = Path::new("/remote");
3396 let entry = parent.join("session.jsonl");
3397
3398 assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3399 }
3400
3401 #[test]
3402 fn test_sftp_entry_file_name_skips_dot_entries() {
3403 let parent = Path::new("/remote");
3404
3405 assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3406 assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3407 }
3408
3409 #[cfg(unix)]
3410 #[test]
3411 fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3412 use std::ffi::OsStr;
3413 use std::os::unix::ffi::OsStrExt;
3414
3415 let parent = Path::new("/remote");
3416 let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3417 let entry = parent.join(bad_component);
3418
3419 assert_eq!(sftp_entry_file_name(&entry, parent), None);
3420 }
3421
    /// Parses a sample of rsync `--stats`-style output and checks that the
    /// transferred-file count and the comma-grouped byte total are extracted.
    #[test]
    fn test_parse_rsync_stats() {
        // The sample lines start at column 0 because they are part of the raw string.
        let output = r#"
Number of files: 42
Number of regular files transferred: 10
Total transferred file size: 1,234 bytes
        "#;

        let stats = parse_rsync_stats(output);
        assert_eq!(stats.files_transferred, 10);
        // "1,234" must be read as 1234, i.e. thousands separators are ignored.
        assert_eq!(stats.bytes_transferred, 1234);
    }
3434
3435 #[test]
3436 fn test_parse_rsync_stats_empty() {
3437 let stats = parse_rsync_stats("");
3438 assert_eq!(stats.files_transferred, 0);
3439 assert_eq!(stats.bytes_transferred, 0);
3440 }
3441
3442 #[test]
3443 fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3444 assert_eq!(
3445 quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3446 "'/Users/me/Library/Application Support/Cursor'"
3447 );
3448 assert_eq!(
3449 quote_remote_shell_path("/tmp/that's all"),
3450 "'/tmp/that'\\''s all'"
3451 );
3452 }
3453
3454 #[test]
3455 fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3456 assert_eq!(
3457 remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3458 "work-mac:/tmp/has space"
3459 );
3460 assert_eq!(
3461 remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3462 "work-mac:/tmp/that's all"
3463 );
3464 assert_eq!(
3465 remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3466 "work-mac:'/tmp/has space'"
3467 );
3468 }
3469
3470 #[test]
3471 fn rsync_arg_protection_enum_maps_flags_correctly() {
3472 assert_eq!(
3477 RsyncArgProtection::ProtectArgs.flag(),
3478 Some("--protect-args")
3479 );
3480 assert_eq!(
3481 RsyncArgProtection::SecludedArgs.flag(),
3482 Some("--secluded-args")
3483 );
3484 assert_eq!(RsyncArgProtection::None.flag(), None);
3485 assert!(RsyncArgProtection::ProtectArgs.is_supported());
3486 assert!(RsyncArgProtection::SecludedArgs.is_supported());
3487 assert!(!RsyncArgProtection::None.is_supported());
3488 }
3489
3490 #[test]
3491 fn rsync_arg_protection_remote_rejected_detects_openrsync_errors() {
3492 assert!(rsync_arg_protection_remote_rejected(
3494 "rsync: invalid option -- s\nrsync error: syntax or usage error"
3495 ));
3496 assert!(rsync_arg_protection_remote_rejected(
3497 "remote rsync: unrecognized option '--secluded-args'"
3498 ));
3499 assert!(rsync_arg_protection_remote_rejected(
3500 "remote rsync: unknown option -- protect-args"
3501 ));
3502 assert!(rsync_arg_protection_remote_rejected(
3505 "rsync: illegal option -- s\nrsync error: error in rsync protocol data stream (code 12)"
3506 ));
3507 assert!(rsync_arg_protection_remote_rejected(
3509 "rsync: Illegal option -- s\n"
3510 ));
3511 assert!(!rsync_arg_protection_remote_rejected(
3512 "rsync: change_dir \"/missing\" failed: No such file or directory"
3513 ));
3514 }
3515
3516 #[test]
3521 fn parse_remote_rsync_is_openrsync_detects_apple_openrsync() {
3522 assert!(parse_remote_rsync_is_openrsync(
3524 "openrsync: protocol version 29"
3525 ));
3526 assert!(parse_remote_rsync_is_openrsync(
3528 "OpenRsync: protocol version 29"
3529 ));
3530 assert!(!parse_remote_rsync_is_openrsync(
3532 "rsync version 3.4.1 protocol version 31"
3533 ));
3534 assert!(!parse_remote_rsync_is_openrsync(
3535 "rsync version 2.6.9 protocol version 29"
3536 ));
3537 assert!(!parse_remote_rsync_is_openrsync(
3539 "rsync version 3.3.0 protocol version 31\nCopyright ..."
3540 ));
3541 assert!(!parse_remote_rsync_is_openrsync(""));
3543 }
3544
3545 #[test]
3546 fn openrsync_remote_forces_no_arg_protection_flag() {
3547 let protection_when_openrsync = if true {
3551 RsyncArgProtection::None
3553 } else {
3554 detect_rsync_arg_protection()
3555 };
3556 assert!(
3557 protection_when_openrsync.flag().is_none(),
3558 "openrsync remote must not receive --secluded-args or --protect-args; got {:?}",
3559 protection_when_openrsync.flag()
3560 );
3561 }
3562
3563 #[test]
3564 fn gnu_rsync_remote_allows_arg_protection_flag() {
3565 let remote_is_openrsync = false;
3574 let _protection = if remote_is_openrsync {
3576 RsyncArgProtection::None
3577 } else {
3578 RsyncArgProtection::SecludedArgs
3581 };
3582 assert!(
3585 !remote_is_openrsync,
3586 "sanity: non-openrsync path should not be treated as openrsync"
3587 );
3588 }
3589
3590 #[test]
3591 fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3592 assert_eq!(
3593 remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3594 "work-mac:'/tmp/has space'"
3595 );
3596 }
3597
3598 #[test]
3599 fn test_remote_spec_for_scp_always_quotes_remote_path() {
3600 assert_eq!(
3601 remote_spec_for_scp("work-mac", "/tmp/that's all"),
3602 "work-mac:'/tmp/that'\\''s all'"
3603 );
3604 }
3605
3606 #[test]
3607 fn test_sync_report_totals() {
3608 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3609 report.add_path_result(PathSyncResult {
3610 files_transferred: 5,
3611 bytes_transferred: 100,
3612 success: true,
3613 ..Default::default()
3614 });
3615 report.add_path_result(PathSyncResult {
3616 files_transferred: 3,
3617 bytes_transferred: 50,
3618 success: true,
3619 ..Default::default()
3620 });
3621
3622 assert_eq!(report.total_files(), 8);
3623 assert_eq!(report.total_bytes(), 150);
3624 assert!(report.all_succeeded);
3625 }
3626
3627 #[test]
3628 fn test_sync_report_with_failure() {
3629 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3630 report.add_path_result(PathSyncResult {
3631 success: true,
3632 ..Default::default()
3633 });
3634 report.add_path_result(PathSyncResult {
3635 success: false,
3636 error: Some("Connection refused".into()),
3637 ..Default::default()
3638 });
3639
3640 assert!(!report.all_succeeded);
3641 assert_eq!(report.successful_paths(), 1);
3642 assert_eq!(report.failed_paths(), 1);
3643 }
3644
3645 #[test]
3646 fn test_detect_sync_method() {
3647 let method = SyncEngine::detect_sync_method();
3649 assert!(matches!(
3650 method,
3651 SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3652 ));
3653 }
3654
3655 #[test]
3656 fn test_sync_engine_mirror_dir() {
3657 let engine = SyncEngine::new(Path::new("/data/cass"));
3658 let mirror = engine.mirror_dir("laptop");
3659 assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3660 }
3661
3662 #[test]
3663 fn test_sync_method_display() {
3664 for (method, expected) in [
3665 (SyncMethod::Rsync, "rsync"),
3666 (SyncMethod::WslRsync, "wsl-rsync"),
3667 (SyncMethod::Scp, "scp"),
3668 (SyncMethod::Sftp, "sftp"),
3669 ] {
3670 assert_eq!(method.as_str(), expected);
3671 assert_eq!(method.to_string(), expected);
3672 }
3673 }
3674
/// A backslash-separated `C:\...` path converts to `/mnt/c/...` with forward
/// slashes.
#[test]
fn test_windows_path_to_wsl_drive() {
    let windows_path = r"C:\Users\george\AppData\Roaming\cass";
    let converted = windows_path_to_wsl(windows_path);
    assert_eq!(converted, "/mnt/c/Users/george/AppData/Roaming/cass");
}
3682
/// A drive-letter path already written with forward slashes converts to
/// `/mnt/c/...` as well.
#[test]
fn test_windows_path_to_wsl_forward_slash() {
    let windows_path = "C:/Users/george/data";
    let converted = windows_path_to_wsl(windows_path);
    assert_eq!(converted, "/mnt/c/Users/george/data");
}
3690
/// A path without a drive letter is returned exactly as given.
#[test]
fn test_windows_path_to_wsl_non_windows_path_unchanged() {
    let unix_path = "/home/george/data";
    let converted = windows_path_to_wsl(unix_path);
    assert_eq!(converted, unix_path);
}
3699
/// Table-driven check of `SyncEngine::expand_tilde_with_home`.
///
/// Each row is `(input path, home directory, expected output)`.
#[test]
fn test_expand_tilde_with_home() {
    let cases = [
        // Absolute path: returned as-is.
        (
            "/home/user/projects",
            Some("/home/user"),
            "/home/user/projects",
        ),
        // Leading `~/` is replaced by the home directory.
        (
            "~/.claude/projects",
            Some("/home/user"),
            "/home/user/.claude/projects",
        ),
        // A bare `~` becomes the home directory itself.
        ("~", Some("/home/user"), "/home/user"),
        // Without a home directory the input comes back unchanged.
        ("~/.claude/projects", None, "~/.claude/projects"),
        // `~otheruser` is not expanded.
        (
            "~otheruser/projects",
            Some("/home/user"),
            "~otheruser/projects",
        ),
    ];

    for (path, home, expected) in cases {
        assert_eq!(SyncEngine::expand_tilde_with_home(path, home), expected);
    }
}
3732
/// `SyncReport::failed` yields a report for the named source that is not
/// fully succeeded and carries exactly one failed path result with an error.
#[test]
fn test_sync_report_failed() {
    let report = SyncReport::failed("test-source", SyncError::NoHost);

    assert_eq!(report.source_name, "test-source");
    assert!(!report.all_succeeded);

    assert_eq!(report.path_results.len(), 1);
    let sole_result = &report.path_results[0];
    assert!(!sole_result.success);
    assert!(sole_result.error.is_some());
}
3742
/// The default `SyncResult` is `Skipped`, and its label is "never".
#[test]
fn test_sync_result_default() {
    let initial: SyncResult = Default::default();

    let is_skipped = matches!(initial, SyncResult::Skipped);
    assert!(is_skipped);
    assert_eq!(initial.label(), "never");
}
3749
/// A default `SourceSyncInfo` has no last-sync timestamp and zeroed counters.
#[test]
fn test_source_sync_info_default() {
    let blank: SourceSyncInfo = Default::default();

    assert!(blank.last_sync.is_none());

    // All numeric counters start at zero.
    assert_eq!(blank.files_synced, 0);
    assert_eq!(blank.bytes_transferred, 0);
    assert_eq!(blank.duration_ms, 0);
}
3758
/// Updating the status from a fully successful report records a timestamp, a
/// `Success` result, and the report's file, byte and duration totals.
#[test]
fn test_sync_status_update() {
    let transferred = PathSyncResult {
        files_transferred: 10,
        bytes_transferred: 1000,
        success: true,
        ..Default::default()
    };

    let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
    report.add_path_result(transferred);
    report.total_duration_ms = 500;

    let mut status = SyncStatus::default();
    status.update("laptop", &report);

    let recorded = status.get("laptop").unwrap();
    assert!(recorded.last_sync.is_some());
    assert!(matches!(recorded.last_result, SyncResult::Success));
    assert_eq!(recorded.files_synced, 10);
    assert_eq!(recorded.bytes_transferred, 1000);
    assert_eq!(recorded.duration_ms, 500);
}
3781
/// A report mixing one successful and one failed path is recorded as
/// `PartialFailure`.
#[test]
fn test_sync_status_partial_failure() {
    let synced_path = PathSyncResult {
        files_transferred: 5,
        success: true,
        ..Default::default()
    };
    let refused_path = PathSyncResult {
        error: Some("Connection refused".into()),
        success: false,
        ..Default::default()
    };

    let mut report = SyncReport::new("server", SyncMethod::Rsync);
    report.add_path_result(synced_path);
    report.add_path_result(refused_path);

    let mut status = SyncStatus::default();
    status.update("server", &report);

    let recorded = status.get("server").unwrap();
    let is_partial = matches!(recorded.last_result, SyncResult::PartialFailure(_));
    assert!(is_partial);
}
3803
/// A report whose only path failed is recorded as `Failed`.
#[test]
fn test_sync_status_full_failure() {
    let unreachable_path = PathSyncResult {
        error: Some("Host unreachable".into()),
        success: false,
        ..Default::default()
    };

    let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
    report.add_path_result(unreachable_path);

    let mut status = SyncStatus::default();
    status.update("dead-host", &report);

    let recorded = status.get("dead-host").unwrap();
    let is_failed = matches!(recorded.last_result, SyncResult::Failed(_));
    assert!(is_failed);
}
3820
/// Saving a status into a temporary directory and loading it back preserves
/// the per-source file count, byte count and `Success` result.
#[test]
fn test_sync_status_save_round_trips() {
    let scratch = TempDir::new().expect("tempdir");

    let mut original = SyncStatus::default();
    let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
    report.add_path_result(PathSyncResult {
        success: true,
        files_transferred: 3,
        bytes_transferred: 42,
        ..Default::default()
    });
    original.update("laptop", &report);

    original.save(scratch.path()).expect("save status");
    let reloaded = SyncStatus::load(scratch.path()).expect("load status");

    let restored = reloaded.get("laptop").expect("round-tripped source");
    assert_eq!(restored.files_synced, 3);
    assert_eq!(restored.bytes_transferred, 42);
    assert!(matches!(restored.last_result, SyncResult::Success));
}
3842
/// Writing the sync-status temp file must fail with `AlreadyExists` when the
/// temp path is already a symlink, leaving both the link and the file it
/// points at untouched.
#[cfg(unix)]
#[test]
fn test_sync_status_temp_write_refuses_existing_symlink() {
    let sandbox = TempDir::new().expect("tempdir");
    let victim = sandbox.path().join("protected.json");
    let planted_link = sandbox.path().join(".sync_status.json.tmp");

    // Plant a symlink at the temp path that points at a file we care about.
    std::fs::write(&victim, b"protected").expect("write protected target");
    std::os::unix::fs::symlink(&victim, &planted_link).expect("create temp symlink");

    let outcome = write_sync_status_temp_file_at(&planted_link, br#"{"sources":{}}"#);
    let err = outcome.expect_err("existing temp symlink must be rejected");
    assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);

    // The link target must still hold its original bytes.
    let victim_bytes = std::fs::read(&victim).expect("read protected target");
    assert_eq!(victim_bytes, b"protected");

    let link_meta = std::fs::symlink_metadata(&planted_link).expect("temp path metadata");
    assert!(
        link_meta.file_type().is_symlink(),
        "failed temp write should leave the existing symlink untouched"
    );
}
3871
/// `retain_sources` keeps only the named sources, drops the rest, and reports
/// that something was removed.
#[test]
fn test_sync_status_retain_sources_prunes_removed_entries() {
    let mut status = SyncStatus::default();
    for (name, files_synced) in [("laptop", 3), ("desktop", 5)] {
        status.sources.insert(
            name.into(),
            SourceSyncInfo {
                files_synced,
                ..Default::default()
            },
        );
    }

    let removed_any = status.retain_sources(["laptop"]);

    assert!(removed_any);
    let laptop_kept = status.get("laptop").is_some();
    let desktop_gone = status.get("desktop").is_none();
    assert!(laptop_kept);
    assert!(desktop_gone);
}
3896
3897 fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3898 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3899 source.sync_schedule = schedule;
3900 source.paths = vec!["~/.claude/projects".to_string()];
3901 source
3902 }
3903
3904 fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3905 let mut status = SyncStatus::default();
3906 status.set_info("laptop", info);
3907 status
3908 }
3909
/// An hourly source that synced successfully ten minutes ago is skipped: the
/// decision is healthy, has no fallback, and becomes eligible again fifty
/// minutes from now.
#[test]
fn source_sync_decision_skips_healthy_source_until_schedule_due() {
    let now_ms = 1_700_000_000_000;
    let recent_success = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Success,
        duration_ms: 250,
        ..Default::default()
    };
    let status = status_with_info(recent_success);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Skip);
    assert_eq!(decision.health, SourceHealthKind::Healthy);
    assert!(!decision.fallback_active);

    let expected_next_sync = Some(now_ms + 50 * 60 * 1000);
    assert_eq!(decision.next_eligible_sync_ms, expected_next_sync);
    assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
    assert_eq!(decision.stale_value_score, 16);
}
3933
/// An hourly source whose last successful sync was two hours ago is stale
/// and must be synced, with a reason mentioning that the schedule is due.
#[test]
fn source_sync_decision_syncs_stale_scheduled_source() {
    let now_ms = 1_700_000_000_000;
    let overdue_success = SourceSyncInfo {
        last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
        last_result: SyncResult::Success,
        duration_ms: 250,
        ..Default::default()
    };
    let status = status_with_info(overdue_success);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Sync);
    assert_eq!(decision.health, SourceHealthKind::Stale);
    assert_eq!(decision.stale_value_score, 100);

    let mentions_due_schedule = decision
        .reasons
        .iter()
        .any(|reason| reason.contains("schedule is due"));
    assert!(mentions_due_schedule);
}
3957
/// A source whose last attempt failed with a public-key permission error is
/// deferred, classified as `AuthFailed`, with fallback active and a health
/// score of 10.
#[test]
fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
    let now_ms = 1_700_000_000_000;
    let auth_failure = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Failed("Permission denied (publickey)".into()),
        duration_ms: 800,
        consecutive_failures: 1,
        ..Default::default()
    };
    let status = status_with_info(auth_failure);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Defer);
    assert_eq!(decision.health, SourceHealthKind::AuthFailed);
    assert!(decision.fallback_active);
    assert_eq!(decision.health_score, 10);
}
3977
/// A source whose last sync ended in `PartialFailure` ten minutes ago is
/// skipped, classified as `Flapping`, and keeps the fallback active.
#[test]
fn source_sync_decision_marks_partial_success_as_flapping() {
    let now_ms = 1_700_000_000_000;
    let partial_failure = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::PartialFailure("one path failed".into()),
        files_synced: 7,
        duration_ms: 900,
        consecutive_failures: 1,
        ..Default::default()
    };
    let status = status_with_info(partial_failure);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Skip);
    assert_eq!(decision.health, SourceHealthKind::Flapping);
    assert!(decision.fallback_active);
}
3997
/// After a "Host unreachable" failure ten minutes ago the source is skipped
/// and classified as `Flapping`; the backoff deadline is the last sync time
/// plus `SOURCE_FAILURE_BACKOFF_BASE_MS`, and the reasons state that the
/// local fallback remains active.
#[test]
fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
    let now_ms = 1_700_000_000_000;
    let last_sync = now_ms - 10 * 60 * 1000;
    let unreachable_failure = SourceSyncInfo {
        last_sync: Some(last_sync),
        last_result: SyncResult::Failed("Host unreachable".into()),
        duration_ms: 900,
        consecutive_failures: 1,
        ..Default::default()
    };
    let status = status_with_info(unreachable_failure);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Skip);
    assert_eq!(decision.health, SourceHealthKind::Flapping);
    assert!(decision.fallback_active);

    let expected_backoff_end = Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS);
    assert_eq!(decision.backoff_until_ms, expected_backoff_end);

    let mentions_fallback = decision
        .reasons
        .iter()
        .any(|reason| reason.contains("local fallback remains active"));
    assert!(mentions_fallback);
}
4027
/// A successful sync that took one millisecond longer than
/// `SOURCE_HIGH_LATENCY_MS` is skipped, classified as `HighLatency`, with
/// fallback active.
#[test]
fn source_sync_decision_marks_slow_source_as_high_latency() {
    let now_ms = 1_700_000_000_000;
    let slow_success = SourceSyncInfo {
        last_sync: Some(now_ms - 10 * 60 * 1000),
        last_result: SyncResult::Success,
        duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
        ..Default::default()
    };
    let status = status_with_info(slow_success);
    let source = source_with_schedule(SyncSchedule::Hourly);

    let decision = status.decision_for_source_at(&source, now_ms, false);

    assert_eq!(decision.action, SourceSyncAction::Skip);
    assert_eq!(decision.health, SourceHealthKind::HighLatency);
    assert!(decision.fallback_active);
}
4045
/// With the manual-override flag set, a `Manual`-schedule source that synced
/// just now is still told to sync, and the reasons say the override beats
/// automatic scheduling.
#[test]
fn source_sync_decision_manual_override_forces_sync() {
    let now_ms = 1_700_000_000_000;
    let just_synced = SourceSyncInfo {
        last_sync: Some(now_ms),
        last_result: SyncResult::Success,
        duration_ms: 100,
        ..Default::default()
    };
    let status = status_with_info(just_synced);
    let source = source_with_schedule(SyncSchedule::Manual);

    let decision = status.decision_for_source_at(&source, now_ms, true);

    assert_eq!(decision.action, SourceSyncAction::Sync);
    assert!(decision.manual_override);

    let mentions_override = decision
        .reasons
        .iter()
        .any(|reason| reason.contains("overrides automatic scheduling"));
    assert!(mentions_override);
}
4068
/// Two consecutive temp paths for the same final path must differ from each
/// other while sharing the final path's parent directory.
#[test]
fn test_unique_atomic_temp_path_changes_each_call() {
    let final_path = Path::new("/tmp/sync_status.json");
    let expected_parent = final_path.parent();

    let earlier = unique_atomic_temp_path(final_path);
    let later = unique_atomic_temp_path(final_path);

    assert_ne!(earlier, later);
    for candidate in [&earlier, &later] {
        assert_eq!(candidate.parent(), expected_parent);
    }
}
4079
/// `replace_file_from_temp` must install a temp file at a final path that
/// does not exist yet, and then replace that file's contents when called
/// again with a second temp file.
#[test]
fn test_replace_file_from_temp_overwrites_existing_file() {
    let first_payload = r#"{"first":true}"#;
    let second_payload = r#"{"second":true}"#;

    let scratch = TempDir::new().expect("tempdir");
    let destination = scratch.path().join("sync_status.json");
    let first_tmp = scratch.path().join("first.tmp");
    let second_tmp = scratch.path().join("second.tmp");

    // First replace: the destination does not exist yet.
    std::fs::write(&first_tmp, first_payload).expect("write first temp");
    replace_file_from_temp(&first_tmp, &destination).expect("initial replace");
    let after_first = std::fs::read_to_string(&destination).expect("read first final");
    assert_eq!(after_first, first_payload);

    // Second replace: the destination already holds the first payload.
    std::fs::write(&second_tmp, second_payload).expect("write second temp");
    replace_file_from_temp(&second_tmp, &destination).expect("overwrite replace");
    let after_second = std::fs::read_to_string(&destination).expect("read second final");
    assert_eq!(after_second, second_payload);
}
4101
/// The timeout builder methods store the given values on the engine.
#[test]
fn test_sync_engine_with_timeouts() {
    let data_root = Path::new("/data");
    let base_engine = SyncEngine::new(data_root);

    let tuned = base_engine
        .with_connection_timeout(30)
        .with_transfer_timeout(600);

    assert_eq!(tuned.connection_timeout, 30);
    assert_eq!(tuned.transfer_timeout, 600);
}
4111
/// Pins the exact `Display` text of several `SyncError` variants.
#[test]
fn test_sync_error_display() {
    let cases = [
        (SyncError::NoHost, "Source has no host configured"),
        (SyncError::NoPaths, "Source has no paths configured"),
        (
            SyncError::InvalidPath("paths[0] cannot be empty".to_string()),
            "Invalid source path: paths[0] cannot be empty",
        ),
        (
            SyncError::Timeout(30),
            "Connection timed out after 30 seconds",
        ),
        (SyncError::Cancelled, "Sync cancelled"),
    ];

    for (error, expected_text) in cases {
        assert_eq!(error.to_string(), expected_text);
    }
}
4132
/// A bare host name parses to no user and the host unchanged.
#[test]
fn test_parse_ssh_host_simple() {
    let parsed = parse_ssh_host("myserver");

    assert!(parsed.0.is_none());
    assert_eq!(parsed.1, "myserver");
}
4143
/// `user@host` input is split into its user and host parts.
#[test]
fn test_parse_ssh_host_with_user() {
    let parsed = parse_ssh_host("admin@myserver");

    assert_eq!(parsed.0, Some("admin"));
    assert_eq!(parsed.1, "myserver");
}
4150
/// A fully qualified domain after the `@` is kept whole as the host.
#[test]
fn test_parse_ssh_host_with_domain() {
    let parsed = parse_ssh_host("deploy@server.example.com");

    assert_eq!(parsed.0, Some("deploy"));
    assert_eq!(parsed.1, "server.example.com");
}
4157
/// The minimal `user@host` form yields `user` as the user and `host` as the
/// host.
#[test]
fn test_parse_ssh_host_email_like() {
    let parsed = parse_ssh_host("user@host");

    assert_eq!(parsed.0, Some("user"));
    assert_eq!(parsed.1, "host");
}
4165
/// `first_nonblank_username` returns the first candidate that is not blank,
/// trimmed of surrounding whitespace, or `None` when every candidate is
/// missing or whitespace-only.
#[test]
fn test_first_nonblank_username_priority_and_trimming() {
    // The first candidate wins and is trimmed.
    let trimmed_first = first_nonblank_username([Some("  alice  "), Some("bob")]);
    assert_eq!(trimmed_first.as_deref(), Some("alice"));

    // Blank and missing candidates are skipped.
    let after_blanks = first_nonblank_username([Some("   "), None, Some("carol")]);
    assert_eq!(after_blanks.as_deref(), Some("carol"));

    // Nothing usable at all.
    let nothing_usable = first_nonblank_username([None, Some("\t")]);
    assert!(nothing_usable.is_none());
}
4178
/// Expanding `~/Documents/file.txt` removes the leading `~` and keeps the
/// rest of the path as the suffix.
#[test]
fn test_expand_tilde_local_with_tilde_prefix() {
    let resolved = expand_tilde_local("~/Documents/file.txt");

    let still_tilde_prefixed = resolved.starts_with('~');
    let keeps_suffix = resolved.ends_with("/Documents/file.txt");
    assert!(!still_tilde_prefixed);
    assert!(keeps_suffix);
}
4186
/// Expanding a bare `~` gives a non-empty string that no longer starts with
/// `~`.
#[test]
fn test_expand_tilde_local_just_tilde() {
    let resolved = expand_tilde_local("~");

    let still_tilde_prefixed = resolved.starts_with('~');
    assert!(!still_tilde_prefixed);
    assert!(!resolved.is_empty());
}
4194
/// A path with no tilde is returned unchanged.
#[test]
fn test_expand_tilde_local_no_tilde() {
    let absolute = "/absolute/path/to/file";
    let resolved = expand_tilde_local(absolute);
    assert_eq!(resolved, absolute);
}
4201
/// A `~` that is not at the start of the path is left alone.
#[test]
fn test_expand_tilde_local_tilde_in_middle() {
    let embedded_tilde = "/path/with/~tilde/inside";
    let resolved = expand_tilde_local(embedded_tilde);
    assert_eq!(resolved, embedded_tilde);
}
4208 }
4209}