1use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::OnceLock;
30use std::time::{Duration, Instant};
31
32use thiserror::Error;
33
34use super::{
35 config::{
36 SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
37 ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
38 },
39 host_key_verification_error, is_host_key_verification_failure, strict_ssh_cli_tokens,
40 strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
41};
42use ssh2::{FileStat, Session, Sftp};
43use std::io::{Read as IoRead, Write as IoWrite};
44use std::net::{Shutdown, TcpStream};
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55enum RsyncArgProtection {
56 None,
59 ProtectArgs,
61 SecludedArgs,
63}
64
65impl RsyncArgProtection {
66 fn is_supported(self) -> bool {
67 !matches!(self, Self::None)
68 }
69
70 fn flag(self) -> Option<&'static str> {
73 match self {
74 Self::ProtectArgs => Some("--protect-args"),
75 Self::SecludedArgs => Some("--secluded-args"),
76 Self::None => None,
77 }
78 }
79}
80
81fn detect_rsync_arg_protection() -> RsyncArgProtection {
82 static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
83 *CACHED.get_or_init(|| {
84 let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
85 return RsyncArgProtection::None;
86 };
87 let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
91 combined.push_str(&String::from_utf8_lossy(&out.stderr));
92 if combined.contains("--secluded-args") {
97 RsyncArgProtection::SecludedArgs
98 } else if combined.contains("--protect-args") {
99 RsyncArgProtection::ProtectArgs
100 } else {
101 RsyncArgProtection::None
102 }
103 })
104}
105
106fn quote_remote_shell_path(path: &str) -> String {
107 format!("'{}'", path.replace('\'', r#"'\''"#))
113}
114
115fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
116 format!("{host}:{}", quote_remote_shell_path(remote_path))
120}
121
122fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
123 if protect_args_supported {
124 format!("{host}:{remote_path}")
126 } else {
127 remote_spec_for_shell_bound_copy(host, remote_path)
129 }
130}
131
132fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
133 remote_spec_for_shell_bound_copy(host, remote_path)
136}
137
138fn remote_find_regular_files_command(remote_path: &str) -> String {
139 format!(
140 "find -P {} -type f -print0",
141 quote_remote_shell_path(remote_path)
142 )
143}
144
145fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
146 let output = String::from_utf8_lossy(stdout);
147 for line in output.lines() {
148 if let Some(home) = line.trim().strip_prefix("CASS_HOME_MARKER:")
149 && home.starts_with('/')
150 && !home.contains('\0')
151 {
152 return Some(home.to_string());
153 }
154 }
155 None
156}
157
158fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
159 bytes
160 .split(|byte| *byte == 0)
161 .filter(|part| !part.is_empty())
162 .filter_map(|part| std::str::from_utf8(part).ok())
163 .map(ToOwned::to_owned)
164 .collect()
165}
166
167fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
168 match source_path_entry_error(index, path) {
169 Some(message) => Err(SyncError::InvalidPath(message)),
170 None => Ok(()),
171 }
172}
173
174fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
175 PathSyncResult {
176 remote_path: remote_path.to_string(),
177 success: false,
178 error: Some(err.to_string()),
179 ..Default::default()
180 }
181}
182
183fn remote_file_to_safe_local_path(
184 remote_root: &Path,
185 remote_file: &Path,
186 local_container: &Path,
187 leaf_name: &str,
188) -> Option<PathBuf> {
189 let mut local_path = local_container.join(leaf_name);
190 if remote_file == remote_root {
191 return Some(local_path);
192 }
193
194 let relative = remote_file.strip_prefix(remote_root).ok()?;
195 for component in relative.components() {
196 match component {
197 std::path::Component::Normal(name) => local_path.push(name),
198 std::path::Component::CurDir => {}
199 _ => return None,
200 }
201 }
202
203 Some(local_path)
204}
205
206fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
207 let rel = path.strip_prefix(root).map_err(|_| {
208 format!(
209 "Local path {} is outside sync root {}",
210 path.display(),
211 root.display()
212 )
213 })?;
214
215 let mut current = root.to_path_buf();
216 if let Some(link) = existing_path_symlink(¤t)? {
217 return Ok(Some(link));
218 }
219
220 for component in rel.components() {
221 match component {
222 std::path::Component::Normal(name) => current.push(name),
223 std::path::Component::CurDir => continue,
224 _ => {
225 return Err(format!(
226 "Local path {} contains unsafe component below sync root {}",
227 path.display(),
228 root.display()
229 ));
230 }
231 }
232
233 if let Some(link) = existing_path_symlink(¤t)? {
234 return Ok(Some(link));
235 }
236 }
237
238 Ok(None)
239}
240
241fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
242 match std::fs::symlink_metadata(path) {
243 Ok(metadata) if metadata.file_type().is_symlink() => Ok(Some(path.to_path_buf())),
244 Ok(_) => Ok(None),
245 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
246 Err(e) => Err(format!("Failed to inspect {}: {}", path.display(), e)),
247 }
248}
249
250fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
251 if let Some(link) = existing_local_symlink_below_root(root, path)? {
252 return Err(format!(
253 "Refusing to write {} through local symlink {}",
254 path.display(),
255 link.display()
256 ));
257 }
258
259 Ok(())
260}
261
262fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
263 reject_local_symlink_below_root(sync_root, local_path)?;
264 std::fs::create_dir_all(local_path)
265 .map_err(|e| format!("Failed to create directory: {}", e))?;
266 reject_local_symlink_below_root(sync_root, local_path)?;
267 Ok(())
268}
269
270fn prepare_local_sync_root(local_store: &Path, mirror_dir: &Path) -> Result<(), String> {
271 reject_local_symlink_below_root(local_store, mirror_dir)?;
272 std::fs::create_dir_all(mirror_dir)
273 .map_err(|e| format!("Failed to create directory: {}", e))?;
274 reject_local_symlink_below_root(local_store, mirror_dir)?;
275 Ok(())
276}
277
278fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
279 stat.file_type().is_symlink()
280}
281
282#[derive(Error, Debug)]
284pub enum SyncError {
285 #[error("Source has no host configured")]
286 NoHost,
287
288 #[error("Source has no paths configured")]
289 NoPaths,
290
291 #[error("Invalid source path: {0}")]
292 InvalidPath(String),
293
294 #[error("Invalid source definition: {0}")]
295 InvalidSource(String),
296
297 #[error("rsync command failed: {0}")]
298 RsyncFailed(String),
299
300 #[error("Failed to create local directory: {0}")]
301 CreateDirFailed(#[from] std::io::Error),
302
303 #[error("SSH connection failed: {0}")]
304 SshFailed(String),
305
306 #[error("Connection timed out after {0} seconds")]
307 Timeout(u64),
308
309 #[error("Sync cancelled")]
310 Cancelled,
311}
312
313#[derive(Debug, Clone, Copy, PartialEq, Eq)]
315pub enum SyncMethod {
316 Rsync,
318 WslRsync,
321 Scp,
328 Sftp,
334}
335
336impl SyncMethod {
337 pub fn as_str(self) -> &'static str {
338 match self {
339 Self::Rsync => "rsync",
340 Self::WslRsync => "wsl-rsync",
341 Self::Scp => "scp",
342 Self::Sftp => "sftp",
343 }
344 }
345}
346
347impl std::fmt::Display for SyncMethod {
348 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349 f.write_str(self.as_str())
350 }
351}
352
353#[derive(Debug, Clone, Default)]
355pub struct PathSyncResult {
356 pub remote_path: String,
358 pub local_path: PathBuf,
360 pub files_transferred: u64,
362 pub bytes_transferred: u64,
364 pub success: bool,
366 pub error: Option<String>,
368 pub duration_ms: u64,
370}
371
372#[derive(Debug, Clone)]
374pub struct SyncReport {
375 pub source_name: String,
377 pub method: SyncMethod,
379 pub path_results: Vec<PathSyncResult>,
381 pub total_duration_ms: u64,
383 pub all_succeeded: bool,
385}
386
387impl SyncReport {
388 pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
390 Self {
391 source_name: source_name.into(),
392 method,
393 path_results: Vec::new(),
394 total_duration_ms: 0,
395 all_succeeded: true,
396 }
397 }
398
399 pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
401 Self {
402 source_name: source_name.into(),
403 method: SyncMethod::Rsync,
404 path_results: vec![PathSyncResult {
405 error: Some(error.to_string()),
406 success: false,
407 ..Default::default()
408 }],
409 total_duration_ms: 0,
410 all_succeeded: false,
411 }
412 }
413
414 pub fn add_path_result(&mut self, result: PathSyncResult) {
416 if !result.success {
417 self.all_succeeded = false;
418 }
419 self.path_results.push(result);
420 }
421
422 pub fn total_files(&self) -> u64 {
424 self.path_results.iter().map(|r| r.files_transferred).sum()
425 }
426
427 pub fn total_bytes(&self) -> u64 {
429 self.path_results.iter().map(|r| r.bytes_transferred).sum()
430 }
431
432 pub fn successful_paths(&self) -> usize {
434 self.path_results.iter().filter(|r| r.success).count()
435 }
436
437 pub fn failed_paths(&self) -> usize {
439 self.path_results.iter().filter(|r| !r.success).count()
440 }
441
442 pub fn sync_result(&self) -> SyncResult {
444 if self.all_succeeded {
445 SyncResult::Success
446 } else {
447 let errors: Vec<String> = self
448 .path_results
449 .iter()
450 .filter_map(|r| r.error.clone())
451 .collect();
452 if self.successful_paths() > 0 {
453 SyncResult::PartialFailure(errors.join("; "))
454 } else {
455 SyncResult::Failed(errors.join("; "))
456 }
457 }
458 }
459}
460
461#[derive(Debug, Default)]
463struct RsyncStats {
464 files_transferred: u64,
465 bytes_transferred: u64,
466}
467
468pub struct SyncEngine {
470 local_store: PathBuf,
473 connection_timeout: u64,
475 transfer_timeout: u64,
477}
478
479impl SyncEngine {
480 pub fn new(data_dir: &Path) -> Self {
485 Self {
486 local_store: data_dir.to_path_buf(),
487 connection_timeout: 10,
488 transfer_timeout: 300, }
490 }
491
492 pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
494 self.connection_timeout = seconds;
495 self
496 }
497
498 pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
500 self.transfer_timeout = seconds;
501 self
502 }
503
504 pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
506 self.local_store
507 .join("remotes")
508 .join(source_name)
509 .join("mirror")
510 }
511
512 fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
516 if host.trim().is_empty()
518 || host.starts_with('-')
519 || !ssh_host_has_safe_token_chars(host)
520 || validate_optional_user_host_shape(host).is_err()
521 {
522 return Err(SyncError::SshFailed(format!(
523 "Invalid characters in host: {}",
524 host
525 )));
526 }
527
528 let timeout_secs = self.connection_timeout.max(1);
529 let mut cmd = Command::new("ssh");
530 cmd.args(strict_ssh_cli_tokens(timeout_secs))
531 .arg("--")
532 .arg(host)
533 .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
534 .stdout(Stdio::piped())
535 .stderr(Stdio::piped());
536
537 let child = cmd
538 .spawn()
539 .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
540 let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
541 .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
542 .ok_or(SyncError::Timeout(timeout_secs))?;
543
544 if !output.status.success() {
545 let stderr = String::from_utf8_lossy(&output.stderr);
546 if is_host_key_verification_failure(&stderr) {
547 return Err(SyncError::SshFailed(host_key_verification_error(host)));
548 }
549 return Err(SyncError::SshFailed(format!(
550 "Failed to get remote home directory: {}",
551 stderr.trim()
552 )));
553 }
554
555 let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
556 SyncError::SshFailed(
557 "Unable to parse remote home directory from SSH output".to_string(),
558 )
559 })?;
560
561 tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
562 Ok(remote_home)
563 }
564
565 fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
569 if !path.starts_with('~') {
570 return path.to_string();
571 }
572
573 let Some(home) = remote_home else {
574 return path.to_string();
575 };
576
577 if path == "~" {
578 home.to_string()
579 } else if let Some(rest) = path.strip_prefix("~/") {
580 format!("{}/{}", home, rest)
581 } else {
582 path.to_string()
584 }
585 }
586
587 pub fn detect_sync_method() -> SyncMethod {
601 if Command::new("rsync")
603 .arg("--version")
604 .output()
605 .map(|o| o.status.success())
606 .unwrap_or(false)
607 {
608 return SyncMethod::Rsync;
609 }
610
611 #[cfg(target_os = "windows")]
613 if Command::new("wsl")
614 .args(["rsync", "--version"])
615 .output()
616 .map(|o| o.status.success())
617 .unwrap_or(false)
618 {
619 return SyncMethod::WslRsync;
620 }
621
622 if Command::new("scp")
625 .arg("-S")
626 .arg("ssh")
627 .arg("--")
628 .output()
631 .is_ok()
632 {
633 if which_scp_exists() {
635 return SyncMethod::Scp;
636 }
637 }
638
639 SyncMethod::Sftp
641 }
642
643 pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
648 if !source.is_remote() {
649 return Err(SyncError::NoHost);
650 }
651
652 let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
653
654 if source.paths.is_empty() {
655 return Err(SyncError::NoPaths);
656 }
657
658 source
659 .validate_structure()
660 .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
661
662 let method = Self::detect_sync_method();
663 let mut report = SyncReport::new(&source.name, method);
664 let overall_start = Instant::now();
665
666 let mirror_dir = self.mirror_dir(&source.name);
668 prepare_local_sync_root(&self.local_store, &mirror_dir)
669 .map_err(|e| SyncError::CreateDirFailed(std::io::Error::other(e)))?;
670
671 let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
673 path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
674 }) {
675 match self.get_remote_home(host) {
676 Ok(home) => Some(home),
677 Err(e) => {
678 tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
679 None
680 }
681 }
682 } else {
683 None
684 };
685
686 for (index, remote_path) in source.paths.iter().enumerate() {
687 if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
688 report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
689 continue;
690 }
691
692 let result = match method {
693 SyncMethod::Rsync => {
694 self.sync_path_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
695 }
696 SyncMethod::WslRsync => {
697 self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
698 }
699 SyncMethod::Scp => {
700 self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
701 }
702 SyncMethod::Sftp => {
703 self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
704 }
705 };
706 report.add_path_result(result);
707 }
708
709 report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
710 Ok(report)
711 }
712
713 pub fn sync_all(
717 &self,
718 sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
719 ) -> Vec<SyncReport> {
720 sources
721 .map(|source| {
722 let source = source.borrow();
723 self.sync_source(source)
724 .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
725 })
726 .collect()
727 }
728
729 fn sync_path_rsync(
736 &self,
737 host: &str,
738 remote_path: &str,
739 dest_dir: &Path,
740 remote_home: Option<&str>,
741 ) -> PathSyncResult {
742 let start = Instant::now();
743 if remote_path.starts_with('~') && remote_home.is_none() {
744 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
745 return PathSyncResult {
746 remote_path: remote_path.to_string(),
747 local_path,
748 success: false,
749 error: Some(
750 "Cannot expand '~' in remote path; failed to determine remote home directory"
751 .to_string(),
752 ),
753 duration_ms: start.elapsed().as_millis() as u64,
754 ..Default::default()
755 };
756 }
757
758 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
760
761 if remote_path.starts_with('~') && expanded_path == remote_path {
763 tracing::warn!(
764 remote_path = %remote_path,
765 "Could not expand tilde in path (remote home directory not available)"
766 );
767 }
768
769 let safe_name = path_to_safe_dirname(remote_path);
772 let local_path = dest_dir.join(&safe_name);
773
774 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
776 return PathSyncResult {
777 remote_path: remote_path.to_string(),
778 local_path: local_path.clone(),
779 success: false,
780 error: Some(e),
781 duration_ms: start.elapsed().as_millis() as u64,
782 ..Default::default()
783 };
784 }
785
786 let arg_protection = detect_rsync_arg_protection();
789 let protect_args_supported = arg_protection.is_supported();
790 let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
791 let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
792
793 let local_path_str = match local_path.to_str() {
794 Some(s) => s,
795 None => {
796 return PathSyncResult {
797 remote_path: remote_path.to_string(),
798 local_path,
799 success: false,
800 error: Some("Local path contains invalid UTF-8".to_string()),
801 duration_ms: start.elapsed().as_millis() as u64,
802 ..Default::default()
803 };
804 }
805 };
806
807 let timeout_str = self.transfer_timeout.to_string();
808 let mut cmd = Command::new("rsync");
809 cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
810 if let Some(flag) = arg_protection.flag() {
811 cmd.arg(flag);
812 }
813 cmd.args([
814 "--timeout",
815 &timeout_str,
816 "-e",
817 &ssh_opts,
818 "--",
819 &remote_spec,
820 local_path_str,
821 ]);
822
823 tracing::debug!(
824 host = %host,
825 remote_path = %expanded_path,
826 local_path = %local_path.display(),
827 "starting rsync"
828 );
829
830 let output = match cmd.output() {
831 Ok(o) => o,
832 Err(e) => {
833 return PathSyncResult {
834 remote_path: remote_path.to_string(),
835 local_path,
836 success: false,
837 error: Some(format!("Failed to execute rsync: {}", e)),
838 duration_ms: start.elapsed().as_millis() as u64,
839 ..Default::default()
840 };
841 }
842 };
843
844 let duration_ms = start.elapsed().as_millis() as u64;
845 let stdout = String::from_utf8_lossy(&output.stdout);
846 let stderr = String::from_utf8_lossy(&output.stderr);
847
848 if !output.status.success() {
849 let error_msg = if stderr.contains("Connection refused")
851 || stderr.contains("Connection timed out")
852 {
853 format!("SSH connection failed: {}", stderr.trim())
854 } else if is_host_key_verification_failure(&stderr) {
855 host_key_verification_error(host)
856 } else if stderr.contains("No such file or directory") {
857 format!("Remote path not found: {}", expanded_path)
858 } else if stderr.contains("Permission denied") {
859 format!("Permission denied: {}", stderr.trim())
860 } else {
861 format!("rsync failed: {}", stderr.trim())
862 };
863
864 tracing::warn!(
865 host = %host,
866 remote_path = %expanded_path,
867 error = %error_msg,
868 "rsync failed"
869 );
870
871 return PathSyncResult {
872 remote_path: remote_path.to_string(),
873 local_path,
874 success: false,
875 error: Some(error_msg),
876 duration_ms,
877 ..Default::default()
878 };
879 }
880
881 let stats = parse_rsync_stats(&stdout);
883
884 tracing::info!(
885 host = %host,
886 remote_path = %expanded_path,
887 files = stats.files_transferred,
888 bytes = stats.bytes_transferred,
889 duration_ms,
890 "rsync completed"
891 );
892
893 PathSyncResult {
894 remote_path: remote_path.to_string(),
895 local_path,
896 files_transferred: stats.files_transferred,
897 bytes_transferred: stats.bytes_transferred,
898 success: true,
899 error: None,
900 duration_ms,
901 }
902 }
903
904 fn sync_path_wsl_rsync(
909 &self,
910 host: &str,
911 remote_path: &str,
912 dest_dir: &Path,
913 remote_home: Option<&str>,
914 ) -> PathSyncResult {
915 let start = Instant::now();
916
917 if remote_path.starts_with('~') && remote_home.is_none() {
918 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
919 return PathSyncResult {
920 remote_path: remote_path.to_string(),
921 local_path,
922 success: false,
923 error: Some(
924 "Cannot expand '~' in remote path; failed to determine remote home directory"
925 .to_string(),
926 ),
927 duration_ms: start.elapsed().as_millis() as u64,
928 ..Default::default()
929 };
930 }
931
932 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
933 let safe_name = path_to_safe_dirname(remote_path);
934 let local_path = dest_dir.join(&safe_name);
935
936 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
937 return PathSyncResult {
938 remote_path: remote_path.to_string(),
939 local_path,
940 success: false,
941 error: Some(e),
942 duration_ms: start.elapsed().as_millis() as u64,
943 ..Default::default()
944 };
945 }
946
947 let local_path_str = match local_path.to_str() {
948 Some(s) => s,
949 None => {
950 return PathSyncResult {
951 remote_path: remote_path.to_string(),
952 local_path,
953 success: false,
954 error: Some("Local path contains invalid UTF-8".to_string()),
955 duration_ms: start.elapsed().as_millis() as u64,
956 ..Default::default()
957 };
958 }
959 };
960
961 let wsl_dest = windows_path_to_wsl(local_path_str);
965
966 let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
967 let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
968 let timeout_str = self.transfer_timeout.to_string();
969
970 let mut cmd = Command::new("wsl");
971 cmd.args([
972 "rsync",
973 "-avz",
974 "--links",
975 "--safe-links",
976 "--stats",
977 "--partial",
978 ]);
979 cmd.arg("--protect-args");
981 cmd.args([
982 "--timeout",
983 &timeout_str,
984 "-e",
985 &ssh_opts,
986 "--",
987 &remote_spec,
988 &wsl_dest,
989 ]);
990
991 tracing::debug!(
992 host = %host,
993 remote_path = %expanded_path,
994 local_path = %local_path.display(),
995 wsl_dest = %wsl_dest,
996 "starting wsl rsync"
997 );
998
999 let output = match cmd.output() {
1000 Ok(o) => o,
1001 Err(e) => {
1002 return PathSyncResult {
1003 remote_path: remote_path.to_string(),
1004 local_path,
1005 success: false,
1006 error: Some(format!("Failed to execute wsl rsync: {}", e)),
1007 duration_ms: start.elapsed().as_millis() as u64,
1008 ..Default::default()
1009 };
1010 }
1011 };
1012
1013 let duration_ms = start.elapsed().as_millis() as u64;
1014 let stdout = String::from_utf8_lossy(&output.stdout);
1015 let stderr = String::from_utf8_lossy(&output.stderr);
1016
1017 if !output.status.success() {
1018 let error_msg = if stderr.contains("Connection refused")
1019 || stderr.contains("Connection timed out")
1020 {
1021 format!("SSH connection failed: {}", stderr.trim())
1022 } else if is_host_key_verification_failure(&stderr) {
1023 host_key_verification_error(host)
1024 } else if stderr.contains("No such file or directory") {
1025 format!("Remote path not found: {}", expanded_path)
1026 } else if stderr.contains("Permission denied") {
1027 format!("Permission denied: {}", stderr.trim())
1028 } else {
1029 format!("wsl rsync failed: {}", stderr.trim())
1030 };
1031
1032 tracing::warn!(
1033 host = %host,
1034 remote_path = %expanded_path,
1035 error = %error_msg,
1036 "wsl rsync failed"
1037 );
1038
1039 return PathSyncResult {
1040 remote_path: remote_path.to_string(),
1041 local_path,
1042 success: false,
1043 error: Some(error_msg),
1044 duration_ms,
1045 ..Default::default()
1046 };
1047 }
1048
1049 let stats = parse_rsync_stats(&stdout);
1050
1051 tracing::info!(
1052 host = %host,
1053 remote_path = %expanded_path,
1054 files = stats.files_transferred,
1055 bytes = stats.bytes_transferred,
1056 duration_ms,
1057 "wsl rsync completed"
1058 );
1059
1060 PathSyncResult {
1061 remote_path: remote_path.to_string(),
1062 local_path,
1063 files_transferred: stats.files_transferred,
1064 bytes_transferred: stats.bytes_transferred,
1065 success: true,
1066 error: None,
1067 duration_ms,
1068 }
1069 }
1070
1071 fn sync_path_scp(
1081 &self,
1082 host: &str,
1083 remote_path: &str,
1084 dest_dir: &Path,
1085 remote_home: Option<&str>,
1086 ) -> PathSyncResult {
1087 let start = Instant::now();
1088
1089 if remote_path.starts_with('~') && remote_home.is_none() {
1090 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1091 return PathSyncResult {
1092 remote_path: remote_path.to_string(),
1093 local_path,
1094 success: false,
1095 error: Some(
1096 "Cannot expand '~' in remote path; failed to determine remote home directory"
1097 .to_string(),
1098 ),
1099 duration_ms: start.elapsed().as_millis() as u64,
1100 ..Default::default()
1101 };
1102 }
1103
1104 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1105 let safe_name = path_to_safe_dirname(remote_path);
1106 let local_path = dest_dir.join(&safe_name);
1107
1108 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1109 return PathSyncResult {
1110 remote_path: remote_path.to_string(),
1111 local_path,
1112 success: false,
1113 error: Some(e),
1114 duration_ms: start.elapsed().as_millis() as u64,
1115 ..Default::default()
1116 };
1117 }
1118
1119 let connect_timeout = self.connection_timeout.to_string();
1122 let find_command = remote_find_regular_files_command(&expanded_path);
1123
1124 tracing::debug!(
1125 host = %host,
1126 remote_path = %expanded_path,
1127 local_path = %local_path.display(),
1128 "listing regular files for scp sync"
1129 );
1130
1131 let timeout_secs = self.connection_timeout.max(1);
1132 let mut cmd = Command::new("ssh");
1133 cmd.args(strict_ssh_cli_tokens(timeout_secs))
1134 .arg("--")
1135 .arg(host)
1136 .arg(&find_command)
1137 .stdout(Stdio::piped())
1138 .stderr(Stdio::piped());
1139
1140 let output = match cmd.spawn().and_then(|child| {
1141 wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1142 }) {
1143 Ok(Some(o)) => o,
1144 Ok(None) => {
1145 return PathSyncResult {
1146 remote_path: remote_path.to_string(),
1147 local_path,
1148 success: false,
1149 error: Some(format!(
1150 "SSH file listing timed out after {timeout_secs} seconds"
1151 )),
1152 duration_ms: start.elapsed().as_millis() as u64,
1153 ..Default::default()
1154 };
1155 }
1156 Err(e) => {
1157 return PathSyncResult {
1158 remote_path: remote_path.to_string(),
1159 local_path,
1160 success: false,
1161 error: Some(format!("Failed to execute ssh file listing: {}", e)),
1162 duration_ms: start.elapsed().as_millis() as u64,
1163 ..Default::default()
1164 };
1165 }
1166 };
1167
1168 let stderr = String::from_utf8_lossy(&output.stderr);
1169 if !output.status.success() {
1170 let error_msg = if stderr.contains("Connection refused")
1171 || stderr.contains("Connection timed out")
1172 {
1173 format!("SSH connection failed: {}", stderr.trim())
1174 } else if is_host_key_verification_failure(&stderr) {
1175 host_key_verification_error(host)
1176 } else if stderr.contains("No such file or directory") {
1177 format!("Remote path not found: {}", expanded_path)
1178 } else if stderr.contains("Permission denied") {
1179 format!("Permission denied: {}", stderr.trim())
1180 } else {
1181 format!("Remote file listing failed: {}", stderr.trim())
1182 };
1183
1184 tracing::warn!(
1185 host = %host,
1186 remote_path = %expanded_path,
1187 error = %error_msg,
1188 "scp file listing failed"
1189 );
1190
1191 return PathSyncResult {
1192 remote_path: remote_path.to_string(),
1193 local_path,
1194 success: false,
1195 error: Some(error_msg),
1196 duration_ms: start.elapsed().as_millis() as u64,
1197 ..Default::default()
1198 };
1199 }
1200
1201 let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1202 let remote_root = Path::new(&expanded_path);
1203 let leaf_name = Path::new(remote_path)
1204 .file_name()
1205 .and_then(|n| n.to_str())
1206 .unwrap_or("remote");
1207 let mut files_transferred = 0u64;
1208 let mut bytes_transferred = 0u64;
1209
1210 for remote_file in remote_files {
1211 let remote_file_path = Path::new(&remote_file);
1212 let Some(local_file) = remote_file_to_safe_local_path(
1213 remote_root,
1214 remote_file_path,
1215 &local_path,
1216 leaf_name,
1217 ) else {
1218 tracing::warn!(
1219 remote_path = %remote_file,
1220 root = %expanded_path,
1221 "skipping scp file outside listed root"
1222 );
1223 continue;
1224 };
1225
1226 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1227 return PathSyncResult {
1228 remote_path: remote_path.to_string(),
1229 local_path,
1230 success: false,
1231 error: Some(e),
1232 duration_ms: start.elapsed().as_millis() as u64,
1233 ..Default::default()
1234 };
1235 }
1236
1237 if let Some(parent) = local_file.parent() {
1238 if let Err(e) = std::fs::create_dir_all(parent) {
1239 return PathSyncResult {
1240 remote_path: remote_path.to_string(),
1241 local_path,
1242 success: false,
1243 error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1244 duration_ms: start.elapsed().as_millis() as u64,
1245 ..Default::default()
1246 };
1247 }
1248
1249 if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1250 return PathSyncResult {
1251 remote_path: remote_path.to_string(),
1252 local_path,
1253 success: false,
1254 error: Some(e),
1255 duration_ms: start.elapsed().as_millis() as u64,
1256 ..Default::default()
1257 };
1258 }
1259 }
1260
1261 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1262 return PathSyncResult {
1263 remote_path: remote_path.to_string(),
1264 local_path,
1265 success: false,
1266 error: Some(e),
1267 duration_ms: start.elapsed().as_millis() as u64,
1268 ..Default::default()
1269 };
1270 }
1271
1272 let temp_path =
1273 unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1274 let Some(temp_path_str) = temp_path.to_str() else {
1275 return PathSyncResult {
1276 remote_path: remote_path.to_string(),
1277 local_path,
1278 success: false,
1279 error: Some("Local path contains invalid UTF-8".to_string()),
1280 duration_ms: start.elapsed().as_millis() as u64,
1281 ..Default::default()
1282 };
1283 };
1284 if let Err(e) = std::fs::OpenOptions::new()
1285 .write(true)
1286 .create_new(true)
1287 .open(&temp_path)
1288 .and_then(|file| file.sync_all())
1289 {
1290 return PathSyncResult {
1291 remote_path: remote_path.to_string(),
1292 local_path,
1293 success: false,
1294 error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1295 duration_ms: start.elapsed().as_millis() as u64,
1296 ..Default::default()
1297 };
1298 }
1299
1300 let remote_spec = remote_spec_for_scp(host, &remote_file);
1301 let mut cmd = Command::new("scp");
1302 cmd.args([
1303 "-B",
1304 "-o",
1305 &format!("ConnectTimeout={}", connect_timeout),
1306 "-o",
1307 "ServerAliveInterval=15",
1308 "-o",
1309 "ServerAliveCountMax=3",
1310 "-o",
1311 "StrictHostKeyChecking=yes",
1312 "--",
1313 &remote_spec,
1314 temp_path_str,
1315 ]);
1316
1317 let output = match cmd.output() {
1318 Ok(o) => o,
1319 Err(e) => {
1320 return PathSyncResult {
1321 remote_path: remote_path.to_string(),
1322 local_path,
1323 success: false,
1324 error: Some(format!("Failed to execute scp: {}", e)),
1325 duration_ms: start.elapsed().as_millis() as u64,
1326 ..Default::default()
1327 };
1328 }
1329 };
1330
1331 if !output.status.success() {
1332 let _ = std::fs::remove_file(&temp_path);
1333 let stderr = String::from_utf8_lossy(&output.stderr);
1334 let error_msg = if is_host_key_verification_failure(&stderr) {
1335 host_key_verification_error(host)
1336 } else if stderr.contains("Permission denied") {
1337 format!("Permission denied: {}", stderr.trim())
1338 } else {
1339 format!("scp failed: {}", stderr.trim())
1340 };
1341
1342 tracing::warn!(
1343 host = %host,
1344 remote_path = %remote_file,
1345 error = %error_msg,
1346 "scp file transfer failed"
1347 );
1348
1349 return PathSyncResult {
1350 remote_path: remote_path.to_string(),
1351 local_path,
1352 success: false,
1353 error: Some(error_msg),
1354 duration_ms: start.elapsed().as_millis() as u64,
1355 ..Default::default()
1356 };
1357 }
1358
1359 files_transferred += 1;
1360 if let Err(e) = sync_file_path(&temp_path) {
1361 return PathSyncResult {
1362 remote_path: remote_path.to_string(),
1363 local_path,
1364 success: false,
1365 error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1366 duration_ms: start.elapsed().as_millis() as u64,
1367 ..Default::default()
1368 };
1369 }
1370 if let Ok(metadata) = std::fs::metadata(&temp_path) {
1371 bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1372 }
1373 if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1374 return PathSyncResult {
1375 remote_path: remote_path.to_string(),
1376 local_path,
1377 success: false,
1378 error: Some(format!(
1379 "Failed to publish {} to {}: {}",
1380 temp_path.display(),
1381 local_file.display(),
1382 e
1383 )),
1384 duration_ms: start.elapsed().as_millis() as u64,
1385 ..Default::default()
1386 };
1387 }
1388 }
1389
1390 let duration_ms = start.elapsed().as_millis() as u64;
1391
1392 tracing::info!(
1393 host = %host,
1394 remote_path = %expanded_path,
1395 files = files_transferred,
1396 bytes = bytes_transferred,
1397 duration_ms,
1398 "scp sync completed"
1399 );
1400
1401 PathSyncResult {
1402 remote_path: remote_path.to_string(),
1403 local_path,
1404 files_transferred,
1405 bytes_transferred,
1406 success: true,
1407 error: None,
1408 duration_ms,
1409 }
1410 }
1411
1412 fn sync_path_sftp(
1417 &self,
1418 host: &str,
1419 remote_path: &str,
1420 dest_dir: &Path,
1421 remote_home: Option<&str>,
1422 ) -> PathSyncResult {
1423 let start = Instant::now();
1424 if remote_path.starts_with('~') && remote_home.is_none() {
1425 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1426 return PathSyncResult {
1427 remote_path: remote_path.to_string(),
1428 local_path,
1429 success: false,
1430 error: Some(
1431 "Cannot expand '~' in remote path; failed to determine remote home directory"
1432 .to_string(),
1433 ),
1434 duration_ms: start.elapsed().as_millis() as u64,
1435 ..Default::default()
1436 };
1437 }
1438 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1439 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1441
1442 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1444 return PathSyncResult {
1445 remote_path: remote_path.to_string(),
1446 local_path,
1447 success: false,
1448 error: Some(e),
1449 duration_ms: start.elapsed().as_millis() as u64,
1450 ..Default::default()
1451 };
1452 }
1453
1454 let (ssh_user, ssh_host) = parse_ssh_host(host);
1456
1457 let ssh_config = discover_ssh_hosts();
1460 let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
1461 ssh_config
1462 .iter()
1463 .find(|h| h.hostname.as_deref() == Some(ssh_host))
1464 });
1465
1466 let hostname = host_config
1468 .and_then(|h| h.hostname.as_deref())
1469 .unwrap_or(ssh_host);
1470 let port = host_config.and_then(|h| h.port).unwrap_or(22);
1471 let username = match first_nonblank_username([
1473 ssh_user,
1474 host_config.and_then(|h| h.user.as_deref()),
1475 ])
1476 .or_else(|| env_username("USER"))
1477 .or_else(|| env_username("LOGNAME"))
1478 {
1479 Some(user) => user,
1480 None => {
1481 return PathSyncResult {
1482 remote_path: remote_path.to_string(),
1483 local_path,
1484 success: false,
1485 error: Some(format!(
1486 "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
1487 host
1488 )),
1489 duration_ms: start.elapsed().as_millis() as u64,
1490 ..Default::default()
1491 };
1492 }
1493 };
1494 let identity_file = host_config.and_then(|h| h.identity_file.as_deref());
1495
1496 tracing::debug!(
1497 hostname = %hostname,
1498 port,
1499 username = %username,
1500 identity_file = ?identity_file,
1501 remote_path = %expanded_path,
1502 "SFTP connection parameters"
1503 );
1504
1505 let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
1507 let addr = format!("{}:{}", hostname, port);
1508 let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
1509 use std::net::ToSocketAddrs;
1511 (hostname, port)
1512 .to_socket_addrs()
1513 .ok()
1514 .and_then(|mut addrs| addrs.next())
1515 .ok_or(std::io::Error::new(
1516 std::io::ErrorKind::InvalidInput,
1517 "cannot resolve hostname",
1518 ))
1519 }) {
1520 Ok(a) => a,
1521 Err(e) => {
1522 return PathSyncResult {
1523 remote_path: remote_path.to_string(),
1524 local_path,
1525 success: false,
1526 error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
1527 duration_ms: start.elapsed().as_millis() as u64,
1528 ..Default::default()
1529 };
1530 }
1531 };
1532 let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
1533 Ok(t) => t,
1534 Err(e) => {
1535 return PathSyncResult {
1536 remote_path: remote_path.to_string(),
1537 local_path,
1538 success: false,
1539 error: Some(format!(
1540 "TCP connection failed to {}:{}: {}",
1541 hostname, port, e
1542 )),
1543 duration_ms: start.elapsed().as_millis() as u64,
1544 ..Default::default()
1545 };
1546 }
1547 };
1548
1549 let timeout = std::time::Duration::from_secs(self.transfer_timeout);
1551 if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
1552 tracing::warn!("Failed to set TCP read timeout: {}", e);
1553 }
1554 if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
1555 tracing::warn!("Failed to set TCP write timeout: {}", e);
1556 }
1557 let tcp_shutdown = tcp.try_clone().ok();
1558
1559 let mut session = match Session::new() {
1561 Ok(s) => s,
1562 Err(e) => {
1563 let _ = tcp.shutdown(Shutdown::Both);
1564 return PathSyncResult {
1565 remote_path: remote_path.to_string(),
1566 local_path,
1567 success: false,
1568 error: Some(format!("Failed to create SSH session: {}", e)),
1569 duration_ms: start.elapsed().as_millis() as u64,
1570 ..Default::default()
1571 };
1572 }
1573 };
1574
1575 session.set_tcp_stream(tcp);
1576 let close_connections = |session: &mut Session, reason: &str| {
1577 let _ = session.disconnect(None, reason, None);
1578 if let Some(stream) = tcp_shutdown.as_ref() {
1579 let _ = stream.shutdown(Shutdown::Both);
1580 }
1581 };
1582
1583 if let Err(e) = session.handshake() {
1584 close_connections(&mut session, "handshake failed");
1585 return PathSyncResult {
1586 remote_path: remote_path.to_string(),
1587 local_path,
1588 success: false,
1589 error: Some(format!("SSH handshake failed: {}", e)),
1590 duration_ms: start.elapsed().as_millis() as u64,
1591 ..Default::default()
1592 };
1593 }
1594
1595 if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
1597 close_connections(&mut session, "authentication failed");
1598 return PathSyncResult {
1599 remote_path: remote_path.to_string(),
1600 local_path,
1601 success: false,
1602 error: Some(format!("SSH authentication failed: {}", e)),
1603 duration_ms: start.elapsed().as_millis() as u64,
1604 ..Default::default()
1605 };
1606 }
1607
1608 let sftp = match session.sftp() {
1610 Ok(s) => s,
1611 Err(e) => {
1612 close_connections(&mut session, "sftp open failed");
1613 return PathSyncResult {
1614 remote_path: remote_path.to_string(),
1615 local_path,
1616 success: false,
1617 error: Some(format!("Failed to open SFTP session: {}", e)),
1618 duration_ms: start.elapsed().as_millis() as u64,
1619 ..Default::default()
1620 };
1621 }
1622 };
1623
1624 tracing::info!(
1625 host = %host,
1626 remote_path = %expanded_path,
1627 local_path = %local_path.display(),
1628 "starting SFTP sync"
1629 );
1630
1631 let mut files_transferred = 0u64;
1633 let mut bytes_transferred = 0u64;
1634
1635 let leaf_name = Path::new(remote_path)
1638 .file_name()
1639 .and_then(|n| n.to_str())
1640 .unwrap_or("remote");
1641 let target_local_path = local_path.join(leaf_name);
1642
1643 if let Err(e) = self.sftp_download_recursive(
1644 &sftp,
1645 Path::new(&expanded_path),
1646 &target_local_path,
1647 &local_path,
1648 &mut files_transferred,
1649 &mut bytes_transferred,
1650 ) {
1651 close_connections(&mut session, "sftp download failed");
1652 return PathSyncResult {
1653 remote_path: remote_path.to_string(),
1654 local_path,
1655 files_transferred,
1656 bytes_transferred,
1657 success: false,
1658 error: Some(format!("SFTP download failed: {}", e)),
1659 duration_ms: start.elapsed().as_millis() as u64,
1660 };
1661 }
1662
1663 let duration_ms = start.elapsed().as_millis() as u64;
1664
1665 tracing::info!(
1666 host = %host,
1667 remote_path = %expanded_path,
1668 files = files_transferred,
1669 bytes = bytes_transferred,
1670 duration_ms,
1671 "SFTP sync completed"
1672 );
1673
1674 close_connections(&mut session, "sync complete");
1675 PathSyncResult {
1676 remote_path: remote_path.to_string(),
1677 local_path,
1678 files_transferred,
1679 bytes_transferred,
1680 success: true,
1681 error: None,
1682 duration_ms,
1683 }
1684 }
1685
1686 fn authenticate_ssh(
1688 &self,
1689 session: &Session,
1690 username: &str,
1691 identity_file: Option<&str>,
1692 ) -> Result<(), String> {
1693 if let Ok(mut agent) = session.agent()
1695 && agent.connect().is_ok()
1696 && agent.list_identities().is_ok()
1697 {
1698 for identity in agent.identities().unwrap_or_default() {
1699 if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1700 tracing::debug!("Authenticated via SSH agent");
1701 return Ok(());
1702 }
1703 }
1704 }
1705
1706 if let Some(key_path) = identity_file {
1708 let key_path_expanded = expand_tilde_local(key_path);
1709 let key_path_buf = Path::new(&key_path_expanded);
1710
1711 if key_path_buf.exists()
1712 && session
1713 .userauth_pubkey_file(username, None, key_path_buf, None)
1714 .is_ok()
1715 && session.authenticated()
1716 {
1717 tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1718 return Ok(());
1719 }
1720 }
1721
1722 if let Some(home) = dirs::home_dir() {
1724 for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1725 let key_path = home.join(".ssh").join(key_name);
1726 if key_path.exists()
1727 && session
1728 .userauth_pubkey_file(username, None, &key_path, None)
1729 .is_ok()
1730 && session.authenticated()
1731 {
1732 tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1733 return Ok(());
1734 }
1735 }
1736 }
1737
1738 Err(format!(
1739 "No valid authentication method found for user '{}'",
1740 username
1741 ))
1742 }
1743
1744 fn sftp_download_recursive(
1746 &self,
1747 sftp: &Sftp,
1748 remote_path: &Path,
1749 local_path: &Path,
1750 local_root: &Path,
1751 files_transferred: &mut u64,
1752 bytes_transferred: &mut u64,
1753 ) -> Result<(), String> {
1754 let stat = sftp
1757 .lstat(remote_path)
1758 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1759
1760 if sftp_file_stat_is_symlink(&stat) {
1761 tracing::warn!(
1762 path = %remote_path.display(),
1763 "Skipping remote symlink during SFTP sync"
1764 );
1765 return Ok(());
1766 }
1767
1768 if stat.is_dir() {
1769 reject_local_symlink_below_root(local_root, local_path)?;
1771 std::fs::create_dir_all(local_path)
1772 .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1773 reject_local_symlink_below_root(local_root, local_path)?;
1774
1775 let entries = sftp
1777 .readdir(remote_path)
1778 .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1779
1780 for (entry_path, _entry_stat) in entries {
1781 let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1782 continue;
1783 };
1784
1785 let entry_stat = sftp
1786 .lstat(&entry_path)
1787 .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1788 if sftp_file_stat_is_symlink(&entry_stat) {
1789 tracing::warn!(
1790 path = %entry_path.display(),
1791 "Skipping remote symlink during SFTP sync"
1792 );
1793 continue;
1794 }
1795
1796 let local_entry = local_path.join(file_name);
1797
1798 if entry_stat.is_dir() {
1799 self.sftp_download_recursive(
1801 sftp,
1802 &entry_path,
1803 &local_entry,
1804 local_root,
1805 files_transferred,
1806 bytes_transferred,
1807 )?;
1808 } else if entry_stat.is_file() {
1809 if self.sftp_download_file(
1811 sftp,
1812 &entry_path,
1813 &local_entry,
1814 local_root,
1815 bytes_transferred,
1816 )? {
1817 *files_transferred += 1;
1818 }
1819 }
1820 }
1822 } else if stat.is_file() {
1823 if let Some(parent) = local_path.parent() {
1825 reject_local_symlink_below_root(local_root, parent)?;
1826 std::fs::create_dir_all(parent).map_err(|e| {
1827 format!("Failed to create local dir {}: {}", parent.display(), e)
1828 })?;
1829 reject_local_symlink_below_root(local_root, parent)?;
1830 }
1831
1832 if self.sftp_download_file(
1833 sftp,
1834 remote_path,
1835 local_path,
1836 local_root,
1837 bytes_transferred,
1838 )? {
1839 *files_transferred += 1;
1840 }
1841 } else {
1842 tracing::warn!(
1844 path = %remote_path.display(),
1845 "Skipping remote path: not a regular file or directory"
1846 );
1847 }
1848
1849 Ok(())
1850 }
1851
1852 fn sftp_download_file(
1854 &self,
1855 sftp: &Sftp,
1856 remote_path: &Path,
1857 local_path: &Path,
1858 local_root: &Path,
1859 bytes_transferred: &mut u64,
1860 ) -> Result<bool, String> {
1861 let stat = sftp
1862 .lstat(remote_path)
1863 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1864 if sftp_file_stat_is_symlink(&stat) {
1865 tracing::warn!(
1866 path = %remote_path.display(),
1867 "Skipping remote symlink during SFTP sync"
1868 );
1869 return Ok(false);
1870 }
1871 if !stat.is_file() {
1872 tracing::warn!(
1873 path = %remote_path.display(),
1874 "Skipping remote path: not a regular file"
1875 );
1876 return Ok(false);
1877 }
1878
1879 let mut remote_file = sftp
1880 .open(remote_path)
1881 .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
1882
1883 reject_local_symlink_below_root(local_root, local_path)?;
1884
1885 let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
1886 let mut local_file = std::fs::OpenOptions::new()
1887 .write(true)
1888 .create_new(true)
1889 .open(&temp_path)
1890 .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
1891
1892 let mut buffer = [0u8; 32768]; loop {
1895 let bytes_read = remote_file
1896 .read(&mut buffer)
1897 .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
1898
1899 if bytes_read == 0 {
1900 break;
1901 }
1902
1903 local_file
1904 .write_all(&buffer[..bytes_read])
1905 .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
1906
1907 *bytes_transferred += bytes_read as u64;
1908 }
1909
1910 tracing::trace!(
1911 remote = %remote_path.display(),
1912 local = %local_path.display(),
1913 "downloaded file"
1914 );
1915
1916 local_file
1917 .sync_all()
1918 .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
1919 drop(local_file);
1920 replace_file_from_temp(&temp_path, local_path).map_err(|e| {
1921 format!(
1922 "Failed to publish {} to {}: {}",
1923 temp_path.display(),
1924 local_path.display(),
1925 e
1926 )
1927 })?;
1928
1929 Ok(true)
1930 }
1931}
1932
1933fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
1935 let Some(file_name) = entry_path.file_name() else {
1936 tracing::warn!(
1937 parent = %parent_path.display(),
1938 entry = ?entry_path,
1939 "Skipping SFTP entry without a file name"
1940 );
1941 return None;
1942 };
1943
1944 let Some(file_name) = file_name.to_str() else {
1945 tracing::warn!(
1946 parent = %parent_path.display(),
1947 entry = ?entry_path,
1948 "Skipping SFTP entry with non-UTF-8 file name"
1949 );
1950 return None;
1951 };
1952
1953 if file_name.is_empty() {
1954 tracing::warn!(
1955 parent = %parent_path.display(),
1956 entry = ?entry_path,
1957 "Skipping SFTP entry with empty file name"
1958 );
1959 return None;
1960 }
1961
1962 if file_name == "." || file_name == ".." {
1963 return None;
1964 }
1965
1966 Some(file_name)
1967}
1968
1969fn which_scp_exists() -> bool {
1974 std::env::var_os("PATH")
1975 .map(|path_var| {
1976 std::env::split_paths(&path_var).any(|dir| {
1977 let candidate = dir.join(if cfg!(target_os = "windows") {
1978 "scp.exe"
1979 } else {
1980 "scp"
1981 });
1982 candidate.is_file()
1983 })
1984 })
1985 .unwrap_or(false)
1986}
1987
1988fn windows_path_to_wsl(path: &str) -> String {
1995 if path.len() >= 3 {
1997 let bytes = path.as_bytes();
1998 if bytes[1] == b':' && (bytes[2] == b'\\' || bytes[2] == b'/') {
1999 let drive = (bytes[0] as char).to_lowercase().next().unwrap_or('c');
2000 let rest = path[3..].replace('\\', "/");
2001 return format!("/mnt/{}/{}", drive, rest);
2002 }
2003 }
2004 path.to_string()
2005}
2006
2007fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
2013 if let Some(at_pos) = host.find('@') {
2014 let user = &host[..at_pos];
2015 let hostname = &host[at_pos + 1..];
2016 (Some(user), hostname)
2017 } else {
2018 (None, host)
2019 }
2020}
2021
2022fn first_nonblank_username<'a>(
2023 candidates: impl IntoIterator<Item = Option<&'a str>>,
2024) -> Option<String> {
2025 candidates.into_iter().find_map(|candidate| {
2026 let trimmed = candidate?.trim();
2027 if trimmed.is_empty() {
2028 None
2029 } else {
2030 Some(trimmed.to_string())
2031 }
2032 })
2033}
2034
2035fn env_username(key: &str) -> Option<String> {
2036 dotenvy::var(key)
2037 .ok()
2038 .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2039}
2040
2041fn expand_tilde_local(path: &str) -> String {
2043 if let Some(stripped) = path.strip_prefix("~/")
2044 && let Some(home) = dirs::home_dir()
2045 {
2046 return format!("{}/{}", home.display(), stripped);
2047 } else if path == "~"
2048 && let Some(home) = dirs::home_dir()
2049 {
2050 return home.display().to_string();
2051 }
2052 path.to_string()
2053}
2054
2055pub fn path_to_safe_dirname(path: &str) -> String {
2064 use std::path::{Component, Path};
2065
2066 let path_obj = Path::new(path);
2067 let mut parts: Vec<&str> = Vec::new();
2068
2069 for component in path_obj.components() {
2070 match component {
2071 Component::Normal(name) => {
2072 if let Some(s) = name.to_str() {
2073 if !s.is_empty() && s != "." && s != "~" {
2075 parts.push(s);
2076 }
2077 }
2078 }
2079 Component::ParentDir
2081 | Component::CurDir
2082 | Component::RootDir
2083 | Component::Prefix(_) => {}
2084 }
2085 }
2086
2087 let cleaned = parts.join("_").replace([' ', '\\'], "_");
2088
2089 let hash = fnv1a_hash(path);
2091 let hash_suffix = format!("{:08x}", hash);
2092
2093 if cleaned.is_empty() {
2094 format!("root_{}", hash_suffix)
2095 } else {
2096 format!("{}_{}", cleaned, hash_suffix)
2097 }
2098}
2099
2100fn fnv1a_hash(text: &str) -> u64 {
2101 let mut hash: u64 = 0xcbf29ce484222325;
2102 for byte in text.bytes() {
2103 hash ^= u64::from(byte);
2104 hash = hash.wrapping_mul(0x100000001b3);
2105 }
2106 hash
2107}
2108
2109fn parse_rsync_stats(output: &str) -> RsyncStats {
2111 let mut stats = RsyncStats::default();
2112
2113 for line in output.lines() {
2114 let line = line.trim();
2115
2116 if line.starts_with("Number of regular files transferred:")
2118 && let Some(num_str) = line.split(':').nth(1)
2119 {
2120 stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2121 }
2122
2123 if line.starts_with("Total transferred file size:")
2125 && let Some(size_part) = line.split(':').nth(1)
2126 {
2127 let size_str = size_part
2129 .split_whitespace()
2130 .next()
2131 .unwrap_or("0")
2132 .replace(',', "");
2133 stats.bytes_transferred = size_str.parse().unwrap_or(0);
2134 }
2135 }
2136
2137 stats
2138}
2139
2140#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2146#[serde(rename_all = "snake_case")]
2147pub enum SyncResult {
2148 Success,
2150 PartialFailure(String),
2152 Failed(String),
2154 #[default]
2156 Skipped,
2157}
2158
2159impl SyncResult {
2160 pub fn label(&self) -> &'static str {
2162 match self {
2163 Self::Success => "success",
2164 Self::PartialFailure(_) => "partial",
2165 Self::Failed(_) => "failed",
2166 Self::Skipped => "never",
2167 }
2168 }
2169
2170 pub fn error_message(&self) -> Option<&str> {
2172 match self {
2173 Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2174 Self::Success | Self::Skipped => None,
2175 }
2176 }
2177}
2178
2179#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2181#[serde(rename_all = "snake_case")]
2182pub enum SourceSyncAction {
2183 Sync,
2185 Skip,
2187 Defer,
2189}
2190
2191impl SourceSyncAction {
2192 pub fn as_str(self) -> &'static str {
2193 match self {
2194 Self::Sync => "sync",
2195 Self::Skip => "skip",
2196 Self::Defer => "defer",
2197 }
2198 }
2199}
2200
2201#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2203#[serde(rename_all = "snake_case")]
2204pub enum SourceHealthKind {
2205 NeverSynced,
2206 Healthy,
2207 Stale,
2208 HighLatency,
2209 Flapping,
2210 AuthFailed,
2211 BackingOff,
2212}
2213
2214impl SourceHealthKind {
2215 pub fn as_str(self) -> &'static str {
2216 match self {
2217 Self::NeverSynced => "never_synced",
2218 Self::Healthy => "healthy",
2219 Self::Stale => "stale",
2220 Self::HighLatency => "high_latency",
2221 Self::Flapping => "flapping",
2222 Self::AuthFailed => "auth_failed",
2223 Self::BackingOff => "backing_off",
2224 }
2225 }
2226}
2227
2228#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2230pub struct SourceSyncDecision {
2231 pub action: SourceSyncAction,
2233 pub health: SourceHealthKind,
2235 pub health_score: u8,
2237 pub staleness_ms: Option<i64>,
2239 pub stale_value_score: u8,
2241 pub manual_override: bool,
2243 pub fallback_active: bool,
2245 pub next_eligible_sync_ms: Option<i64>,
2247 pub backoff_until_ms: Option<i64>,
2249 pub reasons: Vec<String>,
2251}
2252
2253impl SourceSyncDecision {
2254 fn evaluate(
2255 source: &SourceDefinition,
2256 info: Option<&SourceSyncInfo>,
2257 now_ms: i64,
2258 manual_override: bool,
2259 ) -> Self {
2260 let period_ms = sync_schedule_period_ms(source.sync_schedule);
2261 let next_eligible_sync_ms = info
2262 .and_then(|info| info.last_sync)
2263 .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
2264 let backoff_until_ms = info.and_then(failure_backoff_until_ms);
2265 let staleness_ms = info.and_then(|info| {
2266 info.last_sync
2267 .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
2268 });
2269 let stale_value_score =
2270 stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
2271 let mut reasons = Vec::new();
2272
2273 let health = match info {
2274 None => {
2275 reasons.push("no durable sync status exists for this source".to_string());
2276 SourceHealthKind::NeverSynced
2277 }
2278 Some(info) if info.last_sync.is_none() => {
2279 reasons.push("source has never completed or attempted a sync".to_string());
2280 SourceHealthKind::NeverSynced
2281 }
2282 Some(info) if sync_result_auth_failure(&info.last_result) => {
2283 reasons
2284 .push("last sync failed with an authentication or host-key error".to_string());
2285 SourceHealthKind::AuthFailed
2286 }
2287 Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
2288 reasons.push("last sync partially succeeded and partially failed".to_string());
2289 SourceHealthKind::Flapping
2290 }
2291 Some(info)
2292 if info.consecutive_failures > 0
2293 && backoff_until_ms.is_some_and(|until| until > now_ms) =>
2294 {
2295 reasons.push(format!(
2296 "{} consecutive failure(s) are inside retry backoff",
2297 info.consecutive_failures
2298 ));
2299 SourceHealthKind::BackingOff
2300 }
2301 Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
2302 let error = info.last_result.error_message().unwrap_or("unknown error");
2303 reasons.push(format!(
2304 "last sync failed completely ({error}); local fallback remains active"
2305 ));
2306 SourceHealthKind::Flapping
2307 }
2308 Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
2309 reasons.push(format!(
2310 "last sync took {}ms, above {}ms high-latency guard",
2311 info.duration_ms, SOURCE_HIGH_LATENCY_MS
2312 ));
2313 SourceHealthKind::HighLatency
2314 }
2315 Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
2316 reasons.push("configured sync schedule is due".to_string());
2317 SourceHealthKind::Stale
2318 }
2319 Some(_) => SourceHealthKind::Healthy,
2320 };
2321
2322 let fallback_active = matches!(
2323 health,
2324 SourceHealthKind::AuthFailed
2325 | SourceHealthKind::BackingOff
2326 | SourceHealthKind::Flapping
2327 | SourceHealthKind::HighLatency
2328 );
2329
2330 let mut action = if manual_override {
2331 reasons.push("explicit sync command overrides automatic scheduling".to_string());
2332 SourceSyncAction::Sync
2333 } else {
2334 automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
2335 };
2336
2337 if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
2338 action = SourceSyncAction::Defer;
2339 }
2340
2341 if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
2342 reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
2343 }
2344
2345 if !manual_override
2346 && matches!(action, SourceSyncAction::Skip)
2347 && let Some(next_ms) = next_eligible_sync_ms
2348 {
2349 reasons.push(format!(
2350 "next scheduled sync is eligible at unix_ms={next_ms}"
2351 ));
2352 }
2353
2354 if reasons.is_empty() {
2355 reasons.push("source is healthy and within schedule".to_string());
2356 }
2357
2358 Self {
2359 action,
2360 health,
2361 health_score: health_score_for_source(health),
2362 staleness_ms,
2363 stale_value_score,
2364 manual_override,
2365 fallback_active,
2366 next_eligible_sync_ms,
2367 backoff_until_ms,
2368 reasons,
2369 }
2370 }
2371}
2372
2373#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2375pub struct SourceSyncInfo {
2376 pub last_sync: Option<i64>,
2378 pub last_result: SyncResult,
2380 pub files_synced: u64,
2382 pub bytes_transferred: u64,
2384 pub duration_ms: u64,
2386 #[serde(default)]
2388 pub consecutive_failures: u32,
2389}
2390
2391impl SourceSyncInfo {
2392 pub fn from_report(report: &SyncReport) -> Self {
2394 let last_result = report.sync_result();
2395 Self {
2396 last_sync: Some(current_unix_ms()),
2397 consecutive_failures: u32::from(!report.all_succeeded),
2398 last_result,
2399 files_synced: report.total_files(),
2400 bytes_transferred: report.total_bytes(),
2401 duration_ms: report.total_duration_ms,
2402 }
2403 }
2404}
2405
2406#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
2408pub struct SyncStatus {
2409 pub sources: std::collections::HashMap<String, SourceSyncInfo>,
2411}
2412
2413impl SyncStatus {
2414 pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2416 let path = Self::status_path(data_dir);
2417 match std::fs::read_to_string(&path) {
2418 Ok(content) => serde_json::from_str(&content)
2419 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2420 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2421 Err(e) => Err(e),
2422 }
2423 }
2424
2425 pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2430 let path = Self::status_path(data_dir);
2431 if let Some(parent) = path.parent() {
2432 std::fs::create_dir_all(parent)?;
2433 }
2434 let content = serde_json::to_string_pretty(self)?;
2435 let tmp_path = unique_atomic_temp_path(&path);
2436 std::fs::write(&tmp_path, content)?;
2437 sync_file_path(&tmp_path)?;
2438 replace_file_from_temp(&tmp_path, &path)
2439 }
2440
2441 pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2443 let previous_failures = self
2444 .get(source_name)
2445 .map(|info| info.consecutive_failures)
2446 .unwrap_or_default();
2447 let mut info = SourceSyncInfo::from_report(report);
2448 if report.all_succeeded {
2449 info.consecutive_failures = 0;
2450 } else {
2451 info.consecutive_failures = previous_failures.saturating_add(1);
2452 }
2453 self.set_info(source_name, info);
2454 }
2455
2456 pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2458 self.sources.insert(source_name.to_string(), info);
2459 }
2460
2461 pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2465 let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2466 let previous_len = self.sources.len();
2467 self.sources
2468 .retain(|source_name, _| allowed.contains(source_name.as_str()));
2469 self.sources.len() != previous_len
2470 }
2471
2472 pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2474 self.sources.get(source_name)
2475 }
2476
2477 pub fn decision_for_source_at(
2479 &self,
2480 source: &SourceDefinition,
2481 now_ms: i64,
2482 manual_override: bool,
2483 ) -> SourceSyncDecision {
2484 SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2485 }
2486
2487 fn status_path(data_dir: &Path) -> PathBuf {
2489 data_dir.join("sync_status.json")
2490 }
2491}
2492
2493const SOURCE_HIGH_LATENCY_MS: u64 = 60_000;
2494const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1000;
2495const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1000;
2496
2497pub(crate) fn current_unix_ms() -> i64 {
2498 let now = std::time::SystemTime::now()
2499 .duration_since(std::time::UNIX_EPOCH)
2500 .unwrap_or_default()
2501 .as_millis();
2502 i64::try_from(now).unwrap_or(i64::MAX)
2503}
2504
2505fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2506 match schedule {
2507 SyncSchedule::Manual => None,
2508 SyncSchedule::Hourly => Some(60 * 60 * 1000),
2509 SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2510 }
2511}
2512
2513fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
2514 match (last_sync, period_ms) {
2515 (None, _) => true,
2516 (Some(_), None) => false,
2517 (Some(last_sync), Some(period_ms)) => last_sync.saturating_add(period_ms) <= now_ms,
2518 }
2519}
2520
2521fn automatic_source_sync_action(
2522 schedule: SyncSchedule,
2523 health: SourceHealthKind,
2524 info: Option<&SourceSyncInfo>,
2525 now_ms: i64,
2526) -> SourceSyncAction {
2527 match health {
2528 SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2529 _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2530 SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2531 SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2532 if sync_schedule_due(
2533 info.and_then(|info| info.last_sync),
2534 sync_schedule_period_ms(schedule),
2535 now_ms,
2536 ) {
2537 SourceSyncAction::Sync
2538 } else {
2539 SourceSyncAction::Skip
2540 }
2541 }
2542 SourceHealthKind::Healthy => {
2543 if sync_schedule_due(
2544 info.and_then(|info| info.last_sync),
2545 sync_schedule_period_ms(schedule),
2546 now_ms,
2547 ) {
2548 SourceSyncAction::Sync
2549 } else {
2550 SourceSyncAction::Skip
2551 }
2552 }
2553 }
2554}
2555
2556fn health_score_for_source(health: SourceHealthKind) -> u8 {
2557 match health {
2558 SourceHealthKind::Healthy => 100,
2559 SourceHealthKind::Stale => 75,
2560 SourceHealthKind::NeverSynced => 65,
2561 SourceHealthKind::HighLatency => 55,
2562 SourceHealthKind::Flapping => 40,
2563 SourceHealthKind::BackingOff => 25,
2564 SourceHealthKind::AuthFailed => 10,
2565 }
2566}
2567
2568fn stale_value_score_for_source(
2569 schedule: SyncSchedule,
2570 staleness_ms: Option<i64>,
2571 info: Option<&SourceSyncInfo>,
2572) -> u8 {
2573 let Some(info) = info else {
2574 return 100;
2575 };
2576 if info.last_sync.is_none() {
2577 return 100;
2578 }
2579
2580 let Some(staleness_ms) = staleness_ms else {
2581 return 100;
2582 };
2583
2584 let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2585 return 0;
2586 };
2587
2588 let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2589 u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2590}
2591
2592fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2593 if info.consecutive_failures == 0 {
2594 return None;
2595 }
2596 let last_sync = info.last_sync?;
2597 let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2598 let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2599 let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2600 .saturating_mul(multiplier)
2601 .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2602 Some(last_sync.saturating_add(backoff_ms))
2603}
2604
2605fn sync_result_auth_failure(result: &SyncResult) -> bool {
2606 let Some(error) = result.error_message() else {
2607 return false;
2608 };
2609 let error = error.to_ascii_lowercase();
2610 error.contains("permission denied")
2611 || error.contains("authentication")
2612 || error.contains("host key verification failed")
2613 || error.contains("known_hosts")
2614 || error.contains("no valid authentication")
2615}
2616
2617fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2618 unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2619}
2620
2621fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
2622 #[cfg(windows)]
2623 {
2624 match std::fs::rename(temp_path, final_path) {
2625 Ok(()) => sync_parent_directory(final_path),
2626 Err(first_err)
2627 if final_path.exists()
2628 && matches!(
2629 first_err.kind(),
2630 std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
2631 ) =>
2632 {
2633 let backup_path = unique_replace_backup_path(final_path);
2634 std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
2635 let _ = std::fs::remove_file(temp_path);
2636 std::io::Error::other(format!(
2637 "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
2638 backup_path.display(),
2639 final_path.display(),
2640 first_err,
2641 backup_err
2642 ))
2643 })?;
2644 match std::fs::rename(temp_path, final_path) {
2645 Ok(()) => {
2646 let _ = std::fs::remove_file(&backup_path);
2647 sync_parent_directory(final_path)
2648 }
2649 Err(second_err) => {
2650 let restore_result = std::fs::rename(&backup_path, final_path);
2651 match restore_result {
2652 Ok(()) => {
2653 let _ = std::fs::remove_file(temp_path);
2654 sync_parent_directory(final_path).map_err(|sync_err| {
2655 std::io::Error::other(format!(
2656 "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
2657 final_path.display(),
2658 temp_path.display(),
2659 first_err,
2660 second_err,
2661 sync_err
2662 ))
2663 })?;
2664 Err(std::io::Error::new(
2665 second_err.kind(),
2666 format!(
2667 "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
2668 final_path.display(),
2669 temp_path.display(),
2670 first_err,
2671 second_err
2672 ),
2673 ))
2674 }
2675 Err(restore_err) => Err(std::io::Error::other(format!(
2676 "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
2677 final_path.display(),
2678 temp_path.display(),
2679 first_err,
2680 second_err,
2681 restore_err,
2682 temp_path.display()
2683 ))),
2684 }
2685 }
2686 }
2687 }
2688 Err(rename_err) => Err(rename_err),
2689 }
2690 }
2691
2692 #[cfg(not(windows))]
2693 {
2694 std::fs::rename(temp_path, final_path)?;
2695 sync_parent_directory(final_path)
2696 }
2697}
2698
2699fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
2700 std::fs::File::open(path)?.sync_all()
2701}
2702
2703#[cfg(not(windows))]
2704fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
2705 let Some(parent) = path.parent() else {
2706 return Ok(());
2707 };
2708 std::fs::File::open(parent)?.sync_all()
2709}
2710
2711#[cfg(windows)]
2712fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
2713 Ok(())
2714}
2715
2716#[cfg(windows)]
2717fn unique_replace_backup_path(path: &Path) -> PathBuf {
2718 unique_atomic_sidecar_path(path, "bak", "sync_status.json")
2719}
2720
2721fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
2722 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2723
2724 let timestamp = std::time::SystemTime::now()
2725 .duration_since(std::time::UNIX_EPOCH)
2726 .unwrap_or_default()
2727 .as_nanos();
2728 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2729 let file_name = path
2730 .file_name()
2731 .and_then(|name| name.to_str())
2732 .unwrap_or(fallback_name);
2733
2734 path.with_file_name(format!(
2735 ".{file_name}.{suffix}.{}.{}.{}",
2736 std::process::id(),
2737 timestamp,
2738 nonce
2739 ))
2740}
2741
2742#[cfg(test)]
2743mod tests {
2744 use super::*;
2745 use tempfile::TempDir;
2746
2747 #[test]
2748 fn test_path_to_safe_dirname() {
2749 let res = path_to_safe_dirname("~/.claude/projects");
2750 assert!(res.starts_with(".claude_projects_"));
2751
2752 let res = path_to_safe_dirname("/home/user/data");
2753 assert!(res.starts_with("home_user_data_"));
2754
2755 let res = path_to_safe_dirname("~/");
2756 assert!(res.starts_with("root_"));
2757
2758 let res = path_to_safe_dirname("");
2759 assert!(res.starts_with("root_"));
2760 }
2761
2762 #[test]
2763 fn test_path_to_safe_dirname_empty() {
2764 let res = path_to_safe_dirname("~");
2765 assert!(res.starts_with("root_"));
2766
2767 let res = path_to_safe_dirname("/");
2768 assert!(res.starts_with("root_"));
2769 }
2770
2771 #[test]
2772 fn test_path_to_safe_dirname_strips_traversal_components() {
2773 let res = path_to_safe_dirname("../../etc/passwd");
2774
2775 assert!(res.starts_with("etc_passwd_"));
2776 assert!(!res.contains(".."));
2777 assert!(!res.contains('/'));
2778 assert!(!res.contains('\\'));
2779 }
2780
2781 #[test]
2782 fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
2783 let temp = TempDir::new().unwrap();
2784 let engine = SyncEngine::new(temp.path());
2785
2786 for host in [
2787 "work-mac;touch /tmp/cass-owned",
2788 "work mac",
2789 "work-mac\nhostname",
2790 "work-mac`hostname`",
2791 "work-mac/../../secret",
2792 "-oProxyCommand=evil",
2793 "",
2794 "@host",
2795 "user@",
2796 "user@host@extra",
2797 ] {
2798 let err = engine.get_remote_home(host).unwrap_err();
2799 assert!(
2800 matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
2801 "expected invalid-host rejection for {host:?}, got {err}"
2802 );
2803 }
2804 }
2805
2806 #[test]
2807 fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
2808 let temp = TempDir::new().unwrap();
2809 let engine = SyncEngine::new(temp.path());
2810 let mut source = SourceDefinition::ssh("../escape", "user@host");
2811 source.paths = vec!["/tmp/sessions".to_string()];
2812
2813 let err = engine
2814 .sync_source(&source)
2815 .expect_err("invalid source name should fail before local writes");
2816
2817 assert!(
2818 matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
2819 "expected invalid source-name rejection, got {err}"
2820 );
2821 assert!(
2822 !temp.path().join("escape").exists(),
2823 "invalid source name must not escape the remotes mirror layout"
2824 );
2825 assert!(
2826 !temp.path().join("remotes").exists(),
2827 "invalid source name must be rejected before creating mirror roots"
2828 );
2829 }
2830
2831 #[test]
2832 fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
2833 let temp = TempDir::new().unwrap();
2834 let engine = SyncEngine::new(temp.path());
2835 let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
2836 source.paths = vec!["/tmp/sessions".to_string()];
2837
2838 let err = engine
2839 .sync_source(&source)
2840 .expect_err("invalid host should fail before local writes");
2841
2842 assert!(
2843 matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
2844 "expected invalid host rejection, got {err}"
2845 );
2846 assert!(
2847 !temp.path().join("remotes").exists(),
2848 "invalid host must be rejected before creating mirror roots"
2849 );
2850 }
2851
2852 #[test]
2853 fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
2854 let temp = TempDir::new().unwrap();
2855 let engine = SyncEngine::new(temp.path());
2856
2857 for (path, expected) in [
2858 ("", "paths[0] cannot be empty"),
2859 (" ", "paths[0] cannot be empty"),
2860 (" ~/.claude/projects", "paths[0] cannot have leading"),
2861 ("~/.claude/projects ", "paths[0] cannot have leading"),
2862 ("~/.claude\nprojects", "paths[0] cannot contain control"),
2863 ] {
2864 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
2865 source.paths = vec![path.to_string()];
2866
2867 let report = engine.sync_source(&source).unwrap();
2868 assert_eq!(report.path_results.len(), 1);
2869 let result = &report.path_results[0];
2870 assert!(!result.success);
2871 assert_eq!(result.remote_path, path);
2872 assert!(
2873 result
2874 .error
2875 .as_deref()
2876 .is_some_and(|message| message.contains(expected)),
2877 "expected invalid path rejection for {path:?}, got {result:?}"
2878 );
2879 }
2880 }
2881
2882 #[test]
2883 fn test_remote_sync_path_validation_allows_internal_spaces() {
2884 assert!(
2885 validate_remote_sync_path_entry(
2886 0,
2887 "~/Library/Application Support/Cursor/User/globalStorage"
2888 )
2889 .is_ok()
2890 );
2891 }
2892
2893 #[test]
2894 fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
2895 let temp = TempDir::new().unwrap();
2896 let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
2897 let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
2900 source.paths = vec![
2901 "~/.codex/sessions".to_string(),
2902 " ~/.claude/projects".to_string(),
2903 "~/.gemini/tmp".to_string(),
2904 ];
2905
2906 let report = engine.sync_source(&source).unwrap();
2907 let remote_paths = report
2908 .path_results
2909 .iter()
2910 .map(|result| result.remote_path.as_str())
2911 .collect::<Vec<_>>();
2912
2913 assert_eq!(
2914 remote_paths,
2915 vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
2916 );
2917 assert!(
2918 report.path_results[1]
2919 .error
2920 .as_deref()
2921 .is_some_and(|message| message.contains("paths[1] cannot have leading")),
2922 "expected invalid path error in original slot: {:?}",
2923 report.path_results
2924 );
2925 }
2926
2927 #[test]
2928 fn test_remote_find_regular_files_command_uses_physical_traversal() {
2929 assert_eq!(
2930 remote_find_regular_files_command("/tmp/has space"),
2931 "find -P '/tmp/has space' -type f -print0"
2932 );
2933 assert_eq!(
2934 remote_find_regular_files_command("/tmp/that's all"),
2935 "find -P '/tmp/that'\\''s all' -type f -print0"
2936 );
2937 }
2938
2939 #[test]
2940 fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
2941 assert_eq!(
2942 parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
2943 Some("/home/user".to_string())
2944 );
2945 assert_eq!(
2946 parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
2947 Some("/Users/test user".to_string())
2948 );
2949 }
2950
2951 #[test]
2952 fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
2953 assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
2954 assert_eq!(
2955 parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
2956 None
2957 );
2958 }
2959
2960 #[test]
2961 fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
2962 let paths = parse_null_terminated_utf8_paths(
2963 b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
2964 );
2965 assert_eq!(
2966 paths,
2967 vec![
2968 "/remote/sessions/a.jsonl".to_string(),
2969 "/remote/sessions/b.jsonl".to_string()
2970 ]
2971 );
2972 }
2973
2974 #[test]
2975 fn test_remote_file_to_safe_local_path_rejects_outside_root() {
2976 let root = Path::new("/remote/sessions");
2977 let local = Path::new("/mirror/root");
2978
2979 assert_eq!(
2980 remote_file_to_safe_local_path(
2981 root,
2982 Path::new("/remote/sessions/a/b.jsonl"),
2983 local,
2984 "sessions"
2985 ),
2986 Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
2987 );
2988 assert_eq!(
2989 remote_file_to_safe_local_path(
2990 Path::new("/remote/session.jsonl"),
2991 Path::new("/remote/session.jsonl"),
2992 local,
2993 "session.jsonl"
2994 ),
2995 Some(PathBuf::from("/mirror/root/session.jsonl"))
2996 );
2997 assert_eq!(
2998 remote_file_to_safe_local_path(
2999 root,
3000 Path::new("/remote/sessions/../secret.txt"),
3001 local,
3002 "sessions"
3003 ),
3004 None
3005 );
3006 assert_eq!(
3007 remote_file_to_safe_local_path(
3008 root,
3009 Path::new("/remote/other/secret.txt"),
3010 local,
3011 "sessions"
3012 ),
3013 None
3014 );
3015 }
3016
3017 #[test]
3018 fn test_local_symlink_guard_allows_regular_paths() {
3019 let temp = TempDir::new().expect("tempdir");
3020 let root = temp.path().join("mirror");
3021 let target = root.join("sessions/session.jsonl");
3022
3023 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3024
3025 std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3026 std::fs::write(&target, "{}").expect("write target");
3027
3028 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3029 }
3030
3031 #[cfg(unix)]
3032 #[test]
3033 fn test_local_symlink_guard_rejects_nested_symlink() {
3034 use std::os::unix::fs::symlink;
3035
3036 let temp = TempDir::new().expect("tempdir");
3037 let root = temp.path().join("mirror");
3038 let outside = temp.path().join("outside");
3039 std::fs::create_dir_all(&root).expect("create root");
3040 std::fs::create_dir_all(&outside).expect("create outside");
3041 symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3042
3043 let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3044 .expect_err("nested symlink should be rejected");
3045
3046 assert!(err.contains("Refusing to write"));
3047 assert!(err.contains("sessions"));
3048 }
3049
3050 #[cfg(unix)]
3051 #[test]
3052 fn test_local_symlink_guard_rejects_root_symlink() {
3053 use std::os::unix::fs::symlink;
3054
3055 let temp = TempDir::new().expect("tempdir");
3056 let outside = temp.path().join("outside");
3057 let root = temp.path().join("mirror-link");
3058 std::fs::create_dir_all(&outside).expect("create outside");
3059 symlink(&outside, &root).expect("symlink root");
3060
3061 let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3062 .expect_err("root symlink should be rejected");
3063
3064 assert!(err.contains("Refusing to write"));
3065 assert!(err.contains("mirror-link"));
3066 }
3067
3068 #[test]
3069 fn test_prepare_local_sync_container_creates_regular_container() {
3070 let temp = TempDir::new().expect("tempdir");
3071 let root = temp.path().join("mirror");
3072 let target = root.join("sessions");
3073
3074 prepare_local_sync_container(&root, &target).expect("regular container should be created");
3075
3076 assert!(target.is_dir());
3077 }
3078
3079 #[cfg(unix)]
3080 #[test]
3081 fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3082 use std::os::unix::fs::symlink;
3083
3084 let temp = TempDir::new().expect("tempdir");
3085 let root = temp.path().join("mirror");
3086 let outside = temp.path().join("outside");
3087 let target = root.join("sessions");
3088 std::fs::create_dir_all(&root).expect("create root");
3089 std::fs::create_dir_all(&outside).expect("create outside");
3090 symlink(&outside, &target).expect("symlink target");
3091
3092 let err = prepare_local_sync_container(&root, &target)
3093 .expect_err("sync container symlink should be rejected");
3094
3095 assert!(err.contains("Refusing to write"));
3096 assert!(err.contains("sessions"));
3097 }
3098
3099 #[cfg(unix)]
3100 #[test]
3101 fn test_prepare_local_sync_container_rejects_root_symlink() {
3102 use std::os::unix::fs::symlink;
3103
3104 let temp = TempDir::new().expect("tempdir");
3105 let outside = temp.path().join("outside");
3106 let root = temp.path().join("mirror-link");
3107 let target = root.join("sessions");
3108 std::fs::create_dir_all(&outside).expect("create outside");
3109 symlink(&outside, &root).expect("symlink root");
3110
3111 let err = prepare_local_sync_container(&root, &target)
3112 .expect_err("sync root symlink should be rejected");
3113
3114 assert!(err.contains("Refusing to write"));
3115 assert!(err.contains("mirror-link"));
3116 }
3117
3118 #[cfg(unix)]
3119 #[test]
3120 fn test_prepare_local_sync_root_rejects_symlinked_source_parent() {
3121 use std::os::unix::fs::symlink;
3122
3123 let temp = TempDir::new().expect("tempdir");
3124 let local_store = temp.path().join("data");
3125 let remotes = local_store.join("remotes");
3126 let outside = temp.path().join("outside");
3127 let source_link = remotes.join("laptop");
3128 let mirror_dir = source_link.join("mirror");
3129
3130 std::fs::create_dir_all(&remotes).expect("create remotes");
3131 std::fs::create_dir_all(&outside).expect("create outside");
3132 symlink(&outside, &source_link).expect("symlink source parent");
3133
3134 let err = prepare_local_sync_root(&local_store, &mirror_dir)
3135 .expect_err("symlinked source parent should be rejected before mkdir");
3136
3137 assert!(err.contains("Refusing to write"));
3138 assert!(err.contains("laptop"));
3139 assert!(
3140 !outside.join("mirror").exists(),
3141 "sync root preparation must not create directories through source parent symlinks"
3142 );
3143 }
3144
3145 #[test]
3146 fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3147 let symlink = FileStat {
3148 size: None,
3149 uid: None,
3150 gid: None,
3151 perm: Some(0o120000 | 0o777),
3152 atime: None,
3153 mtime: None,
3154 };
3155 let regular = FileStat {
3156 size: None,
3157 uid: None,
3158 gid: None,
3159 perm: Some(0o100000 | 0o644),
3160 atime: None,
3161 mtime: None,
3162 };
3163
3164 assert!(sftp_file_stat_is_symlink(&symlink));
3165 assert!(!sftp_file_stat_is_symlink(®ular));
3166 }
3167
3168 #[test]
3169 fn test_sftp_entry_file_name_accepts_regular_names() {
3170 let parent = Path::new("/remote");
3171 let entry = parent.join("session.jsonl");
3172
3173 assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3174 }
3175
3176 #[test]
3177 fn test_sftp_entry_file_name_skips_dot_entries() {
3178 let parent = Path::new("/remote");
3179
3180 assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3181 assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3182 }
3183
3184 #[cfg(unix)]
3185 #[test]
3186 fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3187 use std::ffi::OsStr;
3188 use std::os::unix::ffi::OsStrExt;
3189
3190 let parent = Path::new("/remote");
3191 let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3192 let entry = parent.join(bad_component);
3193
3194 assert_eq!(sftp_entry_file_name(&entry, parent), None);
3195 }
3196
3197 #[test]
3198 fn test_parse_rsync_stats() {
3199 let output = r#"
3200Number of files: 42
3201Number of regular files transferred: 10
3202Total transferred file size: 1,234 bytes
3203 "#;
3204
3205 let stats = parse_rsync_stats(output);
3206 assert_eq!(stats.files_transferred, 10);
3207 assert_eq!(stats.bytes_transferred, 1234);
3208 }
3209
3210 #[test]
3211 fn test_parse_rsync_stats_empty() {
3212 let stats = parse_rsync_stats("");
3213 assert_eq!(stats.files_transferred, 0);
3214 assert_eq!(stats.bytes_transferred, 0);
3215 }
3216
3217 #[test]
3218 fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3219 assert_eq!(
3220 quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3221 "'/Users/me/Library/Application Support/Cursor'"
3222 );
3223 assert_eq!(
3224 quote_remote_shell_path("/tmp/that's all"),
3225 "'/tmp/that'\\''s all'"
3226 );
3227 }
3228
3229 #[test]
3230 fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3231 assert_eq!(
3232 remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3233 "work-mac:/tmp/has space"
3234 );
3235 assert_eq!(
3236 remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3237 "work-mac:/tmp/that's all"
3238 );
3239 assert_eq!(
3240 remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3241 "work-mac:'/tmp/has space'"
3242 );
3243 }
3244
3245 #[test]
3246 fn rsync_arg_protection_enum_maps_flags_correctly() {
3247 assert_eq!(
3252 RsyncArgProtection::ProtectArgs.flag(),
3253 Some("--protect-args")
3254 );
3255 assert_eq!(
3256 RsyncArgProtection::SecludedArgs.flag(),
3257 Some("--secluded-args")
3258 );
3259 assert_eq!(RsyncArgProtection::None.flag(), None);
3260 assert!(RsyncArgProtection::ProtectArgs.is_supported());
3261 assert!(RsyncArgProtection::SecludedArgs.is_supported());
3262 assert!(!RsyncArgProtection::None.is_supported());
3263 }
3264
3265 #[test]
3266 fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3267 assert_eq!(
3268 remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3269 "work-mac:'/tmp/has space'"
3270 );
3271 }
3272
3273 #[test]
3274 fn test_remote_spec_for_scp_always_quotes_remote_path() {
3275 assert_eq!(
3276 remote_spec_for_scp("work-mac", "/tmp/that's all"),
3277 "work-mac:'/tmp/that'\\''s all'"
3278 );
3279 }
3280
3281 #[test]
3282 fn test_sync_report_totals() {
3283 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3284 report.add_path_result(PathSyncResult {
3285 files_transferred: 5,
3286 bytes_transferred: 100,
3287 success: true,
3288 ..Default::default()
3289 });
3290 report.add_path_result(PathSyncResult {
3291 files_transferred: 3,
3292 bytes_transferred: 50,
3293 success: true,
3294 ..Default::default()
3295 });
3296
3297 assert_eq!(report.total_files(), 8);
3298 assert_eq!(report.total_bytes(), 150);
3299 assert!(report.all_succeeded);
3300 }
3301
3302 #[test]
3303 fn test_sync_report_with_failure() {
3304 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3305 report.add_path_result(PathSyncResult {
3306 success: true,
3307 ..Default::default()
3308 });
3309 report.add_path_result(PathSyncResult {
3310 success: false,
3311 error: Some("Connection refused".into()),
3312 ..Default::default()
3313 });
3314
3315 assert!(!report.all_succeeded);
3316 assert_eq!(report.successful_paths(), 1);
3317 assert_eq!(report.failed_paths(), 1);
3318 }
3319
3320 #[test]
3321 fn test_detect_sync_method() {
3322 let method = SyncEngine::detect_sync_method();
3324 assert!(matches!(
3325 method,
3326 SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3327 ));
3328 }
3329
3330 #[test]
3331 fn test_sync_engine_mirror_dir() {
3332 let engine = SyncEngine::new(Path::new("/data/cass"));
3333 let mirror = engine.mirror_dir("laptop");
3334 assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3335 }
3336
3337 #[test]
3338 fn test_sync_method_display() {
3339 for (method, expected) in [
3340 (SyncMethod::Rsync, "rsync"),
3341 (SyncMethod::WslRsync, "wsl-rsync"),
3342 (SyncMethod::Scp, "scp"),
3343 (SyncMethod::Sftp, "sftp"),
3344 ] {
3345 assert_eq!(method.as_str(), expected);
3346 assert_eq!(method.to_string(), expected);
3347 }
3348 }
3349
3350 #[test]
3351 fn test_windows_path_to_wsl_drive() {
3352 assert_eq!(
3353 windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass"),
3354 "/mnt/c/Users/george/AppData/Roaming/cass"
3355 );
3356 }
3357
3358 #[test]
3359 fn test_windows_path_to_wsl_forward_slash() {
3360 assert_eq!(
3361 windows_path_to_wsl("C:/Users/george/data"),
3362 "/mnt/c/Users/george/data"
3363 );
3364 }
3365
3366 #[test]
3367 fn test_windows_path_to_wsl_non_windows_path_unchanged() {
3368 assert_eq!(
3370 windows_path_to_wsl("/home/george/data"),
3371 "/home/george/data"
3372 );
3373 }
3374
3375 #[test]
3376 fn test_expand_tilde_with_home() {
3377 assert_eq!(
3379 SyncEngine::expand_tilde_with_home("/home/user/projects", Some("/home/user")),
3380 "/home/user/projects"
3381 );
3382
3383 assert_eq!(
3385 SyncEngine::expand_tilde_with_home("~/.claude/projects", Some("/home/user")),
3386 "/home/user/.claude/projects"
3387 );
3388
3389 assert_eq!(
3391 SyncEngine::expand_tilde_with_home("~", Some("/home/user")),
3392 "/home/user"
3393 );
3394
3395 assert_eq!(
3397 SyncEngine::expand_tilde_with_home("~/.claude/projects", None),
3398 "~/.claude/projects"
3399 );
3400
3401 assert_eq!(
3403 SyncEngine::expand_tilde_with_home("~otheruser/projects", Some("/home/user")),
3404 "~otheruser/projects"
3405 );
3406 }
3407
3408 #[test]
3409 fn test_sync_report_failed() {
3410 let report = SyncReport::failed("test-source", SyncError::NoHost);
3411 assert_eq!(report.source_name, "test-source");
3412 assert!(!report.all_succeeded);
3413 assert_eq!(report.path_results.len(), 1);
3414 assert!(!report.path_results[0].success);
3415 assert!(report.path_results[0].error.is_some());
3416 }
3417
3418 #[test]
3419 fn test_sync_result_default() {
3420 let result = SyncResult::default();
3421 assert!(matches!(result, SyncResult::Skipped));
3422 assert_eq!(result.label(), "never");
3423 }
3424
3425 #[test]
3426 fn test_source_sync_info_default() {
3427 let info = SourceSyncInfo::default();
3428 assert!(info.last_sync.is_none());
3429 assert_eq!(info.files_synced, 0);
3430 assert_eq!(info.bytes_transferred, 0);
3431 assert_eq!(info.duration_ms, 0);
3432 }
3433
3434 #[test]
3435 fn test_sync_status_update() {
3436 let mut status = SyncStatus::default();
3437
3438 let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3439 report.add_path_result(PathSyncResult {
3440 files_transferred: 10,
3441 bytes_transferred: 1000,
3442 success: true,
3443 ..Default::default()
3444 });
3445 report.total_duration_ms = 500;
3446
3447 status.update("laptop", &report);
3448
3449 let info = status.get("laptop").unwrap();
3450 assert!(info.last_sync.is_some());
3451 assert!(matches!(info.last_result, SyncResult::Success));
3452 assert_eq!(info.files_synced, 10);
3453 assert_eq!(info.bytes_transferred, 1000);
3454 assert_eq!(info.duration_ms, 500);
3455 }
3456
3457 #[test]
3458 fn test_sync_status_partial_failure() {
3459 let mut status = SyncStatus::default();
3460
3461 let mut report = SyncReport::new("server", SyncMethod::Rsync);
3462 report.add_path_result(PathSyncResult {
3463 success: true,
3464 files_transferred: 5,
3465 ..Default::default()
3466 });
3467 report.add_path_result(PathSyncResult {
3468 success: false,
3469 error: Some("Connection refused".into()),
3470 ..Default::default()
3471 });
3472
3473 status.update("server", &report);
3474
3475 let info = status.get("server").unwrap();
3476 assert!(matches!(info.last_result, SyncResult::PartialFailure(_)));
3477 }
3478
3479 #[test]
3480 fn test_sync_status_full_failure() {
3481 let mut status = SyncStatus::default();
3482
3483 let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
3484 report.add_path_result(PathSyncResult {
3485 success: false,
3486 error: Some("Host unreachable".into()),
3487 ..Default::default()
3488 });
3489
3490 status.update("dead-host", &report);
3491
3492 let info = status.get("dead-host").unwrap();
3493 assert!(matches!(info.last_result, SyncResult::Failed(_)));
3494 }
3495
3496 #[test]
3497 fn test_sync_status_save_round_trips() {
3498 let temp = TempDir::new().expect("tempdir");
3499 let mut status = SyncStatus::default();
3500 let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3501 report.add_path_result(PathSyncResult {
3502 files_transferred: 3,
3503 bytes_transferred: 42,
3504 success: true,
3505 ..Default::default()
3506 });
3507 status.update("laptop", &report);
3508
3509 status.save(temp.path()).expect("save status");
3510 let loaded = SyncStatus::load(temp.path()).expect("load status");
3511
3512 let info = loaded.get("laptop").expect("round-tripped source");
3513 assert_eq!(info.files_synced, 3);
3514 assert_eq!(info.bytes_transferred, 42);
3515 assert!(matches!(info.last_result, SyncResult::Success));
3516 }
3517
3518 #[test]
3519 fn test_sync_status_retain_sources_prunes_removed_entries() {
3520 let mut status = SyncStatus::default();
3521 status.sources.insert(
3522 "laptop".into(),
3523 SourceSyncInfo {
3524 files_synced: 3,
3525 ..Default::default()
3526 },
3527 );
3528 status.sources.insert(
3529 "desktop".into(),
3530 SourceSyncInfo {
3531 files_synced: 5,
3532 ..Default::default()
3533 },
3534 );
3535
3536 let removed_any = status.retain_sources(["laptop"]);
3537
3538 assert!(removed_any);
3539 assert!(status.get("laptop").is_some());
3540 assert!(status.get("desktop").is_none());
3541 }
3542
3543 fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3544 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3545 source.sync_schedule = schedule;
3546 source.paths = vec!["~/.claude/projects".to_string()];
3547 source
3548 }
3549
3550 fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3551 let mut status = SyncStatus::default();
3552 status.set_info("laptop", info);
3553 status
3554 }
3555
3556 #[test]
3557 fn source_sync_decision_skips_healthy_source_until_schedule_due() {
3558 let now_ms = 1_700_000_000_000;
3559 let source = source_with_schedule(SyncSchedule::Hourly);
3560 let status = status_with_info(SourceSyncInfo {
3561 last_sync: Some(now_ms - 10 * 60 * 1000),
3562 last_result: SyncResult::Success,
3563 duration_ms: 250,
3564 ..Default::default()
3565 });
3566
3567 let decision = status.decision_for_source_at(&source, now_ms, false);
3568
3569 assert_eq!(decision.action, SourceSyncAction::Skip);
3570 assert_eq!(decision.health, SourceHealthKind::Healthy);
3571 assert!(!decision.fallback_active);
3572 assert_eq!(
3573 decision.next_eligible_sync_ms,
3574 Some(now_ms + 50 * 60 * 1000)
3575 );
3576 assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
3577 assert_eq!(decision.stale_value_score, 16);
3578 }
3579
3580 #[test]
3581 fn source_sync_decision_syncs_stale_scheduled_source() {
3582 let now_ms = 1_700_000_000_000;
3583 let source = source_with_schedule(SyncSchedule::Hourly);
3584 let status = status_with_info(SourceSyncInfo {
3585 last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
3586 last_result: SyncResult::Success,
3587 duration_ms: 250,
3588 ..Default::default()
3589 });
3590
3591 let decision = status.decision_for_source_at(&source, now_ms, false);
3592
3593 assert_eq!(decision.action, SourceSyncAction::Sync);
3594 assert_eq!(decision.health, SourceHealthKind::Stale);
3595 assert_eq!(decision.stale_value_score, 100);
3596 assert!(
3597 decision
3598 .reasons
3599 .iter()
3600 .any(|reason| reason.contains("schedule is due"))
3601 );
3602 }
3603
3604 #[test]
3605 fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
3606 let now_ms = 1_700_000_000_000;
3607 let source = source_with_schedule(SyncSchedule::Hourly);
3608 let status = status_with_info(SourceSyncInfo {
3609 last_sync: Some(now_ms - 10 * 60 * 1000),
3610 last_result: SyncResult::Failed("Permission denied (publickey)".into()),
3611 duration_ms: 800,
3612 consecutive_failures: 1,
3613 ..Default::default()
3614 });
3615
3616 let decision = status.decision_for_source_at(&source, now_ms, false);
3617
3618 assert_eq!(decision.action, SourceSyncAction::Defer);
3619 assert_eq!(decision.health, SourceHealthKind::AuthFailed);
3620 assert!(decision.fallback_active);
3621 assert_eq!(decision.health_score, 10);
3622 }
3623
3624 #[test]
3625 fn source_sync_decision_marks_partial_success_as_flapping() {
3626 let now_ms = 1_700_000_000_000;
3627 let source = source_with_schedule(SyncSchedule::Hourly);
3628 let status = status_with_info(SourceSyncInfo {
3629 last_sync: Some(now_ms - 10 * 60 * 1000),
3630 last_result: SyncResult::PartialFailure("one path failed".into()),
3631 files_synced: 7,
3632 duration_ms: 900,
3633 consecutive_failures: 1,
3634 ..Default::default()
3635 });
3636
3637 let decision = status.decision_for_source_at(&source, now_ms, false);
3638
3639 assert_eq!(decision.action, SourceSyncAction::Skip);
3640 assert_eq!(decision.health, SourceHealthKind::Flapping);
3641 assert!(decision.fallback_active);
3642 }
3643
3644 #[test]
3645 fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
3646 let now_ms = 1_700_000_000_000;
3647 let source = source_with_schedule(SyncSchedule::Hourly);
3648 let last_sync = now_ms - 10 * 60 * 1000;
3649 let status = status_with_info(SourceSyncInfo {
3650 last_sync: Some(last_sync),
3651 last_result: SyncResult::Failed("Host unreachable".into()),
3652 duration_ms: 900,
3653 consecutive_failures: 1,
3654 ..Default::default()
3655 });
3656
3657 let decision = status.decision_for_source_at(&source, now_ms, false);
3658
3659 assert_eq!(decision.action, SourceSyncAction::Skip);
3660 assert_eq!(decision.health, SourceHealthKind::Flapping);
3661 assert!(decision.fallback_active);
3662 assert_eq!(
3663 decision.backoff_until_ms,
3664 Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS)
3665 );
3666 assert!(
3667 decision
3668 .reasons
3669 .iter()
3670 .any(|reason| reason.contains("local fallback remains active"))
3671 );
3672 }
3673
3674 #[test]
3675 fn source_sync_decision_marks_slow_source_as_high_latency() {
3676 let now_ms = 1_700_000_000_000;
3677 let source = source_with_schedule(SyncSchedule::Hourly);
3678 let status = status_with_info(SourceSyncInfo {
3679 last_sync: Some(now_ms - 10 * 60 * 1000),
3680 last_result: SyncResult::Success,
3681 duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
3682 ..Default::default()
3683 });
3684
3685 let decision = status.decision_for_source_at(&source, now_ms, false);
3686
3687 assert_eq!(decision.action, SourceSyncAction::Skip);
3688 assert_eq!(decision.health, SourceHealthKind::HighLatency);
3689 assert!(decision.fallback_active);
3690 }
3691
3692 #[test]
3693 fn source_sync_decision_manual_override_forces_sync() {
3694 let now_ms = 1_700_000_000_000;
3695 let source = source_with_schedule(SyncSchedule::Manual);
3696 let status = status_with_info(SourceSyncInfo {
3697 last_sync: Some(now_ms),
3698 last_result: SyncResult::Success,
3699 duration_ms: 100,
3700 ..Default::default()
3701 });
3702
3703 let decision = status.decision_for_source_at(&source, now_ms, true);
3704
3705 assert_eq!(decision.action, SourceSyncAction::Sync);
3706 assert!(decision.manual_override);
3707 assert!(
3708 decision
3709 .reasons
3710 .iter()
3711 .any(|reason| reason.contains("overrides automatic scheduling"))
3712 );
3713 }
3714
3715 #[test]
3716 fn test_unique_atomic_temp_path_changes_each_call() {
3717 let final_path = Path::new("/tmp/sync_status.json");
3718 let first = unique_atomic_temp_path(final_path);
3719 let second = unique_atomic_temp_path(final_path);
3720
3721 assert_ne!(first, second);
3722 assert_eq!(first.parent(), final_path.parent());
3723 assert_eq!(second.parent(), final_path.parent());
3724 }
3725
3726 #[test]
3727 fn test_replace_file_from_temp_overwrites_existing_file() {
3728 let temp = TempDir::new().expect("tempdir");
3729 let final_path = temp.path().join("sync_status.json");
3730 let first_tmp = temp.path().join("first.tmp");
3731 let second_tmp = temp.path().join("second.tmp");
3732
3733 std::fs::write(&first_tmp, "{\"first\":true}").expect("write first temp");
3734 replace_file_from_temp(&first_tmp, &final_path).expect("initial replace");
3735 assert_eq!(
3736 std::fs::read_to_string(&final_path).expect("read first final"),
3737 "{\"first\":true}"
3738 );
3739
3740 std::fs::write(&second_tmp, "{\"second\":true}").expect("write second temp");
3741 replace_file_from_temp(&second_tmp, &final_path).expect("overwrite replace");
3742 assert_eq!(
3743 std::fs::read_to_string(&final_path).expect("read second final"),
3744 "{\"second\":true}"
3745 );
3746 }
3747
3748 #[test]
3749 fn test_sync_engine_with_timeouts() {
3750 let engine = SyncEngine::new(Path::new("/data"))
3751 .with_connection_timeout(30)
3752 .with_transfer_timeout(600);
3753
3754 assert_eq!(engine.connection_timeout, 30);
3755 assert_eq!(engine.transfer_timeout, 600);
3756 }
3757
3758 #[test]
3759 fn test_sync_error_display() {
3760 assert_eq!(
3761 SyncError::NoHost.to_string(),
3762 "Source has no host configured"
3763 );
3764 assert_eq!(
3765 SyncError::NoPaths.to_string(),
3766 "Source has no paths configured"
3767 );
3768 assert_eq!(
3769 SyncError::InvalidPath("paths[0] cannot be empty".to_string()).to_string(),
3770 "Invalid source path: paths[0] cannot be empty"
3771 );
3772 assert_eq!(
3773 SyncError::Timeout(30).to_string(),
3774 "Connection timed out after 30 seconds"
3775 );
3776 assert_eq!(SyncError::Cancelled.to_string(), "Sync cancelled");
3777 }
3778
3779 #[test]
3784 fn test_parse_ssh_host_simple() {
3785 let (user, host) = parse_ssh_host("myserver");
3786 assert!(user.is_none());
3787 assert_eq!(host, "myserver");
3788 }
3789
3790 #[test]
3791 fn test_parse_ssh_host_with_user() {
3792 let (user, host) = parse_ssh_host("admin@myserver");
3793 assert_eq!(user, Some("admin"));
3794 assert_eq!(host, "myserver");
3795 }
3796
3797 #[test]
3798 fn test_parse_ssh_host_with_domain() {
3799 let (user, host) = parse_ssh_host("deploy@server.example.com");
3800 assert_eq!(user, Some("deploy"));
3801 assert_eq!(host, "server.example.com");
3802 }
3803
3804 #[test]
3805 fn test_parse_ssh_host_email_like() {
3806 let (user, host) = parse_ssh_host("user@host");
3808 assert_eq!(user, Some("user"));
3809 assert_eq!(host, "host");
3810 }
3811
3812 #[test]
3813 fn test_first_nonblank_username_priority_and_trimming() {
3814 assert_eq!(
3815 first_nonblank_username([Some(" alice "), Some("bob")]),
3816 Some("alice".to_string())
3817 );
3818 assert_eq!(
3819 first_nonblank_username([Some(" "), None, Some("carol")]),
3820 Some("carol".to_string())
3821 );
3822 assert_eq!(first_nonblank_username([None, Some("\t")]), None);
3823 }
3824
3825 #[test]
3826 fn test_expand_tilde_local_with_tilde_prefix() {
3827 let expanded = expand_tilde_local("~/Documents/file.txt");
3828 assert!(!expanded.starts_with('~'));
3830 assert!(expanded.ends_with("/Documents/file.txt"));
3831 }
3832
3833 #[test]
3834 fn test_expand_tilde_local_just_tilde() {
3835 let expanded = expand_tilde_local("~");
3836 assert!(!expanded.starts_with('~'));
3838 assert!(!expanded.is_empty());
3839 }
3840
3841 #[test]
3842 fn test_expand_tilde_local_no_tilde() {
3843 let path = "/absolute/path/to/file";
3844 let expanded = expand_tilde_local(path);
3845 assert_eq!(expanded, path);
3846 }
3847
3848 #[test]
3849 fn test_expand_tilde_local_tilde_in_middle() {
3850 let path = "/path/with/~tilde/inside";
3852 let expanded = expand_tilde_local(path);
3853 assert_eq!(expanded, path);
3854 }
3855}