1use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::OnceLock;
30use std::time::{Duration, Instant};
31
32use thiserror::Error;
33
34use super::{
35 config::{
36 SourceDefinition, SyncSchedule, discover_ssh_hosts, source_path_entry_error,
37 ssh_host_has_safe_token_chars, validate_optional_user_host_shape,
38 },
39 host_key_verification_error, is_host_key_verification_failure, strict_ssh_cli_tokens,
40 strict_ssh_command_for_rsync, wait_for_child_output_with_timeout,
41};
42use ssh2::{FileStat, Session, Sftp};
43use std::io::{Read as IoRead, Write as IoWrite};
44use std::net::{Shutdown, TcpStream};
45
/// Which argument-protection flag, if any, the local `rsync` advertises.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RsyncArgProtection {
    /// Neither protection flag is available.
    None,
    /// `--protect-args` is available.
    ProtectArgs,
    /// `--secluded-args` is available.
    SecludedArgs,
}

impl RsyncArgProtection {
    /// Returns `true` for every variant except [`RsyncArgProtection::None`].
    fn is_supported(self) -> bool {
        self != Self::None
    }

    /// Returns the rsync command-line flag for this variant, or `None` when
    /// no protection flag is available.
    fn flag(self) -> Option<&'static str> {
        if self == Self::ProtectArgs {
            Some("--protect-args")
        } else if self == Self::SecludedArgs {
            Some("--secluded-args")
        } else {
            None
        }
    }
}
80
81fn detect_rsync_arg_protection() -> RsyncArgProtection {
82 static CACHED: OnceLock<RsyncArgProtection> = OnceLock::new();
83 *CACHED.get_or_init(|| {
84 let Some(out) = Command::new("rsync").arg("--help").output().ok() else {
85 return RsyncArgProtection::None;
86 };
87 let mut combined = String::from_utf8_lossy(&out.stdout).into_owned();
91 combined.push_str(&String::from_utf8_lossy(&out.stderr));
92 if combined.contains("--secluded-args") {
97 RsyncArgProtection::SecludedArgs
98 } else if combined.contains("--protect-args") {
99 RsyncArgProtection::ProtectArgs
100 } else {
101 RsyncArgProtection::None
102 }
103 })
104}
105
/// Wraps `path` in single quotes for use in a remote shell command.
///
/// Each embedded `'` is rewritten as `'\''` (close quote, escaped quote,
/// reopen quote).
fn quote_remote_shell_path(path: &str) -> String {
    let mut quoted = String::with_capacity(path.len() + 2);
    quoted.push('\'');
    for ch in path.chars() {
        if ch == '\'' {
            quoted.push_str(r"'\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
114
115fn remote_spec_for_shell_bound_copy(host: &str, remote_path: &str) -> String {
116 format!("{host}:{}", quote_remote_shell_path(remote_path))
120}
121
122fn remote_spec_for_rsync(host: &str, remote_path: &str, protect_args_supported: bool) -> String {
123 if protect_args_supported {
124 format!("{host}:{remote_path}")
126 } else {
127 remote_spec_for_shell_bound_copy(host, remote_path)
129 }
130}
131
132fn remote_spec_for_scp(host: &str, remote_path: &str) -> String {
133 remote_spec_for_shell_bound_copy(host, remote_path)
136}
137
138fn remote_find_regular_files_command(remote_path: &str) -> String {
139 format!(
140 "find -P {} -type f -print0",
141 quote_remote_shell_path(remote_path)
142 )
143}
144
/// Extracts the remote home directory from the output of the home probe.
///
/// Returns the text after `CASS_HOME_MARKER:` from the first line (after
/// trimming) whose value starts with `/` and contains no NUL character.
/// Returns `None` when no line qualifies.
fn parse_remote_home_stdout(stdout: &[u8]) -> Option<String> {
    const MARKER: &str = "CASS_HOME_MARKER:";

    let text = String::from_utf8_lossy(stdout);
    let home = text
        .lines()
        .filter_map(|line| line.trim().strip_prefix(MARKER))
        .find(|candidate| candidate.starts_with('/') && !candidate.contains('\0'));
    home.map(str::to_owned)
}
157
/// Splits NUL-separated bytes into owned strings.
///
/// Empty records are skipped, and records that are not valid UTF-8 are
/// silently dropped.
fn parse_null_terminated_utf8_paths(bytes: &[u8]) -> Vec<String> {
    let mut paths = Vec::new();
    for record in bytes.split(|&byte| byte == 0) {
        if record.is_empty() {
            continue;
        }
        if let Ok(text) = std::str::from_utf8(record) {
            paths.push(text.to_owned());
        }
    }
    paths
}
166
167fn validate_remote_sync_path_entry(index: usize, path: &str) -> Result<(), SyncError> {
168 match source_path_entry_error(index, path) {
169 Some(message) => Err(SyncError::InvalidPath(message)),
170 None => Ok(()),
171 }
172}
173
174fn invalid_remote_sync_path_result(remote_path: &str, err: SyncError) -> PathSyncResult {
175 PathSyncResult {
176 remote_path: remote_path.to_string(),
177 success: false,
178 error: Some(err.to_string()),
179 ..Default::default()
180 }
181}
182
/// Maps a remote file below `remote_root` to its local destination.
///
/// The result is `local_container/leaf_name` followed by the file's path
/// relative to `remote_root`. When `remote_file` equals `remote_root`, the
/// result is `local_container/leaf_name` itself.
///
/// Returns `None` when `remote_file` is not below `remote_root`, or when the
/// relative part contains anything other than normal or `.` components
/// (for example `..`).
fn remote_file_to_safe_local_path(
    remote_root: &Path,
    remote_file: &Path,
    local_container: &Path,
    leaf_name: &str,
) -> Option<PathBuf> {
    use std::path::Component;

    let base = local_container.join(leaf_name);
    if remote_file == remote_root {
        return Some(base);
    }

    remote_file
        .strip_prefix(remote_root)
        .ok()?
        .components()
        .try_fold(base, |mut local, part| match part {
            Component::Normal(name) => {
                local.push(name);
                Some(local)
            }
            Component::CurDir => Some(local),
            _ => None,
        })
}
205
206fn existing_local_symlink_below_root(root: &Path, path: &Path) -> Result<Option<PathBuf>, String> {
207 let rel = path.strip_prefix(root).map_err(|_| {
208 format!(
209 "Local path {} is outside sync root {}",
210 path.display(),
211 root.display()
212 )
213 })?;
214
215 let mut current = root.to_path_buf();
216 if let Some(link) = existing_path_symlink(¤t)? {
217 return Ok(Some(link));
218 }
219
220 for component in rel.components() {
221 match component {
222 std::path::Component::Normal(name) => current.push(name),
223 std::path::Component::CurDir => continue,
224 _ => {
225 return Err(format!(
226 "Local path {} contains unsafe component below sync root {}",
227 path.display(),
228 root.display()
229 ));
230 }
231 }
232
233 if let Some(link) = existing_path_symlink(¤t)? {
234 return Ok(Some(link));
235 }
236 }
237
238 Ok(None)
239}
240
/// Reports whether `path` itself is a symlink, without following it.
///
/// Returns `Ok(Some(path))` for a symlink and `Ok(None)` for any other kind
/// of entry or for a path that does not exist.
///
/// # Errors
///
/// Returns a message for any inspection failure other than "not found".
fn existing_path_symlink(path: &Path) -> Result<Option<PathBuf>, String> {
    let metadata = match std::fs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(format!("Failed to inspect {}: {}", path.display(), err)),
    };

    if metadata.file_type().is_symlink() {
        Ok(Some(path.to_path_buf()))
    } else {
        Ok(None)
    }
}
249
250fn reject_local_symlink_below_root(root: &Path, path: &Path) -> Result<(), String> {
251 if let Some(link) = existing_local_symlink_below_root(root, path)? {
252 return Err(format!(
253 "Refusing to write {} through local symlink {}",
254 path.display(),
255 link.display()
256 ));
257 }
258
259 Ok(())
260}
261
262fn prepare_local_sync_container(sync_root: &Path, local_path: &Path) -> Result<(), String> {
263 reject_local_symlink_below_root(sync_root, local_path)?;
264 std::fs::create_dir_all(local_path)
265 .map_err(|e| format!("Failed to create directory: {}", e))?;
266 reject_local_symlink_below_root(sync_root, local_path)?;
267 Ok(())
268}
269
270fn sftp_file_stat_is_symlink(stat: &FileStat) -> bool {
271 stat.file_type().is_symlink()
272}
273
274#[derive(Error, Debug)]
276pub enum SyncError {
277 #[error("Source has no host configured")]
278 NoHost,
279
280 #[error("Source has no paths configured")]
281 NoPaths,
282
283 #[error("Invalid source path: {0}")]
284 InvalidPath(String),
285
286 #[error("Invalid source definition: {0}")]
287 InvalidSource(String),
288
289 #[error("rsync command failed: {0}")]
290 RsyncFailed(String),
291
292 #[error("Failed to create local directory: {0}")]
293 CreateDirFailed(#[from] std::io::Error),
294
295 #[error("SSH connection failed: {0}")]
296 SshFailed(String),
297
298 #[error("Connection timed out after {0} seconds")]
299 Timeout(u64),
300
301 #[error("Sync cancelled")]
302 Cancelled,
303}
304
/// Transport used to copy files from a remote source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncMethod {
    /// The local `rsync` binary.
    Rsync,
    /// `rsync` invoked through `wsl`.
    WslRsync,
    /// The `scp` binary.
    Scp,
    /// SFTP.
    Sftp,
}

impl SyncMethod {
    /// Returns the stable lowercase label for this method.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Sftp => "sftp",
            Self::Scp => "scp",
            Self::WslRsync => "wsl-rsync",
            Self::Rsync => "rsync",
        }
    }
}

impl std::fmt::Display for SyncMethod {
    /// Writes the same label as [`SyncMethod::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        write!(f, "{}", label)
    }
}
344
/// Outcome of syncing one configured remote path.
#[derive(Clone, Debug, Default)]
pub struct PathSyncResult {
    /// The remote path as configured.
    pub remote_path: String,
    /// Local directory the path was mirrored into.
    pub local_path: PathBuf,
    /// Number of files transferred.
    pub files_transferred: u64,
    /// Number of bytes transferred.
    pub bytes_transferred: u64,
    /// Whether the path synced without error.
    pub success: bool,
    /// Failure description; `None` on success.
    pub error: Option<String>,
    /// Elapsed time for this path, in milliseconds.
    pub duration_ms: u64,
}
363
364#[derive(Debug, Clone)]
366pub struct SyncReport {
367 pub source_name: String,
369 pub method: SyncMethod,
371 pub path_results: Vec<PathSyncResult>,
373 pub total_duration_ms: u64,
375 pub all_succeeded: bool,
377}
378
379impl SyncReport {
380 pub fn new(source_name: impl Into<String>, method: SyncMethod) -> Self {
382 Self {
383 source_name: source_name.into(),
384 method,
385 path_results: Vec::new(),
386 total_duration_ms: 0,
387 all_succeeded: true,
388 }
389 }
390
391 pub fn failed(source_name: impl Into<String>, error: SyncError) -> Self {
393 Self {
394 source_name: source_name.into(),
395 method: SyncMethod::Rsync,
396 path_results: vec![PathSyncResult {
397 error: Some(error.to_string()),
398 success: false,
399 ..Default::default()
400 }],
401 total_duration_ms: 0,
402 all_succeeded: false,
403 }
404 }
405
406 pub fn add_path_result(&mut self, result: PathSyncResult) {
408 if !result.success {
409 self.all_succeeded = false;
410 }
411 self.path_results.push(result);
412 }
413
414 pub fn total_files(&self) -> u64 {
416 self.path_results.iter().map(|r| r.files_transferred).sum()
417 }
418
419 pub fn total_bytes(&self) -> u64 {
421 self.path_results.iter().map(|r| r.bytes_transferred).sum()
422 }
423
424 pub fn successful_paths(&self) -> usize {
426 self.path_results.iter().filter(|r| r.success).count()
427 }
428
429 pub fn failed_paths(&self) -> usize {
431 self.path_results.iter().filter(|r| !r.success).count()
432 }
433
434 pub fn sync_result(&self) -> SyncResult {
436 if self.all_succeeded {
437 SyncResult::Success
438 } else {
439 let errors: Vec<String> = self
440 .path_results
441 .iter()
442 .filter_map(|r| r.error.clone())
443 .collect();
444 if self.successful_paths() > 0 {
445 SyncResult::PartialFailure(errors.join("; "))
446 } else {
447 SyncResult::Failed(errors.join("; "))
448 }
449 }
450 }
451}
452
/// Transfer counters for one rsync run.
#[derive(Default, Debug)]
struct RsyncStats {
    /// Number of files transferred.
    files_transferred: u64,
    /// Number of bytes transferred.
    bytes_transferred: u64,
}
459
/// Copies configured remote paths into a local mirror directory.
pub struct SyncEngine {
    /// Base directory under which `remotes/<source>/mirror` is created.
    local_store: PathBuf,
    /// Connection timeout, in seconds.
    connection_timeout: u64,
    /// Transfer timeout, in seconds.
    transfer_timeout: u64,
}
470
471impl SyncEngine {
472 pub fn new(data_dir: &Path) -> Self {
477 Self {
478 local_store: data_dir.to_path_buf(),
479 connection_timeout: 10,
480 transfer_timeout: 300, }
482 }
483
484 pub fn with_connection_timeout(mut self, seconds: u64) -> Self {
486 self.connection_timeout = seconds;
487 self
488 }
489
490 pub fn with_transfer_timeout(mut self, seconds: u64) -> Self {
492 self.transfer_timeout = seconds;
493 self
494 }
495
496 pub fn mirror_dir(&self, source_name: &str) -> PathBuf {
498 self.local_store
499 .join("remotes")
500 .join(source_name)
501 .join("mirror")
502 }
503
504 fn get_remote_home(&self, host: &str) -> Result<String, SyncError> {
508 if host.trim().is_empty()
510 || host.starts_with('-')
511 || !ssh_host_has_safe_token_chars(host)
512 || validate_optional_user_host_shape(host).is_err()
513 {
514 return Err(SyncError::SshFailed(format!(
515 "Invalid characters in host: {}",
516 host
517 )));
518 }
519
520 let timeout_secs = self.connection_timeout.max(1);
521 let mut cmd = Command::new("ssh");
522 cmd.args(strict_ssh_cli_tokens(timeout_secs))
523 .arg("--")
524 .arg(host)
525 .arg("printf 'CASS_HOME_MARKER:%s\\n' \"$HOME\"")
526 .stdout(Stdio::piped())
527 .stderr(Stdio::piped());
528
529 let child = cmd
530 .spawn()
531 .map_err(|e| SyncError::SshFailed(format!("Failed to execute ssh: {}", e)))?;
532 let output = wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
533 .map_err(|e| SyncError::SshFailed(format!("SSH command failed: {}", e)))?
534 .ok_or(SyncError::Timeout(timeout_secs))?;
535
536 if !output.status.success() {
537 let stderr = String::from_utf8_lossy(&output.stderr);
538 if is_host_key_verification_failure(&stderr) {
539 return Err(SyncError::SshFailed(host_key_verification_error(host)));
540 }
541 return Err(SyncError::SshFailed(format!(
542 "Failed to get remote home directory: {}",
543 stderr.trim()
544 )));
545 }
546
547 let remote_home = parse_remote_home_stdout(&output.stdout).ok_or_else(|| {
548 SyncError::SshFailed(
549 "Unable to parse remote home directory from SSH output".to_string(),
550 )
551 })?;
552
553 tracing::debug!(host = %host, remote_home = %remote_home, "got remote home directory");
554 Ok(remote_home)
555 }
556
/// Replaces a leading `~` or `~/` in `path` with `remote_home`.
///
/// The path is returned unchanged when it does not start with `~`, when no
/// home is known, or when the tilde is followed by anything other than `/`
/// (for example `~user/...`).
fn expand_tilde_with_home(path: &str, remote_home: Option<&str>) -> String {
    match (remote_home, path.strip_prefix('~')) {
        // Bare `~`.
        (Some(home), Some("")) => home.to_string(),
        // `~/rest`: `tail` still carries the leading slash.
        (Some(home), Some(tail)) if tail.starts_with('/') => format!("{}{}", home, tail),
        _ => path.to_string(),
    }
}
578
579 pub fn detect_sync_method() -> SyncMethod {
593 if Command::new("rsync")
595 .arg("--version")
596 .output()
597 .map(|o| o.status.success())
598 .unwrap_or(false)
599 {
600 return SyncMethod::Rsync;
601 }
602
603 #[cfg(target_os = "windows")]
605 if Command::new("wsl")
606 .args(["rsync", "--version"])
607 .output()
608 .map(|o| o.status.success())
609 .unwrap_or(false)
610 {
611 return SyncMethod::WslRsync;
612 }
613
614 if Command::new("scp")
617 .arg("-S")
618 .arg("ssh")
619 .arg("--")
620 .output()
623 .is_ok()
624 {
625 if which_scp_exists() {
627 return SyncMethod::Scp;
628 }
629 }
630
631 SyncMethod::Sftp
633 }
634
635 pub fn sync_source(&self, source: &SourceDefinition) -> Result<SyncReport, SyncError> {
640 if !source.is_remote() {
641 return Err(SyncError::NoHost);
642 }
643
644 let host = source.host.as_ref().ok_or(SyncError::NoHost)?;
645
646 if source.paths.is_empty() {
647 return Err(SyncError::NoPaths);
648 }
649
650 source
651 .validate_structure()
652 .map_err(|e| SyncError::InvalidSource(e.to_string()))?;
653
654 let method = Self::detect_sync_method();
655 let mut report = SyncReport::new(&source.name, method);
656 let overall_start = Instant::now();
657
658 let mirror_dir = self.mirror_dir(&source.name);
660 std::fs::create_dir_all(&mirror_dir)?;
661
662 let remote_home = if source.paths.iter().enumerate().any(|(index, path)| {
664 path.starts_with('~') && validate_remote_sync_path_entry(index, path).is_ok()
665 }) {
666 match self.get_remote_home(host) {
667 Ok(home) => Some(home),
668 Err(e) => {
669 tracing::warn!(host = %host, error = %e, "Failed to get remote home directory");
670 None
671 }
672 }
673 } else {
674 None
675 };
676
677 for (index, remote_path) in source.paths.iter().enumerate() {
678 if let Err(err) = validate_remote_sync_path_entry(index, remote_path) {
679 report.add_path_result(invalid_remote_sync_path_result(remote_path, err));
680 continue;
681 }
682
683 let result = match method {
684 SyncMethod::Rsync => {
685 self.sync_path_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
686 }
687 SyncMethod::WslRsync => {
688 self.sync_path_wsl_rsync(host, remote_path, &mirror_dir, remote_home.as_deref())
689 }
690 SyncMethod::Scp => {
691 self.sync_path_scp(host, remote_path, &mirror_dir, remote_home.as_deref())
692 }
693 SyncMethod::Sftp => {
694 self.sync_path_sftp(host, remote_path, &mirror_dir, remote_home.as_deref())
695 }
696 };
697 report.add_path_result(result);
698 }
699
700 report.total_duration_ms = overall_start.elapsed().as_millis() as u64;
701 Ok(report)
702 }
703
704 pub fn sync_all(
708 &self,
709 sources: impl Iterator<Item = impl std::borrow::Borrow<SourceDefinition>>,
710 ) -> Vec<SyncReport> {
711 sources
712 .map(|source| {
713 let source = source.borrow();
714 self.sync_source(source)
715 .unwrap_or_else(|e| SyncReport::failed(&source.name, e))
716 })
717 .collect()
718 }
719
720 fn sync_path_rsync(
727 &self,
728 host: &str,
729 remote_path: &str,
730 dest_dir: &Path,
731 remote_home: Option<&str>,
732 ) -> PathSyncResult {
733 let start = Instant::now();
734 if remote_path.starts_with('~') && remote_home.is_none() {
735 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
736 return PathSyncResult {
737 remote_path: remote_path.to_string(),
738 local_path,
739 success: false,
740 error: Some(
741 "Cannot expand '~' in remote path; failed to determine remote home directory"
742 .to_string(),
743 ),
744 duration_ms: start.elapsed().as_millis() as u64,
745 ..Default::default()
746 };
747 }
748
749 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
751
752 if remote_path.starts_with('~') && expanded_path == remote_path {
754 tracing::warn!(
755 remote_path = %remote_path,
756 "Could not expand tilde in path (remote home directory not available)"
757 );
758 }
759
760 let safe_name = path_to_safe_dirname(remote_path);
763 let local_path = dest_dir.join(&safe_name);
764
765 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
767 return PathSyncResult {
768 remote_path: remote_path.to_string(),
769 local_path: local_path.clone(),
770 success: false,
771 error: Some(e),
772 duration_ms: start.elapsed().as_millis() as u64,
773 ..Default::default()
774 };
775 }
776
777 let arg_protection = detect_rsync_arg_protection();
780 let protect_args_supported = arg_protection.is_supported();
781 let remote_spec = remote_spec_for_rsync(host, &expanded_path, protect_args_supported);
782 let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
783
784 let local_path_str = match local_path.to_str() {
785 Some(s) => s,
786 None => {
787 return PathSyncResult {
788 remote_path: remote_path.to_string(),
789 local_path,
790 success: false,
791 error: Some("Local path contains invalid UTF-8".to_string()),
792 duration_ms: start.elapsed().as_millis() as u64,
793 ..Default::default()
794 };
795 }
796 };
797
798 let timeout_str = self.transfer_timeout.to_string();
799 let mut cmd = Command::new("rsync");
800 cmd.args(["-avz", "--links", "--safe-links", "--stats", "--partial"]);
801 if let Some(flag) = arg_protection.flag() {
802 cmd.arg(flag);
803 }
804 cmd.args([
805 "--timeout",
806 &timeout_str,
807 "-e",
808 &ssh_opts,
809 "--",
810 &remote_spec,
811 local_path_str,
812 ]);
813
814 tracing::debug!(
815 host = %host,
816 remote_path = %expanded_path,
817 local_path = %local_path.display(),
818 "starting rsync"
819 );
820
821 let output = match cmd.output() {
822 Ok(o) => o,
823 Err(e) => {
824 return PathSyncResult {
825 remote_path: remote_path.to_string(),
826 local_path,
827 success: false,
828 error: Some(format!("Failed to execute rsync: {}", e)),
829 duration_ms: start.elapsed().as_millis() as u64,
830 ..Default::default()
831 };
832 }
833 };
834
835 let duration_ms = start.elapsed().as_millis() as u64;
836 let stdout = String::from_utf8_lossy(&output.stdout);
837 let stderr = String::from_utf8_lossy(&output.stderr);
838
839 if !output.status.success() {
840 let error_msg = if stderr.contains("Connection refused")
842 || stderr.contains("Connection timed out")
843 {
844 format!("SSH connection failed: {}", stderr.trim())
845 } else if is_host_key_verification_failure(&stderr) {
846 host_key_verification_error(host)
847 } else if stderr.contains("No such file or directory") {
848 format!("Remote path not found: {}", expanded_path)
849 } else if stderr.contains("Permission denied") {
850 format!("Permission denied: {}", stderr.trim())
851 } else {
852 format!("rsync failed: {}", stderr.trim())
853 };
854
855 tracing::warn!(
856 host = %host,
857 remote_path = %expanded_path,
858 error = %error_msg,
859 "rsync failed"
860 );
861
862 return PathSyncResult {
863 remote_path: remote_path.to_string(),
864 local_path,
865 success: false,
866 error: Some(error_msg),
867 duration_ms,
868 ..Default::default()
869 };
870 }
871
872 let stats = parse_rsync_stats(&stdout);
874
875 tracing::info!(
876 host = %host,
877 remote_path = %expanded_path,
878 files = stats.files_transferred,
879 bytes = stats.bytes_transferred,
880 duration_ms,
881 "rsync completed"
882 );
883
884 PathSyncResult {
885 remote_path: remote_path.to_string(),
886 local_path,
887 files_transferred: stats.files_transferred,
888 bytes_transferred: stats.bytes_transferred,
889 success: true,
890 error: None,
891 duration_ms,
892 }
893 }
894
895 fn sync_path_wsl_rsync(
900 &self,
901 host: &str,
902 remote_path: &str,
903 dest_dir: &Path,
904 remote_home: Option<&str>,
905 ) -> PathSyncResult {
906 let start = Instant::now();
907
908 if remote_path.starts_with('~') && remote_home.is_none() {
909 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
910 return PathSyncResult {
911 remote_path: remote_path.to_string(),
912 local_path,
913 success: false,
914 error: Some(
915 "Cannot expand '~' in remote path; failed to determine remote home directory"
916 .to_string(),
917 ),
918 duration_ms: start.elapsed().as_millis() as u64,
919 ..Default::default()
920 };
921 }
922
923 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
924 let safe_name = path_to_safe_dirname(remote_path);
925 let local_path = dest_dir.join(&safe_name);
926
927 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
928 return PathSyncResult {
929 remote_path: remote_path.to_string(),
930 local_path,
931 success: false,
932 error: Some(e),
933 duration_ms: start.elapsed().as_millis() as u64,
934 ..Default::default()
935 };
936 }
937
938 let local_path_str = match local_path.to_str() {
939 Some(s) => s,
940 None => {
941 return PathSyncResult {
942 remote_path: remote_path.to_string(),
943 local_path,
944 success: false,
945 error: Some("Local path contains invalid UTF-8".to_string()),
946 duration_ms: start.elapsed().as_millis() as u64,
947 ..Default::default()
948 };
949 }
950 };
951
952 let wsl_dest = windows_path_to_wsl(local_path_str);
956
957 let remote_spec = remote_spec_for_rsync(host, &expanded_path, true);
958 let ssh_opts = strict_ssh_command_for_rsync(self.connection_timeout);
959 let timeout_str = self.transfer_timeout.to_string();
960
961 let mut cmd = Command::new("wsl");
962 cmd.args([
963 "rsync",
964 "-avz",
965 "--links",
966 "--safe-links",
967 "--stats",
968 "--partial",
969 ]);
970 cmd.arg("--protect-args");
972 cmd.args([
973 "--timeout",
974 &timeout_str,
975 "-e",
976 &ssh_opts,
977 "--",
978 &remote_spec,
979 &wsl_dest,
980 ]);
981
982 tracing::debug!(
983 host = %host,
984 remote_path = %expanded_path,
985 local_path = %local_path.display(),
986 wsl_dest = %wsl_dest,
987 "starting wsl rsync"
988 );
989
990 let output = match cmd.output() {
991 Ok(o) => o,
992 Err(e) => {
993 return PathSyncResult {
994 remote_path: remote_path.to_string(),
995 local_path,
996 success: false,
997 error: Some(format!("Failed to execute wsl rsync: {}", e)),
998 duration_ms: start.elapsed().as_millis() as u64,
999 ..Default::default()
1000 };
1001 }
1002 };
1003
1004 let duration_ms = start.elapsed().as_millis() as u64;
1005 let stdout = String::from_utf8_lossy(&output.stdout);
1006 let stderr = String::from_utf8_lossy(&output.stderr);
1007
1008 if !output.status.success() {
1009 let error_msg = if stderr.contains("Connection refused")
1010 || stderr.contains("Connection timed out")
1011 {
1012 format!("SSH connection failed: {}", stderr.trim())
1013 } else if is_host_key_verification_failure(&stderr) {
1014 host_key_verification_error(host)
1015 } else if stderr.contains("No such file or directory") {
1016 format!("Remote path not found: {}", expanded_path)
1017 } else if stderr.contains("Permission denied") {
1018 format!("Permission denied: {}", stderr.trim())
1019 } else {
1020 format!("wsl rsync failed: {}", stderr.trim())
1021 };
1022
1023 tracing::warn!(
1024 host = %host,
1025 remote_path = %expanded_path,
1026 error = %error_msg,
1027 "wsl rsync failed"
1028 );
1029
1030 return PathSyncResult {
1031 remote_path: remote_path.to_string(),
1032 local_path,
1033 success: false,
1034 error: Some(error_msg),
1035 duration_ms,
1036 ..Default::default()
1037 };
1038 }
1039
1040 let stats = parse_rsync_stats(&stdout);
1041
1042 tracing::info!(
1043 host = %host,
1044 remote_path = %expanded_path,
1045 files = stats.files_transferred,
1046 bytes = stats.bytes_transferred,
1047 duration_ms,
1048 "wsl rsync completed"
1049 );
1050
1051 PathSyncResult {
1052 remote_path: remote_path.to_string(),
1053 local_path,
1054 files_transferred: stats.files_transferred,
1055 bytes_transferred: stats.bytes_transferred,
1056 success: true,
1057 error: None,
1058 duration_ms,
1059 }
1060 }
1061
1062 fn sync_path_scp(
1072 &self,
1073 host: &str,
1074 remote_path: &str,
1075 dest_dir: &Path,
1076 remote_home: Option<&str>,
1077 ) -> PathSyncResult {
1078 let start = Instant::now();
1079
1080 if remote_path.starts_with('~') && remote_home.is_none() {
1081 let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
1082 return PathSyncResult {
1083 remote_path: remote_path.to_string(),
1084 local_path,
1085 success: false,
1086 error: Some(
1087 "Cannot expand '~' in remote path; failed to determine remote home directory"
1088 .to_string(),
1089 ),
1090 duration_ms: start.elapsed().as_millis() as u64,
1091 ..Default::default()
1092 };
1093 }
1094
1095 let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
1096 let safe_name = path_to_safe_dirname(remote_path);
1097 let local_path = dest_dir.join(&safe_name);
1098
1099 if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
1100 return PathSyncResult {
1101 remote_path: remote_path.to_string(),
1102 local_path,
1103 success: false,
1104 error: Some(e),
1105 duration_ms: start.elapsed().as_millis() as u64,
1106 ..Default::default()
1107 };
1108 }
1109
1110 let connect_timeout = self.connection_timeout.to_string();
1113 let find_command = remote_find_regular_files_command(&expanded_path);
1114
1115 tracing::debug!(
1116 host = %host,
1117 remote_path = %expanded_path,
1118 local_path = %local_path.display(),
1119 "listing regular files for scp sync"
1120 );
1121
1122 let timeout_secs = self.connection_timeout.max(1);
1123 let mut cmd = Command::new("ssh");
1124 cmd.args(strict_ssh_cli_tokens(timeout_secs))
1125 .arg("--")
1126 .arg(host)
1127 .arg(&find_command)
1128 .stdout(Stdio::piped())
1129 .stderr(Stdio::piped());
1130
1131 let output = match cmd.spawn().and_then(|child| {
1132 wait_for_child_output_with_timeout(child, Duration::from_secs(timeout_secs))
1133 }) {
1134 Ok(Some(o)) => o,
1135 Ok(None) => {
1136 return PathSyncResult {
1137 remote_path: remote_path.to_string(),
1138 local_path,
1139 success: false,
1140 error: Some(format!(
1141 "SSH file listing timed out after {timeout_secs} seconds"
1142 )),
1143 duration_ms: start.elapsed().as_millis() as u64,
1144 ..Default::default()
1145 };
1146 }
1147 Err(e) => {
1148 return PathSyncResult {
1149 remote_path: remote_path.to_string(),
1150 local_path,
1151 success: false,
1152 error: Some(format!("Failed to execute ssh file listing: {}", e)),
1153 duration_ms: start.elapsed().as_millis() as u64,
1154 ..Default::default()
1155 };
1156 }
1157 };
1158
1159 let stderr = String::from_utf8_lossy(&output.stderr);
1160 if !output.status.success() {
1161 let error_msg = if stderr.contains("Connection refused")
1162 || stderr.contains("Connection timed out")
1163 {
1164 format!("SSH connection failed: {}", stderr.trim())
1165 } else if is_host_key_verification_failure(&stderr) {
1166 host_key_verification_error(host)
1167 } else if stderr.contains("No such file or directory") {
1168 format!("Remote path not found: {}", expanded_path)
1169 } else if stderr.contains("Permission denied") {
1170 format!("Permission denied: {}", stderr.trim())
1171 } else {
1172 format!("Remote file listing failed: {}", stderr.trim())
1173 };
1174
1175 tracing::warn!(
1176 host = %host,
1177 remote_path = %expanded_path,
1178 error = %error_msg,
1179 "scp file listing failed"
1180 );
1181
1182 return PathSyncResult {
1183 remote_path: remote_path.to_string(),
1184 local_path,
1185 success: false,
1186 error: Some(error_msg),
1187 duration_ms: start.elapsed().as_millis() as u64,
1188 ..Default::default()
1189 };
1190 }
1191
1192 let remote_files = parse_null_terminated_utf8_paths(&output.stdout);
1193 let remote_root = Path::new(&expanded_path);
1194 let leaf_name = Path::new(remote_path)
1195 .file_name()
1196 .and_then(|n| n.to_str())
1197 .unwrap_or("remote");
1198 let mut files_transferred = 0u64;
1199 let mut bytes_transferred = 0u64;
1200
1201 for remote_file in remote_files {
1202 let remote_file_path = Path::new(&remote_file);
1203 let Some(local_file) = remote_file_to_safe_local_path(
1204 remote_root,
1205 remote_file_path,
1206 &local_path,
1207 leaf_name,
1208 ) else {
1209 tracing::warn!(
1210 remote_path = %remote_file,
1211 root = %expanded_path,
1212 "skipping scp file outside listed root"
1213 );
1214 continue;
1215 };
1216
1217 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1218 return PathSyncResult {
1219 remote_path: remote_path.to_string(),
1220 local_path,
1221 success: false,
1222 error: Some(e),
1223 duration_ms: start.elapsed().as_millis() as u64,
1224 ..Default::default()
1225 };
1226 }
1227
1228 if let Some(parent) = local_file.parent() {
1229 if let Err(e) = std::fs::create_dir_all(parent) {
1230 return PathSyncResult {
1231 remote_path: remote_path.to_string(),
1232 local_path,
1233 success: false,
1234 error: Some(format!("Failed to create {}: {}", parent.display(), e)),
1235 duration_ms: start.elapsed().as_millis() as u64,
1236 ..Default::default()
1237 };
1238 }
1239
1240 if let Err(e) = reject_local_symlink_below_root(&local_path, parent) {
1241 return PathSyncResult {
1242 remote_path: remote_path.to_string(),
1243 local_path,
1244 success: false,
1245 error: Some(e),
1246 duration_ms: start.elapsed().as_millis() as u64,
1247 ..Default::default()
1248 };
1249 }
1250 }
1251
1252 if let Err(e) = reject_local_symlink_below_root(&local_path, &local_file) {
1253 return PathSyncResult {
1254 remote_path: remote_path.to_string(),
1255 local_path,
1256 success: false,
1257 error: Some(e),
1258 duration_ms: start.elapsed().as_millis() as u64,
1259 ..Default::default()
1260 };
1261 }
1262
1263 let temp_path =
1264 unique_atomic_sidecar_path(&local_file, "download", "cass-sync-scp-download");
1265 let Some(temp_path_str) = temp_path.to_str() else {
1266 return PathSyncResult {
1267 remote_path: remote_path.to_string(),
1268 local_path,
1269 success: false,
1270 error: Some("Local path contains invalid UTF-8".to_string()),
1271 duration_ms: start.elapsed().as_millis() as u64,
1272 ..Default::default()
1273 };
1274 };
1275 if let Err(e) = std::fs::OpenOptions::new()
1276 .write(true)
1277 .create_new(true)
1278 .open(&temp_path)
1279 .and_then(|file| file.sync_all())
1280 {
1281 return PathSyncResult {
1282 remote_path: remote_path.to_string(),
1283 local_path,
1284 success: false,
1285 error: Some(format!("Failed to create {}: {}", temp_path.display(), e)),
1286 duration_ms: start.elapsed().as_millis() as u64,
1287 ..Default::default()
1288 };
1289 }
1290
1291 let remote_spec = remote_spec_for_scp(host, &remote_file);
1292 let mut cmd = Command::new("scp");
1293 cmd.args([
1294 "-B",
1295 "-o",
1296 &format!("ConnectTimeout={}", connect_timeout),
1297 "-o",
1298 "ServerAliveInterval=15",
1299 "-o",
1300 "ServerAliveCountMax=3",
1301 "-o",
1302 "StrictHostKeyChecking=yes",
1303 "--",
1304 &remote_spec,
1305 temp_path_str,
1306 ]);
1307
1308 let output = match cmd.output() {
1309 Ok(o) => o,
1310 Err(e) => {
1311 return PathSyncResult {
1312 remote_path: remote_path.to_string(),
1313 local_path,
1314 success: false,
1315 error: Some(format!("Failed to execute scp: {}", e)),
1316 duration_ms: start.elapsed().as_millis() as u64,
1317 ..Default::default()
1318 };
1319 }
1320 };
1321
1322 if !output.status.success() {
1323 let _ = std::fs::remove_file(&temp_path);
1324 let stderr = String::from_utf8_lossy(&output.stderr);
1325 let error_msg = if is_host_key_verification_failure(&stderr) {
1326 host_key_verification_error(host)
1327 } else if stderr.contains("Permission denied") {
1328 format!("Permission denied: {}", stderr.trim())
1329 } else {
1330 format!("scp failed: {}", stderr.trim())
1331 };
1332
1333 tracing::warn!(
1334 host = %host,
1335 remote_path = %remote_file,
1336 error = %error_msg,
1337 "scp file transfer failed"
1338 );
1339
1340 return PathSyncResult {
1341 remote_path: remote_path.to_string(),
1342 local_path,
1343 success: false,
1344 error: Some(error_msg),
1345 duration_ms: start.elapsed().as_millis() as u64,
1346 ..Default::default()
1347 };
1348 }
1349
1350 files_transferred += 1;
1351 if let Err(e) = sync_file_path(&temp_path) {
1352 return PathSyncResult {
1353 remote_path: remote_path.to_string(),
1354 local_path,
1355 success: false,
1356 error: Some(format!("Failed to sync {}: {}", temp_path.display(), e)),
1357 duration_ms: start.elapsed().as_millis() as u64,
1358 ..Default::default()
1359 };
1360 }
1361 if let Ok(metadata) = std::fs::metadata(&temp_path) {
1362 bytes_transferred = bytes_transferred.saturating_add(metadata.len());
1363 }
1364 if let Err(e) = replace_file_from_temp(&temp_path, &local_file) {
1365 return PathSyncResult {
1366 remote_path: remote_path.to_string(),
1367 local_path,
1368 success: false,
1369 error: Some(format!(
1370 "Failed to publish {} to {}: {}",
1371 temp_path.display(),
1372 local_file.display(),
1373 e
1374 )),
1375 duration_ms: start.elapsed().as_millis() as u64,
1376 ..Default::default()
1377 };
1378 }
1379 }
1380
1381 let duration_ms = start.elapsed().as_millis() as u64;
1382
1383 tracing::info!(
1384 host = %host,
1385 remote_path = %expanded_path,
1386 files = files_transferred,
1387 bytes = bytes_transferred,
1388 duration_ms,
1389 "scp sync completed"
1390 );
1391
1392 PathSyncResult {
1393 remote_path: remote_path.to_string(),
1394 local_path,
1395 files_transferred,
1396 bytes_transferred,
1397 success: true,
1398 error: None,
1399 duration_ms,
1400 }
1401 }
1402
    /// Sync one remote path to a local directory over SFTP via the `ssh2` crate.
    ///
    /// Steps, in order: expand a leading `~` using `remote_home`, prepare the
    /// local container directory, resolve connection parameters (SSH config
    /// entry, port, username, identity file), open a TCP connection, perform the
    /// SSH handshake, authenticate, open an SFTP channel and download the tree.
    ///
    /// # Parameters
    /// - `host`: target in `host` or `user@host` form (split by `parse_ssh_host`).
    /// - `remote_path`: path on the remote side; may start with `~`.
    /// - `dest_dir`: local directory under which a per-path directory (named by
    ///   `path_to_safe_dirname`) is used.
    /// - `remote_home`: remote home directory used for `~` expansion; a `~` path
    ///   without it is reported as a failure.
    ///
    /// # Returns
    /// A `PathSyncResult`; every failure path returns `success: false` with an
    /// `error` message instead of panicking or returning `Err`.
    ///
    /// NOTE(review): no host-key / known_hosts check is visible in this function
    /// between the handshake and authentication — confirm whether host identity
    /// is verified elsewhere before this path is used.
    fn sync_path_sftp(
        &self,
        host: &str,
        remote_path: &str,
        dest_dir: &Path,
        remote_home: Option<&str>,
    ) -> PathSyncResult {
        let start = Instant::now();
        // A `~` path cannot be resolved without knowing the remote home directory.
        if remote_path.starts_with('~') && remote_home.is_none() {
            let local_path = dest_dir.join(path_to_safe_dirname(remote_path));
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(
                    "Cannot expand '~' in remote path; failed to determine remote home directory"
                        .to_string(),
                ),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }
        let expanded_path = Self::expand_tilde_with_home(remote_path, remote_home);
        // The local directory name is derived from the unexpanded remote path.
        let local_path = dest_dir.join(path_to_safe_dirname(remote_path));

        if let Err(e) = prepare_local_sync_container(dest_dir, &local_path) {
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(e),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        let (ssh_user, ssh_host) = parse_ssh_host(host);

        // Look up the SSH config entry first by alias name, then by its hostname.
        let ssh_config = discover_ssh_hosts();
        let host_config = ssh_config.iter().find(|h| h.name == ssh_host).or_else(|| {
            ssh_config
                .iter()
                .find(|h| h.hostname.as_deref() == Some(ssh_host))
        });

        // Fall back to the literal host and port 22 when the config has no override.
        let hostname = host_config
            .and_then(|h| h.hostname.as_deref())
            .unwrap_or(ssh_host);
        let port = host_config.and_then(|h| h.port).unwrap_or(22);
        // Username precedence: `user@host`, SSH config user, then USER, then LOGNAME.
        let username = match first_nonblank_username([
            ssh_user,
            host_config.and_then(|h| h.user.as_deref()),
        ])
        .or_else(|| env_username("USER"))
        .or_else(|| env_username("LOGNAME"))
        {
            Some(user) => user,
            None => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!(
                        "Unable to determine SSH username for host '{}' (missing/blank user@host, SSH config user, USER, and LOGNAME)",
                        host
                    )),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };
        let identity_file = host_config.and_then(|h| h.identity_file.as_deref());

        tracing::debug!(
            hostname = %hostname,
            port,
            username = %username,
            identity_file = ?identity_file,
            remote_path = %expanded_path,
            "SFTP connection parameters"
        );

        // `connection_timeout` is interpreted as seconds.
        let conn_timeout = std::time::Duration::from_secs(self.connection_timeout);
        // Try a literal `ip:port` parse first; otherwise resolve the name and use
        // only the first address returned.
        let addr = format!("{}:{}", hostname, port);
        let sock_addr: std::net::SocketAddr = match addr.parse().or_else(|_| {
            use std::net::ToSocketAddrs;
            (hostname, port)
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .ok_or(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "cannot resolve hostname",
                ))
        }) {
            Ok(a) => a,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("DNS resolution failed for {hostname}:{port}: {e}")),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };
        let tcp = match TcpStream::connect_timeout(&sock_addr, conn_timeout) {
            Ok(t) => t,
            Err(e) => {
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!(
                        "TCP connection failed to {}:{}: {}",
                        hostname, port, e
                    )),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        // `transfer_timeout` (seconds) bounds individual socket reads and writes;
        // failing to set it is logged but not fatal.
        let timeout = std::time::Duration::from_secs(self.transfer_timeout);
        if let Err(e) = tcp.set_read_timeout(Some(timeout)) {
            tracing::warn!("Failed to set TCP read timeout: {}", e);
        }
        if let Err(e) = tcp.set_write_timeout(Some(timeout)) {
            tracing::warn!("Failed to set TCP write timeout: {}", e);
        }
        // Keep a cloned handle so the socket can still be shut down after `tcp`
        // has been moved into the session.
        let tcp_shutdown = tcp.try_clone().ok();

        let mut session = match Session::new() {
            Ok(s) => s,
            Err(e) => {
                let _ = tcp.shutdown(Shutdown::Both);
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to create SSH session: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        session.set_tcp_stream(tcp);
        // Best-effort teardown: disconnect the SSH session, then shut down the
        // socket through the cloned handle (if the clone succeeded).
        let close_connections = |session: &mut Session, reason: &str| {
            let _ = session.disconnect(None, reason, None);
            if let Some(stream) = tcp_shutdown.as_ref() {
                let _ = stream.shutdown(Shutdown::Both);
            }
        };

        if let Err(e) = session.handshake() {
            close_connections(&mut session, "handshake failed");
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(format!("SSH handshake failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        if let Err(e) = self.authenticate_ssh(&session, &username, identity_file) {
            close_connections(&mut session, "authentication failed");
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                success: false,
                error: Some(format!("SSH authentication failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
                ..Default::default()
            };
        }

        let sftp = match session.sftp() {
            Ok(s) => s,
            Err(e) => {
                close_connections(&mut session, "sftp open failed");
                return PathSyncResult {
                    remote_path: remote_path.to_string(),
                    local_path,
                    success: false,
                    error: Some(format!("Failed to open SFTP session: {}", e)),
                    duration_ms: start.elapsed().as_millis() as u64,
                    ..Default::default()
                };
            }
        };

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            local_path = %local_path.display(),
            "starting SFTP sync"
        );

        let mut files_transferred = 0u64;
        let mut bytes_transferred = 0u64;

        // The download lands in `<local_path>/<leaf>`, where the leaf comes from
        // the unexpanded remote path ("remote" when it has no final component).
        let leaf_name = Path::new(remote_path)
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("remote");
        let target_local_path = local_path.join(leaf_name);

        if let Err(e) = self.sftp_download_recursive(
            &sftp,
            Path::new(&expanded_path),
            &target_local_path,
            &local_path,
            &mut files_transferred,
            &mut bytes_transferred,
        ) {
            close_connections(&mut session, "sftp download failed");
            // Counters accumulated before the failure are still reported.
            return PathSyncResult {
                remote_path: remote_path.to_string(),
                local_path,
                files_transferred,
                bytes_transferred,
                success: false,
                error: Some(format!("SFTP download failed: {}", e)),
                duration_ms: start.elapsed().as_millis() as u64,
            };
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        tracing::info!(
            host = %host,
            remote_path = %expanded_path,
            files = files_transferred,
            bytes = bytes_transferred,
            duration_ms,
            "SFTP sync completed"
        );

        close_connections(&mut session, "sync complete");
        PathSyncResult {
            remote_path: remote_path.to_string(),
            local_path,
            files_transferred,
            bytes_transferred,
            success: true,
            error: None,
            duration_ms,
        }
    }
1676
1677 fn authenticate_ssh(
1679 &self,
1680 session: &Session,
1681 username: &str,
1682 identity_file: Option<&str>,
1683 ) -> Result<(), String> {
1684 if let Ok(mut agent) = session.agent()
1686 && agent.connect().is_ok()
1687 && agent.list_identities().is_ok()
1688 {
1689 for identity in agent.identities().unwrap_or_default() {
1690 if agent.userauth(username, &identity).is_ok() && session.authenticated() {
1691 tracing::debug!("Authenticated via SSH agent");
1692 return Ok(());
1693 }
1694 }
1695 }
1696
1697 if let Some(key_path) = identity_file {
1699 let key_path_expanded = expand_tilde_local(key_path);
1700 let key_path_buf = Path::new(&key_path_expanded);
1701
1702 if key_path_buf.exists()
1703 && session
1704 .userauth_pubkey_file(username, None, key_path_buf, None)
1705 .is_ok()
1706 && session.authenticated()
1707 {
1708 tracing::debug!(key = %key_path_buf.display(), "Authenticated via key file");
1709 return Ok(());
1710 }
1711 }
1712
1713 if let Some(home) = dirs::home_dir() {
1715 for key_name in ["id_ed25519", "id_rsa", "id_ecdsa"] {
1716 let key_path = home.join(".ssh").join(key_name);
1717 if key_path.exists()
1718 && session
1719 .userauth_pubkey_file(username, None, &key_path, None)
1720 .is_ok()
1721 && session.authenticated()
1722 {
1723 tracing::debug!(key = %key_path.display(), "Authenticated via default key");
1724 return Ok(());
1725 }
1726 }
1727 }
1728
1729 Err(format!(
1730 "No valid authentication method found for user '{}'",
1731 username
1732 ))
1733 }
1734
1735 fn sftp_download_recursive(
1737 &self,
1738 sftp: &Sftp,
1739 remote_path: &Path,
1740 local_path: &Path,
1741 local_root: &Path,
1742 files_transferred: &mut u64,
1743 bytes_transferred: &mut u64,
1744 ) -> Result<(), String> {
1745 let stat = sftp
1748 .lstat(remote_path)
1749 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1750
1751 if sftp_file_stat_is_symlink(&stat) {
1752 tracing::warn!(
1753 path = %remote_path.display(),
1754 "Skipping remote symlink during SFTP sync"
1755 );
1756 return Ok(());
1757 }
1758
1759 if stat.is_dir() {
1760 reject_local_symlink_below_root(local_root, local_path)?;
1762 std::fs::create_dir_all(local_path)
1763 .map_err(|e| format!("Failed to create {}: {}", local_path.display(), e))?;
1764 reject_local_symlink_below_root(local_root, local_path)?;
1765
1766 let entries = sftp
1768 .readdir(remote_path)
1769 .map_err(|e| format!("Failed to list {}: {}", remote_path.display(), e))?;
1770
1771 for (entry_path, _entry_stat) in entries {
1772 let Some(file_name) = sftp_entry_file_name(&entry_path, remote_path) else {
1773 continue;
1774 };
1775
1776 let entry_stat = sftp
1777 .lstat(&entry_path)
1778 .map_err(|e| format!("Failed to lstat {}: {}", entry_path.display(), e))?;
1779 if sftp_file_stat_is_symlink(&entry_stat) {
1780 tracing::warn!(
1781 path = %entry_path.display(),
1782 "Skipping remote symlink during SFTP sync"
1783 );
1784 continue;
1785 }
1786
1787 let local_entry = local_path.join(file_name);
1788
1789 if entry_stat.is_dir() {
1790 self.sftp_download_recursive(
1792 sftp,
1793 &entry_path,
1794 &local_entry,
1795 local_root,
1796 files_transferred,
1797 bytes_transferred,
1798 )?;
1799 } else if entry_stat.is_file() {
1800 if self.sftp_download_file(
1802 sftp,
1803 &entry_path,
1804 &local_entry,
1805 local_root,
1806 bytes_transferred,
1807 )? {
1808 *files_transferred += 1;
1809 }
1810 }
1811 }
1813 } else if stat.is_file() {
1814 if let Some(parent) = local_path.parent() {
1816 reject_local_symlink_below_root(local_root, parent)?;
1817 std::fs::create_dir_all(parent).map_err(|e| {
1818 format!("Failed to create local dir {}: {}", parent.display(), e)
1819 })?;
1820 reject_local_symlink_below_root(local_root, parent)?;
1821 }
1822
1823 if self.sftp_download_file(
1824 sftp,
1825 remote_path,
1826 local_path,
1827 local_root,
1828 bytes_transferred,
1829 )? {
1830 *files_transferred += 1;
1831 }
1832 } else {
1833 tracing::warn!(
1835 path = %remote_path.display(),
1836 "Skipping remote path: not a regular file or directory"
1837 );
1838 }
1839
1840 Ok(())
1841 }
1842
1843 fn sftp_download_file(
1845 &self,
1846 sftp: &Sftp,
1847 remote_path: &Path,
1848 local_path: &Path,
1849 local_root: &Path,
1850 bytes_transferred: &mut u64,
1851 ) -> Result<bool, String> {
1852 let stat = sftp
1853 .lstat(remote_path)
1854 .map_err(|e| format!("Failed to lstat {}: {}", remote_path.display(), e))?;
1855 if sftp_file_stat_is_symlink(&stat) {
1856 tracing::warn!(
1857 path = %remote_path.display(),
1858 "Skipping remote symlink during SFTP sync"
1859 );
1860 return Ok(false);
1861 }
1862 if !stat.is_file() {
1863 tracing::warn!(
1864 path = %remote_path.display(),
1865 "Skipping remote path: not a regular file"
1866 );
1867 return Ok(false);
1868 }
1869
1870 let mut remote_file = sftp
1871 .open(remote_path)
1872 .map_err(|e| format!("Failed to open {}: {}", remote_path.display(), e))?;
1873
1874 reject_local_symlink_below_root(local_root, local_path)?;
1875
1876 let temp_path = unique_atomic_sidecar_path(local_path, "download", "cass-sync-download");
1877 let mut local_file = std::fs::OpenOptions::new()
1878 .write(true)
1879 .create_new(true)
1880 .open(&temp_path)
1881 .map_err(|e| format!("Failed to create {}: {}", temp_path.display(), e))?;
1882
1883 let mut buffer = [0u8; 32768]; loop {
1886 let bytes_read = remote_file
1887 .read(&mut buffer)
1888 .map_err(|e| format!("Failed to read {}: {}", remote_path.display(), e))?;
1889
1890 if bytes_read == 0 {
1891 break;
1892 }
1893
1894 local_file
1895 .write_all(&buffer[..bytes_read])
1896 .map_err(|e| format!("Failed to write {}: {}", local_path.display(), e))?;
1897
1898 *bytes_transferred += bytes_read as u64;
1899 }
1900
1901 tracing::trace!(
1902 remote = %remote_path.display(),
1903 local = %local_path.display(),
1904 "downloaded file"
1905 );
1906
1907 local_file
1908 .sync_all()
1909 .map_err(|e| format!("Failed to sync {}: {}", temp_path.display(), e))?;
1910 drop(local_file);
1911 replace_file_from_temp(&temp_path, local_path).map_err(|e| {
1912 format!(
1913 "Failed to publish {} to {}: {}",
1914 temp_path.display(),
1915 local_path.display(),
1916 e
1917 )
1918 })?;
1919
1920 Ok(true)
1921 }
1922}
1923
1924fn sftp_entry_file_name<'a>(entry_path: &'a Path, parent_path: &Path) -> Option<&'a str> {
1926 let Some(file_name) = entry_path.file_name() else {
1927 tracing::warn!(
1928 parent = %parent_path.display(),
1929 entry = ?entry_path,
1930 "Skipping SFTP entry without a file name"
1931 );
1932 return None;
1933 };
1934
1935 let Some(file_name) = file_name.to_str() else {
1936 tracing::warn!(
1937 parent = %parent_path.display(),
1938 entry = ?entry_path,
1939 "Skipping SFTP entry with non-UTF-8 file name"
1940 );
1941 return None;
1942 };
1943
1944 if file_name.is_empty() {
1945 tracing::warn!(
1946 parent = %parent_path.display(),
1947 entry = ?entry_path,
1948 "Skipping SFTP entry with empty file name"
1949 );
1950 return None;
1951 }
1952
1953 if file_name == "." || file_name == ".." {
1954 return None;
1955 }
1956
1957 Some(file_name)
1958}
1959
/// Report whether an `scp` binary can be found in a directory listed in `PATH`.
///
/// Looks for `scp.exe` on Windows targets and `scp` elsewhere. Only checks that
/// a regular file with that name exists; it does not check that the file is
/// executable. Returns `false` when `PATH` is not set.
fn which_scp_exists() -> bool {
    let binary_name = if cfg!(target_os = "windows") {
        "scp.exe"
    } else {
        "scp"
    };

    let Some(search_path) = std::env::var_os("PATH") else {
        return false;
    };

    for dir in std::env::split_paths(&search_path) {
        if dir.join(binary_name).is_file() {
            return true;
        }
    }
    false
}
1978
/// Convert a Windows drive path (`C:\dir\file` or `C:/dir/file`) to its WSL
/// mount form (`/mnt/c/dir/file`).
///
/// A path is converted only when it starts with an ASCII drive letter followed
/// by `:` and a `\` or `/` separator. The drive letter is lowercased and every
/// backslash in the remainder becomes a forward slash. Any other input —
/// including prefixes such as `1:/x` whose first byte is not a letter — is
/// returned unchanged.
fn windows_path_to_wsl(path: &str) -> String {
    match path.as_bytes() {
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic() => {
            // The first three bytes are ASCII, so index 3 is a char boundary.
            let rest = path[3..].replace('\\', "/");
            format!("/mnt/{}/{}", char::from(drive.to_ascii_lowercase()), rest)
        }
        _ => path.to_string(),
    }
}
1997
/// Split an SSH target into its optional user part and its host part.
///
/// The split happens at the first `@`: `"user@host"` yields
/// `(Some("user"), "host")`, while a target without `@` yields `(None, host)`.
/// The user part is returned as-is and may be empty (for example `"@host"`).
fn parse_ssh_host(host: &str) -> (Option<&str>, &str) {
    match host.split_once('@') {
        Some((user, hostname)) => (Some(user), hostname),
        None => (None, host),
    }
}
2012
/// Return the first candidate that is present and non-blank, trimmed and owned.
///
/// `None` entries and entries that are empty after trimming whitespace are
/// skipped. Returns `None` when no candidate qualifies.
fn first_nonblank_username<'a>(
    candidates: impl IntoIterator<Item = Option<&'a str>>,
) -> Option<String> {
    candidates
        .into_iter()
        .flatten()
        .map(str::trim)
        .find(|name| !name.is_empty())
        .map(str::to_string)
}
2025
2026fn env_username(key: &str) -> Option<String> {
2027 dotenvy::var(key)
2028 .ok()
2029 .and_then(|value| first_nonblank_username([Some(value.as_str())]))
2030}
2031
2032fn expand_tilde_local(path: &str) -> String {
2034 if let Some(stripped) = path.strip_prefix("~/")
2035 && let Some(home) = dirs::home_dir()
2036 {
2037 return format!("{}/{}", home.display(), stripped);
2038 } else if path == "~"
2039 && let Some(home) = dirs::home_dir()
2040 {
2041 return home.display().to_string();
2042 }
2043 path.to_string()
2044}
2045
2046pub fn path_to_safe_dirname(path: &str) -> String {
2055 use std::path::{Component, Path};
2056
2057 let path_obj = Path::new(path);
2058 let mut parts: Vec<&str> = Vec::new();
2059
2060 for component in path_obj.components() {
2061 match component {
2062 Component::Normal(name) => {
2063 if let Some(s) = name.to_str() {
2064 if !s.is_empty() && s != "." && s != "~" {
2066 parts.push(s);
2067 }
2068 }
2069 }
2070 Component::ParentDir
2072 | Component::CurDir
2073 | Component::RootDir
2074 | Component::Prefix(_) => {}
2075 }
2076 }
2077
2078 let cleaned = parts.join("_").replace([' ', '\\'], "_");
2079
2080 let hash = fnv1a_hash(path);
2082 let hash_suffix = format!("{:08x}", hash);
2083
2084 if cleaned.is_empty() {
2085 format!("root_{}", hash_suffix)
2086 } else {
2087 format!("{}_{}", cleaned, hash_suffix)
2088 }
2089}
2090
/// Compute the 64-bit FNV-1a hash of `text`'s UTF-8 bytes.
///
/// For each byte the accumulator is XORed with the byte and then multiplied
/// (wrapping) by the FNV prime, starting from the FNV offset basis.
fn fnv1a_hash(text: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    text.bytes().fold(OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
2099
2100fn parse_rsync_stats(output: &str) -> RsyncStats {
2102 let mut stats = RsyncStats::default();
2103
2104 for line in output.lines() {
2105 let line = line.trim();
2106
2107 if line.starts_with("Number of regular files transferred:")
2109 && let Some(num_str) = line.split(':').nth(1)
2110 {
2111 stats.files_transferred = num_str.trim().replace(',', "").parse().unwrap_or(0);
2112 }
2113
2114 if line.starts_with("Total transferred file size:")
2116 && let Some(size_part) = line.split(':').nth(1)
2117 {
2118 let size_str = size_part
2120 .split_whitespace()
2121 .next()
2122 .unwrap_or("0")
2123 .replace(',', "");
2124 stats.bytes_transferred = size_str.parse().unwrap_or(0);
2125 }
2126 }
2127
2128 stats
2129}
2130
/// Outcome of the most recent sync recorded for a source.
///
/// Serialized with snake_case variant names. The default value is
/// [`SyncResult::Skipped`].
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncResult {
    /// The sync succeeded.
    Success,
    /// Part of the sync failed; carries the error message.
    PartialFailure(String),
    /// The sync failed; carries the error message.
    Failed(String),
    /// Default state; `label()` reports it as `"never"`.
    #[default]
    Skipped,
}
2149
2150impl SyncResult {
2151 pub fn label(&self) -> &'static str {
2153 match self {
2154 Self::Success => "success",
2155 Self::PartialFailure(_) => "partial",
2156 Self::Failed(_) => "failed",
2157 Self::Skipped => "never",
2158 }
2159 }
2160
2161 pub fn error_message(&self) -> Option<&str> {
2163 match self {
2164 Self::PartialFailure(error) | Self::Failed(error) => Some(error.as_str()),
2165 Self::Success | Self::Skipped => None,
2166 }
2167 }
2168}
2169
/// Action chosen for a source when a sync decision is evaluated.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceSyncAction {
    /// Run a sync now.
    Sync,
    /// Do not sync now (for example: manual schedule, or schedule not yet due).
    Skip,
    /// Hold off; used for sources classified as auth-failed or backing off.
    Defer,
}
2181
2182impl SourceSyncAction {
2183 pub fn as_str(self) -> &'static str {
2184 match self {
2185 Self::Sync => "sync",
2186 Self::Skip => "skip",
2187 Self::Defer => "defer",
2188 }
2189 }
2190}
2191
/// Health classification assigned to a source by `SourceSyncDecision::evaluate`.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceHealthKind {
    /// No status exists for the source, or it has no recorded `last_sync`.
    NeverSynced,
    /// None of the other conditions apply.
    Healthy,
    /// The configured sync schedule is due.
    Stale,
    /// The last sync's duration reached the high-latency threshold.
    HighLatency,
    /// The last sync partially failed, or failed with no active backoff window.
    Flapping,
    /// The last error looks like an authentication or host-key failure.
    AuthFailed,
    /// Consecutive failures are still inside the retry backoff window.
    BackingOff,
}
2204
2205impl SourceHealthKind {
2206 pub fn as_str(self) -> &'static str {
2207 match self {
2208 Self::NeverSynced => "never_synced",
2209 Self::Healthy => "healthy",
2210 Self::Stale => "stale",
2211 Self::HighLatency => "high_latency",
2212 Self::Flapping => "flapping",
2213 Self::AuthFailed => "auth_failed",
2214 Self::BackingOff => "backing_off",
2215 }
2216 }
2217}
2218
/// Result of evaluating whether a source should be synced, plus the inputs and
/// reasoning behind that decision.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncDecision {
    /// What to do with the source.
    pub action: SourceSyncAction,
    /// Health classification derived from the stored sync status.
    pub health: SourceHealthKind,
    /// Numeric score assigned to `health` by `health_score_for_source`.
    pub health_score: u8,
    /// Milliseconds since the last sync (never negative); `None` when no last
    /// sync is recorded.
    pub staleness_ms: Option<i64>,
    /// Staleness relative to the schedule period, as computed by
    /// `stale_value_score_for_source` (0-100).
    pub stale_value_score: u8,
    /// Whether the decision was requested with the manual-override flag.
    pub manual_override: bool,
    /// True when health is auth-failed, backing-off, flapping or high-latency.
    pub fallback_active: bool,
    /// Last sync time plus the schedule period; `None` without a last sync or
    /// for a manual schedule.
    pub next_eligible_sync_ms: Option<i64>,
    /// End of the failure backoff window, when there are consecutive failures
    /// and a recorded last sync.
    pub backoff_until_ms: Option<i64>,
    /// Human-readable explanations, in the order they were produced.
    pub reasons: Vec<String>,
}
2243
impl SourceSyncDecision {
    /// Evaluate the sync decision for `source` at time `now_ms`.
    ///
    /// - `info`: stored sync status for the source, if any.
    /// - `now_ms`: current time in Unix milliseconds.
    /// - `manual_override`: when true the action is always `Sync`.
    ///
    /// Health is decided by the first matching rule in the `match` below; the
    /// order of the guards is significant. Each rule may append an explanation
    /// to `reasons`, which always ends up non-empty.
    fn evaluate(
        source: &SourceDefinition,
        info: Option<&SourceSyncInfo>,
        now_ms: i64,
        manual_override: bool,
    ) -> Self {
        let period_ms = sync_schedule_period_ms(source.sync_schedule);
        // Last sync + schedule period; `None` without a last sync or a period.
        let next_eligible_sync_ms = info
            .and_then(|info| info.last_sync)
            .and_then(|last_sync| period_ms.map(|period| last_sync.saturating_add(period)));
        let backoff_until_ms = info.and_then(failure_backoff_until_ms);
        // Clamped at zero so a last-sync timestamp in the future is not negative.
        let staleness_ms = info.and_then(|info| {
            info.last_sync
                .map(|last_sync| now_ms.saturating_sub(last_sync).max(0))
        });
        let stale_value_score =
            stale_value_score_for_source(source.sync_schedule, staleness_ms, info);
        let mut reasons = Vec::new();

        // First matching rule wins.
        let health = match info {
            None => {
                reasons.push("no durable sync status exists for this source".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if info.last_sync.is_none() => {
                reasons.push("source has never completed or attempted a sync".to_string());
                SourceHealthKind::NeverSynced
            }
            Some(info) if sync_result_auth_failure(&info.last_result) => {
                reasons
                    .push("last sync failed with an authentication or host-key error".to_string());
                SourceHealthKind::AuthFailed
            }
            Some(info) if matches!(info.last_result, SyncResult::PartialFailure(_)) => {
                reasons.push("last sync partially succeeded and partially failed".to_string());
                SourceHealthKind::Flapping
            }
            Some(info)
                if info.consecutive_failures > 0
                    && backoff_until_ms.is_some_and(|until| until > now_ms) =>
            {
                reasons.push(format!(
                    "{} consecutive failure(s) are inside retry backoff",
                    info.consecutive_failures
                ));
                SourceHealthKind::BackingOff
            }
            // Reached for a failed result once the backoff window no longer applies.
            Some(info) if matches!(info.last_result, SyncResult::Failed(_)) => {
                let error = info.last_result.error_message().unwrap_or("unknown error");
                reasons.push(format!(
                    "last sync failed completely ({error}); local fallback remains active"
                ));
                SourceHealthKind::Flapping
            }
            Some(info) if info.duration_ms >= SOURCE_HIGH_LATENCY_MS => {
                reasons.push(format!(
                    "last sync took {}ms, above {}ms high-latency guard",
                    info.duration_ms, SOURCE_HIGH_LATENCY_MS
                ));
                SourceHealthKind::HighLatency
            }
            Some(info) if sync_schedule_due(info.last_sync, period_ms, now_ms) => {
                reasons.push("configured sync schedule is due".to_string());
                SourceHealthKind::Stale
            }
            Some(_) => SourceHealthKind::Healthy,
        };

        let fallback_active = matches!(
            health,
            SourceHealthKind::AuthFailed
                | SourceHealthKind::BackingOff
                | SourceHealthKind::Flapping
                | SourceHealthKind::HighLatency
        );

        let mut action = if manual_override {
            reasons.push("explicit sync command overrides automatic scheduling".to_string());
            SourceSyncAction::Sync
        } else {
            automatic_source_sync_action(source.sync_schedule, health, info, now_ms)
        };

        // Auth failures are never retried automatically, whatever the automatic
        // policy returned above.
        if !manual_override && matches!(health, SourceHealthKind::AuthFailed) {
            action = SourceSyncAction::Defer;
        }

        if !manual_override && matches!(source.sync_schedule, SyncSchedule::Manual) {
            reasons.push("sync_schedule=manual requires an explicit sync command".to_string());
        }

        if !manual_override
            && matches!(action, SourceSyncAction::Skip)
            && let Some(next_ms) = next_eligible_sync_ms
        {
            reasons.push(format!(
                "next scheduled sync is eligible at unix_ms={next_ms}"
            ));
        }

        // Guarantee at least one reason for the healthy, nothing-to-report case.
        if reasons.is_empty() {
            reasons.push("source is healthy and within schedule".to_string());
        }

        Self {
            action,
            health,
            health_score: health_score_for_source(health),
            staleness_ms,
            stale_value_score,
            manual_override,
            fallback_active,
            next_eligible_sync_ms,
            backoff_until_ms,
            reasons,
        }
    }
}
2363
/// Persisted summary of the most recent sync of one source.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SourceSyncInfo {
    /// Unix time in milliseconds of the last recorded sync; `None` if none.
    pub last_sync: Option<i64>,
    /// Outcome of the last recorded sync.
    pub last_result: SyncResult,
    /// File count taken from the sync report (`total_files()`).
    pub files_synced: u64,
    /// Byte count taken from the sync report (`total_bytes()`).
    pub bytes_transferred: u64,
    /// Duration of the last sync in milliseconds.
    pub duration_ms: u64,
    /// Number of failed syncs in a row; falls back to 0 when the field is
    /// missing from the serialized data.
    #[serde(default)]
    pub consecutive_failures: u32,
}
2381
2382impl SourceSyncInfo {
2383 pub fn from_report(report: &SyncReport) -> Self {
2385 let last_result = report.sync_result();
2386 Self {
2387 last_sync: Some(current_unix_ms()),
2388 consecutive_failures: u32::from(!report.all_succeeded),
2389 last_result,
2390 files_synced: report.total_files(),
2391 bytes_transferred: report.total_bytes(),
2392 duration_ms: report.total_duration_ms,
2393 }
2394 }
2395}
2396
/// Sync status for all sources, persisted as `sync_status.json` in the data
/// directory.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SyncStatus {
    /// Per-source status, keyed by source name.
    pub sources: std::collections::HashMap<String, SourceSyncInfo>,
}
2403
2404impl SyncStatus {
2405 pub fn load(data_dir: &Path) -> Result<Self, std::io::Error> {
2407 let path = Self::status_path(data_dir);
2408 match std::fs::read_to_string(&path) {
2409 Ok(content) => serde_json::from_str(&content)
2410 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
2411 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Self::default()),
2412 Err(e) => Err(e),
2413 }
2414 }
2415
2416 pub fn save(&self, data_dir: &Path) -> Result<(), std::io::Error> {
2421 let path = Self::status_path(data_dir);
2422 if let Some(parent) = path.parent() {
2423 std::fs::create_dir_all(parent)?;
2424 }
2425 let content = serde_json::to_string_pretty(self)?;
2426 let tmp_path = unique_atomic_temp_path(&path);
2427 std::fs::write(&tmp_path, content)?;
2428 sync_file_path(&tmp_path)?;
2429 replace_file_from_temp(&tmp_path, &path)
2430 }
2431
2432 pub fn update(&mut self, source_name: &str, report: &SyncReport) {
2434 let previous_failures = self
2435 .get(source_name)
2436 .map(|info| info.consecutive_failures)
2437 .unwrap_or_default();
2438 let mut info = SourceSyncInfo::from_report(report);
2439 if report.all_succeeded {
2440 info.consecutive_failures = 0;
2441 } else {
2442 info.consecutive_failures = previous_failures.saturating_add(1);
2443 }
2444 self.set_info(source_name, info);
2445 }
2446
2447 pub fn set_info(&mut self, source_name: &str, info: SourceSyncInfo) {
2449 self.sources.insert(source_name.to_string(), info);
2450 }
2451
2452 pub fn retain_sources<'a>(&mut self, source_names: impl IntoIterator<Item = &'a str>) -> bool {
2456 let allowed: std::collections::HashSet<&str> = source_names.into_iter().collect();
2457 let previous_len = self.sources.len();
2458 self.sources
2459 .retain(|source_name, _| allowed.contains(source_name.as_str()));
2460 self.sources.len() != previous_len
2461 }
2462
2463 pub fn get(&self, source_name: &str) -> Option<&SourceSyncInfo> {
2465 self.sources.get(source_name)
2466 }
2467
2468 pub fn decision_for_source_at(
2470 &self,
2471 source: &SourceDefinition,
2472 now_ms: i64,
2473 manual_override: bool,
2474 ) -> SourceSyncDecision {
2475 SourceSyncDecision::evaluate(source, self.get(&source.name), now_ms, manual_override)
2476 }
2477
2478 fn status_path(data_dir: &Path) -> PathBuf {
2480 data_dir.join("sync_status.json")
2481 }
2482}
2483
/// Sync duration (milliseconds) at or above which a source is classified as
/// high-latency: 60 seconds.
const SOURCE_HIGH_LATENCY_MS: u64 = 60_000;
/// Base retry backoff after a failure, in milliseconds: 5 minutes.
const SOURCE_FAILURE_BACKOFF_BASE_MS: i64 = 5 * 60 * 1000;
/// Upper bound for the retry backoff, in milliseconds: 60 minutes.
const SOURCE_FAILURE_BACKOFF_MAX_MS: i64 = 60 * 60 * 1000;
2487
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch, and
/// `i64::MAX` if the millisecond count does not fit in an `i64`.
pub(crate) fn current_unix_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
2495
2496fn sync_schedule_period_ms(schedule: SyncSchedule) -> Option<i64> {
2497 match schedule {
2498 SyncSchedule::Manual => None,
2499 SyncSchedule::Hourly => Some(60 * 60 * 1000),
2500 SyncSchedule::Daily => Some(24 * 60 * 60 * 1000),
2501 }
2502}
2503
/// Decide whether a scheduled sync is due at `now_ms`.
///
/// - Never synced (`last_sync` is `None`): always due.
/// - Synced but no period (manual schedule): never due.
/// - Otherwise: due once `last_sync + period_ms` (saturating) is at or before
///   `now_ms`.
fn sync_schedule_due(last_sync: Option<i64>, period_ms: Option<i64>, now_ms: i64) -> bool {
    let Some(synced_at) = last_sync else {
        return true;
    };
    period_ms.is_some_and(|period| synced_at.saturating_add(period) <= now_ms)
}
2511
2512fn automatic_source_sync_action(
2513 schedule: SyncSchedule,
2514 health: SourceHealthKind,
2515 info: Option<&SourceSyncInfo>,
2516 now_ms: i64,
2517) -> SourceSyncAction {
2518 match health {
2519 SourceHealthKind::AuthFailed | SourceHealthKind::BackingOff => SourceSyncAction::Defer,
2520 _ if matches!(schedule, SyncSchedule::Manual) => SourceSyncAction::Skip,
2521 SourceHealthKind::NeverSynced | SourceHealthKind::Stale => SourceSyncAction::Sync,
2522 SourceHealthKind::Flapping | SourceHealthKind::HighLatency => {
2523 if sync_schedule_due(
2524 info.and_then(|info| info.last_sync),
2525 sync_schedule_period_ms(schedule),
2526 now_ms,
2527 ) {
2528 SourceSyncAction::Sync
2529 } else {
2530 SourceSyncAction::Skip
2531 }
2532 }
2533 SourceHealthKind::Healthy => {
2534 if sync_schedule_due(
2535 info.and_then(|info| info.last_sync),
2536 sync_schedule_period_ms(schedule),
2537 now_ms,
2538 ) {
2539 SourceSyncAction::Sync
2540 } else {
2541 SourceSyncAction::Skip
2542 }
2543 }
2544 }
2545}
2546
2547fn health_score_for_source(health: SourceHealthKind) -> u8 {
2548 match health {
2549 SourceHealthKind::Healthy => 100,
2550 SourceHealthKind::Stale => 75,
2551 SourceHealthKind::NeverSynced => 65,
2552 SourceHealthKind::HighLatency => 55,
2553 SourceHealthKind::Flapping => 40,
2554 SourceHealthKind::BackingOff => 25,
2555 SourceHealthKind::AuthFailed => 10,
2556 }
2557}
2558
2559fn stale_value_score_for_source(
2560 schedule: SyncSchedule,
2561 staleness_ms: Option<i64>,
2562 info: Option<&SourceSyncInfo>,
2563) -> u8 {
2564 let Some(info) = info else {
2565 return 100;
2566 };
2567 if info.last_sync.is_none() {
2568 return 100;
2569 }
2570
2571 let Some(staleness_ms) = staleness_ms else {
2572 return 100;
2573 };
2574
2575 let Some(period_ms) = sync_schedule_period_ms(schedule) else {
2576 return 0;
2577 };
2578
2579 let score = staleness_ms.saturating_mul(100) / period_ms.max(1);
2580 u8::try_from(score.clamp(0, 100)).unwrap_or(100)
2581}
2582
2583fn failure_backoff_until_ms(info: &SourceSyncInfo) -> Option<i64> {
2584 if info.consecutive_failures == 0 {
2585 return None;
2586 }
2587 let last_sync = info.last_sync?;
2588 let exponent = info.consecutive_failures.saturating_sub(1).min(4);
2589 let multiplier = 1_i64.checked_shl(exponent).unwrap_or(16);
2590 let backoff_ms = SOURCE_FAILURE_BACKOFF_BASE_MS
2591 .saturating_mul(multiplier)
2592 .min(SOURCE_FAILURE_BACKOFF_MAX_MS);
2593 Some(last_sync.saturating_add(backoff_ms))
2594}
2595
2596fn sync_result_auth_failure(result: &SyncResult) -> bool {
2597 let Some(error) = result.error_message() else {
2598 return false;
2599 };
2600 let error = error.to_ascii_lowercase();
2601 error.contains("permission denied")
2602 || error.contains("authentication")
2603 || error.contains("host key verification failed")
2604 || error.contains("known_hosts")
2605 || error.contains("no valid authentication")
2606}
2607
2608fn unique_atomic_temp_path(path: &Path) -> PathBuf {
2609 unique_atomic_sidecar_path(path, "tmp", "sync_status.json")
2610}
2611
/// Moves `temp_path` over `final_path`, then syncs the destination's parent
/// directory via `sync_parent_directory`.
///
/// On non-Windows targets this is a single `rename` followed by the directory
/// sync.
///
/// On Windows, if the first `rename` fails with `AlreadyExists` or
/// `PermissionDenied` while `final_path` exists, the existing file is renamed
/// to a unique backup path, the rename is retried, and the backup is then
/// either deleted (retry succeeded) or renamed back into place (retry failed).
///
/// # Errors
///
/// Returns the underlying I/O error from the rename or directory sync. On the
/// Windows fallback path the error message records every failed step and says
/// whether the original file was restored or the temp file was retained.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> Result<(), std::io::Error> {
    #[cfg(windows)]
    {
        match std::fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Only fall back to the backup dance when the destination is
            // actually in the way; any other failure is reported as-is below.
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                // Step 1: move the existing destination aside.
                let backup_path = unique_replace_backup_path(final_path);
                std::fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    // The destination is untouched, so the staged temp file is
                    // discarded (best effort) before reporting the failure.
                    let _ = std::fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                // Step 2: retry the rename now that the destination is clear.
                match std::fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        // Best-effort cleanup; a leftover backup is not an error.
                        let _ = std::fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    Err(second_err) => {
                        // Step 3: the retry failed, so try to put the original back.
                        let restore_result = std::fs::rename(&backup_path, final_path);
                        match restore_result {
                            Ok(()) => {
                                // Original restored: drop the temp file (best
                                // effort) and still report the replace failure.
                                let _ = std::fs::remove_file(temp_path);
                                sync_parent_directory(final_path).map_err(|sync_err| {
                                    std::io::Error::other(format!(
                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file but failed syncing parent directory: {}",
                                        final_path.display(),
                                        temp_path.display(),
                                        first_err,
                                        second_err,
                                        sync_err
                                    ))
                                })?;
                                // Keep the second error's kind so callers can
                                // still classify the failure.
                                Err(std::io::Error::new(
                                    second_err.kind(),
                                    format!(
                                        "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                        final_path.display(),
                                        temp_path.display(),
                                        first_err,
                                        second_err
                                    ),
                                ))
                            }
                            // Restore failed too: the temp file is deliberately
                            // left in place so its contents are not lost.
                            Err(restore_err) => Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err,
                                restore_err,
                                temp_path.display()
                            ))),
                        }
                    }
                }
            }
            Err(rename_err) => Err(rename_err),
        }
    }

    #[cfg(not(windows))]
    {
        std::fs::rename(temp_path, final_path)?;
        sync_parent_directory(final_path)
    }
}
2689
/// Opens the file at `path` and calls `sync_all` on it.
///
/// # Errors
///
/// Returns the I/O error from opening the file or from `sync_all`.
fn sync_file_path(path: &Path) -> Result<(), std::io::Error> {
    let file = std::fs::File::open(path)?;
    file.sync_all()
}
2693
/// Calls `sync_all` on the directory containing `path`.
///
/// A path with no parent at all (for example `/`) is a no-op. A bare relative
/// file name such as `sync_status.json` has an *empty* parent according to
/// `Path::parent`; that case is resolved to the current directory instead of
/// trying to open `""`, which would fail with "not found".
///
/// # Errors
///
/// Returns the I/O error from opening the directory or from `sync_all`.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> Result<(), std::io::Error> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    std::fs::File::open(directory)?.sync_all()
}
2701
/// Windows counterpart of the directory sync: performs no work and always
/// returns `Ok(())`.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> Result<(), std::io::Error> {
    Ok(())
}
2706
/// Builds a unique hidden `bak` sidecar path next to `path`, used to park the
/// existing destination during the Windows replace fallback.
#[cfg(windows)]
fn unique_replace_backup_path(path: &Path) -> PathBuf {
    const BACKUP_SUFFIX: &str = "bak";
    // Used by the sidecar helper when `path` has no usable UTF-8 file name.
    const FALLBACK_FILE_NAME: &str = "sync_status.json";

    unique_atomic_sidecar_path(path, BACKUP_SUFFIX, FALLBACK_FILE_NAME)
}
2711
2712fn unique_atomic_sidecar_path(path: &Path, suffix: &str, fallback_name: &str) -> PathBuf {
2713 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2714
2715 let timestamp = std::time::SystemTime::now()
2716 .duration_since(std::time::UNIX_EPOCH)
2717 .unwrap_or_default()
2718 .as_nanos();
2719 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2720 let file_name = path
2721 .file_name()
2722 .and_then(|name| name.to_str())
2723 .unwrap_or(fallback_name);
2724
2725 path.with_file_name(format!(
2726 ".{file_name}.{suffix}.{}.{}.{}",
2727 std::process::id(),
2728 timestamp,
2729 nonce
2730 ))
2731}
2732
2733#[cfg(test)]
2734mod tests {
2735 use super::*;
2736 use tempfile::TempDir;
2737
2738 #[test]
2739 fn test_path_to_safe_dirname() {
2740 let res = path_to_safe_dirname("~/.claude/projects");
2741 assert!(res.starts_with(".claude_projects_"));
2742
2743 let res = path_to_safe_dirname("/home/user/data");
2744 assert!(res.starts_with("home_user_data_"));
2745
2746 let res = path_to_safe_dirname("~/");
2747 assert!(res.starts_with("root_"));
2748
2749 let res = path_to_safe_dirname("");
2750 assert!(res.starts_with("root_"));
2751 }
2752
2753 #[test]
2754 fn test_path_to_safe_dirname_empty() {
2755 let res = path_to_safe_dirname("~");
2756 assert!(res.starts_with("root_"));
2757
2758 let res = path_to_safe_dirname("/");
2759 assert!(res.starts_with("root_"));
2760 }
2761
2762 #[test]
2763 fn test_path_to_safe_dirname_strips_traversal_components() {
2764 let res = path_to_safe_dirname("../../etc/passwd");
2765
2766 assert!(res.starts_with("etc_passwd_"));
2767 assert!(!res.contains(".."));
2768 assert!(!res.contains('/'));
2769 assert!(!res.contains('\\'));
2770 }
2771
2772 #[test]
2773 fn test_get_remote_home_rejects_unsafe_hosts_before_ssh() {
2774 let temp = TempDir::new().unwrap();
2775 let engine = SyncEngine::new(temp.path());
2776
2777 for host in [
2778 "work-mac;touch /tmp/cass-owned",
2779 "work mac",
2780 "work-mac\nhostname",
2781 "work-mac`hostname`",
2782 "work-mac/../../secret",
2783 "-oProxyCommand=evil",
2784 "",
2785 "@host",
2786 "user@",
2787 "user@host@extra",
2788 ] {
2789 let err = engine.get_remote_home(host).unwrap_err();
2790 assert!(
2791 matches!(err, SyncError::SshFailed(ref message) if message.contains("Invalid characters in host")),
2792 "expected invalid-host rejection for {host:?}, got {err}"
2793 );
2794 }
2795 }
2796
2797 #[test]
2798 fn test_sync_source_rejects_invalid_source_name_before_mirror_creation() {
2799 let temp = TempDir::new().unwrap();
2800 let engine = SyncEngine::new(temp.path());
2801 let mut source = SourceDefinition::ssh("../escape", "user@host");
2802 source.paths = vec!["/tmp/sessions".to_string()];
2803
2804 let err = engine
2805 .sync_source(&source)
2806 .expect_err("invalid source name should fail before local writes");
2807
2808 assert!(
2809 matches!(err, SyncError::InvalidSource(ref message) if message.contains("Source name cannot contain path separators")),
2810 "expected invalid source-name rejection, got {err}"
2811 );
2812 assert!(
2813 !temp.path().join("escape").exists(),
2814 "invalid source name must not escape the remotes mirror layout"
2815 );
2816 assert!(
2817 !temp.path().join("remotes").exists(),
2818 "invalid source name must be rejected before creating mirror roots"
2819 );
2820 }
2821
2822 #[test]
2823 fn test_sync_source_rejects_invalid_host_before_mirror_creation() {
2824 let temp = TempDir::new().unwrap();
2825 let engine = SyncEngine::new(temp.path());
2826 let mut source = SourceDefinition::ssh("unsafe-host", "user@host withspace");
2827 source.paths = vec!["/tmp/sessions".to_string()];
2828
2829 let err = engine
2830 .sync_source(&source)
2831 .expect_err("invalid host should fail before local writes");
2832
2833 assert!(
2834 matches!(err, SyncError::InvalidSource(ref message) if message.contains("SSH host cannot contain whitespace")),
2835 "expected invalid host rejection, got {err}"
2836 );
2837 assert!(
2838 !temp.path().join("remotes").exists(),
2839 "invalid host must be rejected before creating mirror roots"
2840 );
2841 }
2842
2843 #[test]
2844 fn test_sync_source_reports_invalid_remote_paths_without_transfer() {
2845 let temp = TempDir::new().unwrap();
2846 let engine = SyncEngine::new(temp.path());
2847
2848 for (path, expected) in [
2849 ("", "paths[0] cannot be empty"),
2850 (" ", "paths[0] cannot be empty"),
2851 (" ~/.claude/projects", "paths[0] cannot have leading"),
2852 ("~/.claude/projects ", "paths[0] cannot have leading"),
2853 ("~/.claude\nprojects", "paths[0] cannot contain control"),
2854 ] {
2855 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
2856 source.paths = vec![path.to_string()];
2857
2858 let report = engine.sync_source(&source).unwrap();
2859 assert_eq!(report.path_results.len(), 1);
2860 let result = &report.path_results[0];
2861 assert!(!result.success);
2862 assert_eq!(result.remote_path, path);
2863 assert!(
2864 result
2865 .error
2866 .as_deref()
2867 .is_some_and(|message| message.contains(expected)),
2868 "expected invalid path rejection for {path:?}, got {result:?}"
2869 );
2870 }
2871 }
2872
2873 #[test]
2874 fn test_remote_sync_path_validation_allows_internal_spaces() {
2875 assert!(
2876 validate_remote_sync_path_entry(
2877 0,
2878 "~/Library/Application Support/Cursor/User/globalStorage"
2879 )
2880 .is_ok()
2881 );
2882 }
2883
2884 #[test]
2885 fn test_sync_source_preserves_path_result_order_for_mixed_invalid_paths() {
2886 let temp = TempDir::new().unwrap();
2887 let engine = SyncEngine::new(temp.path()).with_connection_timeout(1);
2888 let mut source = SourceDefinition::ssh("laptop", "192.0.2.1");
2891 source.paths = vec![
2892 "~/.codex/sessions".to_string(),
2893 " ~/.claude/projects".to_string(),
2894 "~/.gemini/tmp".to_string(),
2895 ];
2896
2897 let report = engine.sync_source(&source).unwrap();
2898 let remote_paths = report
2899 .path_results
2900 .iter()
2901 .map(|result| result.remote_path.as_str())
2902 .collect::<Vec<_>>();
2903
2904 assert_eq!(
2905 remote_paths,
2906 vec!["~/.codex/sessions", " ~/.claude/projects", "~/.gemini/tmp"]
2907 );
2908 assert!(
2909 report.path_results[1]
2910 .error
2911 .as_deref()
2912 .is_some_and(|message| message.contains("paths[1] cannot have leading")),
2913 "expected invalid path error in original slot: {:?}",
2914 report.path_results
2915 );
2916 }
2917
2918 #[test]
2919 fn test_remote_find_regular_files_command_uses_physical_traversal() {
2920 assert_eq!(
2921 remote_find_regular_files_command("/tmp/has space"),
2922 "find -P '/tmp/has space' -type f -print0"
2923 );
2924 assert_eq!(
2925 remote_find_regular_files_command("/tmp/that's all"),
2926 "find -P '/tmp/that'\\''s all' -type f -print0"
2927 );
2928 }
2929
2930 #[test]
2931 fn test_parse_remote_home_stdout_accepts_single_absolute_candidate() {
2932 assert_eq!(
2933 parse_remote_home_stdout(b"Welcome to host\nCASS_HOME_MARKER:/home/user\n"),
2934 Some("/home/user".to_string())
2935 );
2936 assert_eq!(
2937 parse_remote_home_stdout(b"CASS_HOME_MARKER:/Users/test user\r\n"),
2938 Some("/Users/test user".to_string())
2939 );
2940 }
2941
2942 #[test]
2943 fn test_parse_remote_home_stdout_rejects_missing_or_ambiguous_home() {
2944 assert_eq!(parse_remote_home_stdout(b"Welcome to host\n"), None);
2945 assert_eq!(
2946 parse_remote_home_stdout(b"CASS_HOME_MARKER:not_absolute\n"),
2947 None
2948 );
2949 }
2950
2951 #[test]
2952 fn test_parse_null_terminated_utf8_paths_skips_invalid_entries() {
2953 let paths = parse_null_terminated_utf8_paths(
2954 b"/remote/sessions/a.jsonl\0bad-\xff-name\0/remote/sessions/b.jsonl\0",
2955 );
2956 assert_eq!(
2957 paths,
2958 vec![
2959 "/remote/sessions/a.jsonl".to_string(),
2960 "/remote/sessions/b.jsonl".to_string()
2961 ]
2962 );
2963 }
2964
2965 #[test]
2966 fn test_remote_file_to_safe_local_path_rejects_outside_root() {
2967 let root = Path::new("/remote/sessions");
2968 let local = Path::new("/mirror/root");
2969
2970 assert_eq!(
2971 remote_file_to_safe_local_path(
2972 root,
2973 Path::new("/remote/sessions/a/b.jsonl"),
2974 local,
2975 "sessions"
2976 ),
2977 Some(PathBuf::from("/mirror/root/sessions/a/b.jsonl"))
2978 );
2979 assert_eq!(
2980 remote_file_to_safe_local_path(
2981 Path::new("/remote/session.jsonl"),
2982 Path::new("/remote/session.jsonl"),
2983 local,
2984 "session.jsonl"
2985 ),
2986 Some(PathBuf::from("/mirror/root/session.jsonl"))
2987 );
2988 assert_eq!(
2989 remote_file_to_safe_local_path(
2990 root,
2991 Path::new("/remote/sessions/../secret.txt"),
2992 local,
2993 "sessions"
2994 ),
2995 None
2996 );
2997 assert_eq!(
2998 remote_file_to_safe_local_path(
2999 root,
3000 Path::new("/remote/other/secret.txt"),
3001 local,
3002 "sessions"
3003 ),
3004 None
3005 );
3006 }
3007
3008 #[test]
3009 fn test_local_symlink_guard_allows_regular_paths() {
3010 let temp = TempDir::new().expect("tempdir");
3011 let root = temp.path().join("mirror");
3012 let target = root.join("sessions/session.jsonl");
3013
3014 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3015
3016 std::fs::create_dir_all(target.parent().expect("target parent")).expect("create parent");
3017 std::fs::write(&target, "{}").expect("write target");
3018
3019 assert!(reject_local_symlink_below_root(&root, &target).is_ok());
3020 }
3021
3022 #[cfg(unix)]
3023 #[test]
3024 fn test_local_symlink_guard_rejects_nested_symlink() {
3025 use std::os::unix::fs::symlink;
3026
3027 let temp = TempDir::new().expect("tempdir");
3028 let root = temp.path().join("mirror");
3029 let outside = temp.path().join("outside");
3030 std::fs::create_dir_all(&root).expect("create root");
3031 std::fs::create_dir_all(&outside).expect("create outside");
3032 symlink(&outside, root.join("sessions")).expect("symlink nested dir");
3033
3034 let err = reject_local_symlink_below_root(&root, &root.join("sessions/session.jsonl"))
3035 .expect_err("nested symlink should be rejected");
3036
3037 assert!(err.contains("Refusing to write"));
3038 assert!(err.contains("sessions"));
3039 }
3040
3041 #[cfg(unix)]
3042 #[test]
3043 fn test_local_symlink_guard_rejects_root_symlink() {
3044 use std::os::unix::fs::symlink;
3045
3046 let temp = TempDir::new().expect("tempdir");
3047 let outside = temp.path().join("outside");
3048 let root = temp.path().join("mirror-link");
3049 std::fs::create_dir_all(&outside).expect("create outside");
3050 symlink(&outside, &root).expect("symlink root");
3051
3052 let err = reject_local_symlink_below_root(&root, &root.join("session.jsonl"))
3053 .expect_err("root symlink should be rejected");
3054
3055 assert!(err.contains("Refusing to write"));
3056 assert!(err.contains("mirror-link"));
3057 }
3058
3059 #[test]
3060 fn test_prepare_local_sync_container_creates_regular_container() {
3061 let temp = TempDir::new().expect("tempdir");
3062 let root = temp.path().join("mirror");
3063 let target = root.join("sessions");
3064
3065 prepare_local_sync_container(&root, &target).expect("regular container should be created");
3066
3067 assert!(target.is_dir());
3068 }
3069
3070 #[cfg(unix)]
3071 #[test]
3072 fn test_prepare_local_sync_container_rejects_preexisting_target_symlink() {
3073 use std::os::unix::fs::symlink;
3074
3075 let temp = TempDir::new().expect("tempdir");
3076 let root = temp.path().join("mirror");
3077 let outside = temp.path().join("outside");
3078 let target = root.join("sessions");
3079 std::fs::create_dir_all(&root).expect("create root");
3080 std::fs::create_dir_all(&outside).expect("create outside");
3081 symlink(&outside, &target).expect("symlink target");
3082
3083 let err = prepare_local_sync_container(&root, &target)
3084 .expect_err("sync container symlink should be rejected");
3085
3086 assert!(err.contains("Refusing to write"));
3087 assert!(err.contains("sessions"));
3088 }
3089
3090 #[cfg(unix)]
3091 #[test]
3092 fn test_prepare_local_sync_container_rejects_root_symlink() {
3093 use std::os::unix::fs::symlink;
3094
3095 let temp = TempDir::new().expect("tempdir");
3096 let outside = temp.path().join("outside");
3097 let root = temp.path().join("mirror-link");
3098 let target = root.join("sessions");
3099 std::fs::create_dir_all(&outside).expect("create outside");
3100 symlink(&outside, &root).expect("symlink root");
3101
3102 let err = prepare_local_sync_container(&root, &target)
3103 .expect_err("sync root symlink should be rejected");
3104
3105 assert!(err.contains("Refusing to write"));
3106 assert!(err.contains("mirror-link"));
3107 }
3108
3109 #[test]
3110 fn test_sftp_file_stat_is_symlink_detects_link_modes() {
3111 let symlink = FileStat {
3112 size: None,
3113 uid: None,
3114 gid: None,
3115 perm: Some(0o120000 | 0o777),
3116 atime: None,
3117 mtime: None,
3118 };
3119 let regular = FileStat {
3120 size: None,
3121 uid: None,
3122 gid: None,
3123 perm: Some(0o100000 | 0o644),
3124 atime: None,
3125 mtime: None,
3126 };
3127
3128 assert!(sftp_file_stat_is_symlink(&symlink));
3129 assert!(!sftp_file_stat_is_symlink(®ular));
3130 }
3131
3132 #[test]
3133 fn test_sftp_entry_file_name_accepts_regular_names() {
3134 let parent = Path::new("/remote");
3135 let entry = parent.join("session.jsonl");
3136
3137 assert_eq!(sftp_entry_file_name(&entry, parent), Some("session.jsonl"));
3138 }
3139
3140 #[test]
3141 fn test_sftp_entry_file_name_skips_dot_entries() {
3142 let parent = Path::new("/remote");
3143
3144 assert_eq!(sftp_entry_file_name(Path::new("."), parent), None);
3145 assert_eq!(sftp_entry_file_name(Path::new(".."), parent), None);
3146 }
3147
3148 #[cfg(unix)]
3149 #[test]
3150 fn test_sftp_entry_file_name_rejects_non_utf8_names() {
3151 use std::ffi::OsStr;
3152 use std::os::unix::ffi::OsStrExt;
3153
3154 let parent = Path::new("/remote");
3155 let bad_component = Path::new(OsStr::from_bytes(b"bad-\xff-name"));
3156 let entry = parent.join(bad_component);
3157
3158 assert_eq!(sftp_entry_file_name(&entry, parent), None);
3159 }
3160
    /// The transferred-file count and the comma-grouped byte total are read
    /// from an rsync `--stats`-style summary.
    #[test]
    fn test_parse_rsync_stats() {
        // The raw string is the fixture; its lines start at column 0 on purpose.
        let output = r#"
Number of files: 42
Number of regular files transferred: 10
Total transferred file size: 1,234 bytes
        "#;

        let stats = parse_rsync_stats(output);
        assert_eq!(stats.files_transferred, 10);
        // "1,234" must be parsed with the thousands separator removed.
        assert_eq!(stats.bytes_transferred, 1234);
    }
3173
3174 #[test]
3175 fn test_parse_rsync_stats_empty() {
3176 let stats = parse_rsync_stats("");
3177 assert_eq!(stats.files_transferred, 0);
3178 assert_eq!(stats.bytes_transferred, 0);
3179 }
3180
3181 #[test]
3182 fn test_quote_remote_shell_path_handles_spaces_and_quotes() {
3183 assert_eq!(
3184 quote_remote_shell_path("/Users/me/Library/Application Support/Cursor"),
3185 "'/Users/me/Library/Application Support/Cursor'"
3186 );
3187 assert_eq!(
3188 quote_remote_shell_path("/tmp/that's all"),
3189 "'/tmp/that'\\''s all'"
3190 );
3191 }
3192
3193 #[test]
3194 fn test_remote_spec_for_rsync_quotes_only_when_needed() {
3195 assert_eq!(
3196 remote_spec_for_rsync("work-mac", "/tmp/has space", true),
3197 "work-mac:/tmp/has space"
3198 );
3199 assert_eq!(
3200 remote_spec_for_rsync("work-mac", "/tmp/that's all", true),
3201 "work-mac:/tmp/that's all"
3202 );
3203 assert_eq!(
3204 remote_spec_for_rsync("work-mac", "/tmp/has space", false),
3205 "work-mac:'/tmp/has space'"
3206 );
3207 }
3208
3209 #[test]
3210 fn rsync_arg_protection_enum_maps_flags_correctly() {
3211 assert_eq!(
3216 RsyncArgProtection::ProtectArgs.flag(),
3217 Some("--protect-args")
3218 );
3219 assert_eq!(
3220 RsyncArgProtection::SecludedArgs.flag(),
3221 Some("--secluded-args")
3222 );
3223 assert_eq!(RsyncArgProtection::None.flag(), None);
3224 assert!(RsyncArgProtection::ProtectArgs.is_supported());
3225 assert!(RsyncArgProtection::SecludedArgs.is_supported());
3226 assert!(!RsyncArgProtection::None.is_supported());
3227 }
3228
3229 #[test]
3230 fn test_remote_spec_for_shell_bound_copy_quotes_remote_path() {
3231 assert_eq!(
3232 remote_spec_for_shell_bound_copy("work-mac", "/tmp/has space"),
3233 "work-mac:'/tmp/has space'"
3234 );
3235 }
3236
3237 #[test]
3238 fn test_remote_spec_for_scp_always_quotes_remote_path() {
3239 assert_eq!(
3240 remote_spec_for_scp("work-mac", "/tmp/that's all"),
3241 "work-mac:'/tmp/that'\\''s all'"
3242 );
3243 }
3244
3245 #[test]
3246 fn test_sync_report_totals() {
3247 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3248 report.add_path_result(PathSyncResult {
3249 files_transferred: 5,
3250 bytes_transferred: 100,
3251 success: true,
3252 ..Default::default()
3253 });
3254 report.add_path_result(PathSyncResult {
3255 files_transferred: 3,
3256 bytes_transferred: 50,
3257 success: true,
3258 ..Default::default()
3259 });
3260
3261 assert_eq!(report.total_files(), 8);
3262 assert_eq!(report.total_bytes(), 150);
3263 assert!(report.all_succeeded);
3264 }
3265
3266 #[test]
3267 fn test_sync_report_with_failure() {
3268 let mut report = SyncReport::new("test", SyncMethod::Rsync);
3269 report.add_path_result(PathSyncResult {
3270 success: true,
3271 ..Default::default()
3272 });
3273 report.add_path_result(PathSyncResult {
3274 success: false,
3275 error: Some("Connection refused".into()),
3276 ..Default::default()
3277 });
3278
3279 assert!(!report.all_succeeded);
3280 assert_eq!(report.successful_paths(), 1);
3281 assert_eq!(report.failed_paths(), 1);
3282 }
3283
3284 #[test]
3285 fn test_detect_sync_method() {
3286 let method = SyncEngine::detect_sync_method();
3288 assert!(matches!(
3289 method,
3290 SyncMethod::Rsync | SyncMethod::WslRsync | SyncMethod::Scp | SyncMethod::Sftp
3291 ));
3292 }
3293
3294 #[test]
3295 fn test_sync_engine_mirror_dir() {
3296 let engine = SyncEngine::new(Path::new("/data/cass"));
3297 let mirror = engine.mirror_dir("laptop");
3298 assert_eq!(mirror, PathBuf::from("/data/cass/remotes/laptop/mirror"));
3299 }
3300
3301 #[test]
3302 fn test_sync_method_display() {
3303 for (method, expected) in [
3304 (SyncMethod::Rsync, "rsync"),
3305 (SyncMethod::WslRsync, "wsl-rsync"),
3306 (SyncMethod::Scp, "scp"),
3307 (SyncMethod::Sftp, "sftp"),
3308 ] {
3309 assert_eq!(method.as_str(), expected);
3310 assert_eq!(method.to_string(), expected);
3311 }
3312 }
3313
3314 #[test]
3315 fn test_windows_path_to_wsl_drive() {
3316 assert_eq!(
3317 windows_path_to_wsl("C:\\Users\\george\\AppData\\Roaming\\cass"),
3318 "/mnt/c/Users/george/AppData/Roaming/cass"
3319 );
3320 }
3321
3322 #[test]
3323 fn test_windows_path_to_wsl_forward_slash() {
3324 assert_eq!(
3325 windows_path_to_wsl("C:/Users/george/data"),
3326 "/mnt/c/Users/george/data"
3327 );
3328 }
3329
3330 #[test]
3331 fn test_windows_path_to_wsl_non_windows_path_unchanged() {
3332 assert_eq!(
3334 windows_path_to_wsl("/home/george/data"),
3335 "/home/george/data"
3336 );
3337 }
3338
3339 #[test]
3340 fn test_expand_tilde_with_home() {
3341 assert_eq!(
3343 SyncEngine::expand_tilde_with_home("/home/user/projects", Some("/home/user")),
3344 "/home/user/projects"
3345 );
3346
3347 assert_eq!(
3349 SyncEngine::expand_tilde_with_home("~/.claude/projects", Some("/home/user")),
3350 "/home/user/.claude/projects"
3351 );
3352
3353 assert_eq!(
3355 SyncEngine::expand_tilde_with_home("~", Some("/home/user")),
3356 "/home/user"
3357 );
3358
3359 assert_eq!(
3361 SyncEngine::expand_tilde_with_home("~/.claude/projects", None),
3362 "~/.claude/projects"
3363 );
3364
3365 assert_eq!(
3367 SyncEngine::expand_tilde_with_home("~otheruser/projects", Some("/home/user")),
3368 "~otheruser/projects"
3369 );
3370 }
3371
3372 #[test]
3373 fn test_sync_report_failed() {
3374 let report = SyncReport::failed("test-source", SyncError::NoHost);
3375 assert_eq!(report.source_name, "test-source");
3376 assert!(!report.all_succeeded);
3377 assert_eq!(report.path_results.len(), 1);
3378 assert!(!report.path_results[0].success);
3379 assert!(report.path_results[0].error.is_some());
3380 }
3381
3382 #[test]
3383 fn test_sync_result_default() {
3384 let result = SyncResult::default();
3385 assert!(matches!(result, SyncResult::Skipped));
3386 assert_eq!(result.label(), "never");
3387 }
3388
3389 #[test]
3390 fn test_source_sync_info_default() {
3391 let info = SourceSyncInfo::default();
3392 assert!(info.last_sync.is_none());
3393 assert_eq!(info.files_synced, 0);
3394 assert_eq!(info.bytes_transferred, 0);
3395 assert_eq!(info.duration_ms, 0);
3396 }
3397
3398 #[test]
3399 fn test_sync_status_update() {
3400 let mut status = SyncStatus::default();
3401
3402 let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3403 report.add_path_result(PathSyncResult {
3404 files_transferred: 10,
3405 bytes_transferred: 1000,
3406 success: true,
3407 ..Default::default()
3408 });
3409 report.total_duration_ms = 500;
3410
3411 status.update("laptop", &report);
3412
3413 let info = status.get("laptop").unwrap();
3414 assert!(info.last_sync.is_some());
3415 assert!(matches!(info.last_result, SyncResult::Success));
3416 assert_eq!(info.files_synced, 10);
3417 assert_eq!(info.bytes_transferred, 1000);
3418 assert_eq!(info.duration_ms, 500);
3419 }
3420
3421 #[test]
3422 fn test_sync_status_partial_failure() {
3423 let mut status = SyncStatus::default();
3424
3425 let mut report = SyncReport::new("server", SyncMethod::Rsync);
3426 report.add_path_result(PathSyncResult {
3427 success: true,
3428 files_transferred: 5,
3429 ..Default::default()
3430 });
3431 report.add_path_result(PathSyncResult {
3432 success: false,
3433 error: Some("Connection refused".into()),
3434 ..Default::default()
3435 });
3436
3437 status.update("server", &report);
3438
3439 let info = status.get("server").unwrap();
3440 assert!(matches!(info.last_result, SyncResult::PartialFailure(_)));
3441 }
3442
3443 #[test]
3444 fn test_sync_status_full_failure() {
3445 let mut status = SyncStatus::default();
3446
3447 let mut report = SyncReport::new("dead-host", SyncMethod::Rsync);
3448 report.add_path_result(PathSyncResult {
3449 success: false,
3450 error: Some("Host unreachable".into()),
3451 ..Default::default()
3452 });
3453
3454 status.update("dead-host", &report);
3455
3456 let info = status.get("dead-host").unwrap();
3457 assert!(matches!(info.last_result, SyncResult::Failed(_)));
3458 }
3459
3460 #[test]
3461 fn test_sync_status_save_round_trips() {
3462 let temp = TempDir::new().expect("tempdir");
3463 let mut status = SyncStatus::default();
3464 let mut report = SyncReport::new("laptop", SyncMethod::Rsync);
3465 report.add_path_result(PathSyncResult {
3466 files_transferred: 3,
3467 bytes_transferred: 42,
3468 success: true,
3469 ..Default::default()
3470 });
3471 status.update("laptop", &report);
3472
3473 status.save(temp.path()).expect("save status");
3474 let loaded = SyncStatus::load(temp.path()).expect("load status");
3475
3476 let info = loaded.get("laptop").expect("round-tripped source");
3477 assert_eq!(info.files_synced, 3);
3478 assert_eq!(info.bytes_transferred, 42);
3479 assert!(matches!(info.last_result, SyncResult::Success));
3480 }
3481
3482 #[test]
3483 fn test_sync_status_retain_sources_prunes_removed_entries() {
3484 let mut status = SyncStatus::default();
3485 status.sources.insert(
3486 "laptop".into(),
3487 SourceSyncInfo {
3488 files_synced: 3,
3489 ..Default::default()
3490 },
3491 );
3492 status.sources.insert(
3493 "desktop".into(),
3494 SourceSyncInfo {
3495 files_synced: 5,
3496 ..Default::default()
3497 },
3498 );
3499
3500 let removed_any = status.retain_sources(["laptop"]);
3501
3502 assert!(removed_any);
3503 assert!(status.get("laptop").is_some());
3504 assert!(status.get("desktop").is_none());
3505 }
3506
3507 fn source_with_schedule(schedule: SyncSchedule) -> SourceDefinition {
3508 let mut source = SourceDefinition::ssh("laptop", "user@laptop.local");
3509 source.sync_schedule = schedule;
3510 source.paths = vec!["~/.claude/projects".to_string()];
3511 source
3512 }
3513
3514 fn status_with_info(info: SourceSyncInfo) -> SyncStatus {
3515 let mut status = SyncStatus::default();
3516 status.set_info("laptop", info);
3517 status
3518 }
3519
3520 #[test]
3521 fn source_sync_decision_skips_healthy_source_until_schedule_due() {
3522 let now_ms = 1_700_000_000_000;
3523 let source = source_with_schedule(SyncSchedule::Hourly);
3524 let status = status_with_info(SourceSyncInfo {
3525 last_sync: Some(now_ms - 10 * 60 * 1000),
3526 last_result: SyncResult::Success,
3527 duration_ms: 250,
3528 ..Default::default()
3529 });
3530
3531 let decision = status.decision_for_source_at(&source, now_ms, false);
3532
3533 assert_eq!(decision.action, SourceSyncAction::Skip);
3534 assert_eq!(decision.health, SourceHealthKind::Healthy);
3535 assert!(!decision.fallback_active);
3536 assert_eq!(
3537 decision.next_eligible_sync_ms,
3538 Some(now_ms + 50 * 60 * 1000)
3539 );
3540 assert_eq!(decision.staleness_ms, Some(10 * 60 * 1000));
3541 assert_eq!(decision.stale_value_score, 16);
3542 }
3543
3544 #[test]
3545 fn source_sync_decision_syncs_stale_scheduled_source() {
3546 let now_ms = 1_700_000_000_000;
3547 let source = source_with_schedule(SyncSchedule::Hourly);
3548 let status = status_with_info(SourceSyncInfo {
3549 last_sync: Some(now_ms - 2 * 60 * 60 * 1000),
3550 last_result: SyncResult::Success,
3551 duration_ms: 250,
3552 ..Default::default()
3553 });
3554
3555 let decision = status.decision_for_source_at(&source, now_ms, false);
3556
3557 assert_eq!(decision.action, SourceSyncAction::Sync);
3558 assert_eq!(decision.health, SourceHealthKind::Stale);
3559 assert_eq!(decision.stale_value_score, 100);
3560 assert!(
3561 decision
3562 .reasons
3563 .iter()
3564 .any(|reason| reason.contains("schedule is due"))
3565 );
3566 }
3567
3568 #[test]
3569 fn source_sync_decision_defers_auth_failures_with_fallback_reason() {
3570 let now_ms = 1_700_000_000_000;
3571 let source = source_with_schedule(SyncSchedule::Hourly);
3572 let status = status_with_info(SourceSyncInfo {
3573 last_sync: Some(now_ms - 10 * 60 * 1000),
3574 last_result: SyncResult::Failed("Permission denied (publickey)".into()),
3575 duration_ms: 800,
3576 consecutive_failures: 1,
3577 ..Default::default()
3578 });
3579
3580 let decision = status.decision_for_source_at(&source, now_ms, false);
3581
3582 assert_eq!(decision.action, SourceSyncAction::Defer);
3583 assert_eq!(decision.health, SourceHealthKind::AuthFailed);
3584 assert!(decision.fallback_active);
3585 assert_eq!(decision.health_score, 10);
3586 }
3587
3588 #[test]
3589 fn source_sync_decision_marks_partial_success_as_flapping() {
3590 let now_ms = 1_700_000_000_000;
3591 let source = source_with_schedule(SyncSchedule::Hourly);
3592 let status = status_with_info(SourceSyncInfo {
3593 last_sync: Some(now_ms - 10 * 60 * 1000),
3594 last_result: SyncResult::PartialFailure("one path failed".into()),
3595 files_synced: 7,
3596 duration_ms: 900,
3597 consecutive_failures: 1,
3598 ..Default::default()
3599 });
3600
3601 let decision = status.decision_for_source_at(&source, now_ms, false);
3602
3603 assert_eq!(decision.action, SourceSyncAction::Skip);
3604 assert_eq!(decision.health, SourceHealthKind::Flapping);
3605 assert!(decision.fallback_active);
3606 }
3607
/// After a single `Failed("Host unreachable")` run ten minutes before
/// `now_ms`, the decision must be `Skip`/`Flapping` with the fallback still
/// active, report `last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS` as the backoff
/// deadline, and include a reason mentioning that the local fallback remains
/// active.
///
/// NOTE(review): the test name implies the backoff window has already
/// elapsed at `now_ms`; that depends on the value of
/// `SOURCE_FAILURE_BACKOFF_BASE_MS`, which is defined elsewhere.
#[test]
fn source_sync_decision_keeps_local_fallback_after_unreachable_backoff_expires() {
    let now_ms = 1_700_000_000_000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let last_sync = now_ms - 10 * 60 * 1000;
    let unreachable_run = SourceSyncInfo {
        last_sync: Some(last_sync),
        last_result: SyncResult::Failed("Host unreachable".into()),
        duration_ms: 900,
        consecutive_failures: 1,
        ..Default::default()
    };
    let status = status_with_info(unreachable_run);

    let outcome = status.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(outcome.action, SourceSyncAction::Skip);
    assert_eq!(outcome.health, SourceHealthKind::Flapping);
    assert!(outcome.fallback_active);

    let expected_backoff = Some(last_sync + SOURCE_FAILURE_BACKOFF_BASE_MS);
    assert_eq!(outcome.backoff_until_ms, expected_backoff);

    let mentions_fallback = outcome
        .reasons
        .iter()
        .any(|why| why.contains("local fallback remains active"));
    assert!(mentions_fallback);
}
3637
/// A successful sync whose duration is one millisecond above
/// `SOURCE_HIGH_LATENCY_MS` must be classified as `HighLatency`; the source
/// is skipped and the fallback flag is set.
#[test]
fn source_sync_decision_marks_slow_source_as_high_latency() {
    let now_ms = 1_700_000_000_000;
    let last_sync = now_ms - 10 * 60 * 1000;
    let hourly_source = source_with_schedule(SyncSchedule::Hourly);
    let slow_run = SourceSyncInfo {
        last_sync: Some(last_sync),
        last_result: SyncResult::Success,
        duration_ms: SOURCE_HIGH_LATENCY_MS + 1,
        ..Default::default()
    };
    let status = status_with_info(slow_run);

    let outcome = status.decision_for_source_at(&hourly_source, now_ms, false);

    assert_eq!(outcome.action, SourceSyncAction::Skip);
    assert_eq!(outcome.health, SourceHealthKind::HighLatency);
    assert!(outcome.fallback_active);
}
3655
/// For a `Manual`-schedule source that synced successfully at exactly
/// `now_ms`, calling `decision_for_source_at` with `true` as the last
/// argument (presumably the manual-override flag — confirm against the
/// method signature) must yield `Sync`, set `manual_override`, and record a
/// reason containing "overrides automatic scheduling".
#[test]
fn source_sync_decision_manual_override_forces_sync() {
    let now_ms = 1_700_000_000_000;
    let manual_source = source_with_schedule(SyncSchedule::Manual);
    let just_synced = SourceSyncInfo {
        last_sync: Some(now_ms),
        last_result: SyncResult::Success,
        duration_ms: 100,
        ..Default::default()
    };
    let status = status_with_info(just_synced);

    let outcome = status.decision_for_source_at(&manual_source, now_ms, true);

    assert_eq!(outcome.action, SourceSyncAction::Sync);
    assert!(outcome.manual_override);

    let explains_override = outcome
        .reasons
        .iter()
        .any(|why| why.contains("overrides automatic scheduling"));
    assert!(explains_override);
}
3678
/// Two consecutive calls to `unique_atomic_temp_path` for the same target
/// must return different paths, each with the same parent as the target.
#[test]
fn test_unique_atomic_temp_path_changes_each_call() {
    let target = Path::new("/tmp/sync_status.json");
    let attempts = [
        unique_atomic_temp_path(target),
        unique_atomic_temp_path(target),
    ];

    assert_ne!(attempts[0], attempts[1]);
    for attempt in &attempts {
        assert_eq!(attempt.parent(), target.parent());
    }
}
3689
/// Runs `replace_file_from_temp` twice against the same destination: once
/// when the destination does not exist yet, and once when it already holds
/// the first payload. After each round the destination must contain exactly
/// the payload that was staged in the temp file.
#[test]
fn test_replace_file_from_temp_overwrites_existing_file() {
    let workspace = TempDir::new().expect("tempdir");
    let destination = workspace.path().join("sync_status.json");

    // (temp file name, payload, write message, replace message, read message)
    let rounds = [
        (
            "first.tmp",
            "{\"first\":true}",
            "write first temp",
            "initial replace",
            "read first final",
        ),
        (
            "second.tmp",
            "{\"second\":true}",
            "write second temp",
            "overwrite replace",
            "read second final",
        ),
    ];

    for (tmp_name, payload, write_msg, replace_msg, read_msg) in rounds {
        let staged = workspace.path().join(tmp_name);
        std::fs::write(&staged, payload).expect(write_msg);
        replace_file_from_temp(&staged, &destination).expect(replace_msg);
        assert_eq!(
            std::fs::read_to_string(&destination).expect(read_msg),
            payload
        );
    }
}
3711
/// The `with_connection_timeout` / `with_transfer_timeout` builder calls
/// must store the given values in the engine's matching fields.
#[test]
fn test_sync_engine_with_timeouts() {
    let data_root = Path::new("/data");
    let configured = SyncEngine::new(data_root)
        .with_connection_timeout(30)
        .with_transfer_timeout(600);

    let connection = configured.connection_timeout;
    let transfer = configured.transfer_timeout;

    assert_eq!(connection, 30);
    assert_eq!(transfer, 600);
}
3721
/// Pins the `Display` rendering of several `SyncError` variants.
#[test]
fn test_sync_error_display() {
    let expectations = [
        (SyncError::NoHost, "Source has no host configured"),
        (SyncError::NoPaths, "Source has no paths configured"),
        (
            SyncError::InvalidPath("paths[0] cannot be empty".to_string()),
            "Invalid source path: paths[0] cannot be empty",
        ),
        (
            SyncError::Timeout(30),
            "Connection timed out after 30 seconds",
        ),
        (SyncError::Cancelled, "Sync cancelled"),
    ];

    for (error, rendered) in expectations {
        assert_eq!(error.to_string(), rendered);
    }
}
3742
/// A bare host name yields no user part and the host unchanged.
#[test]
fn test_parse_ssh_host_simple() {
    let parsed = parse_ssh_host("myserver");

    assert!(parsed.0.is_none());
    assert_eq!(parsed.1, "myserver");
}
3753
/// `user@host` input is split into its user and host parts.
#[test]
fn test_parse_ssh_host_with_user() {
    let parsed = parse_ssh_host("admin@myserver");

    assert_eq!(parsed.0, Some("admin"));
    assert_eq!(parsed.1, "myserver");
}
3760
/// A dotted, fully qualified host after the `@` is returned intact.
#[test]
fn test_parse_ssh_host_with_domain() {
    let parsed = parse_ssh_host("deploy@server.example.com");

    assert_eq!(parsed.0, Some("deploy"));
    assert_eq!(parsed.1, "server.example.com");
}
3767
/// The minimal e-mail-shaped input `user@host` splits at the `@`.
#[test]
fn test_parse_ssh_host_email_like() {
    let parsed = parse_ssh_host("user@host");

    assert_eq!(parsed.0, Some("user"));
    assert_eq!(parsed.1, "host");
}
3775
/// `first_nonblank_username` must return the first candidate that is not
/// blank, with surrounding whitespace trimmed, and `None` when every
/// candidate is missing or whitespace-only.
#[test]
fn test_first_nonblank_username_priority_and_trimming() {
    // The earliest usable candidate wins and is trimmed.
    let earliest = first_nonblank_username([Some(" alice "), Some("bob")]);
    assert_eq!(earliest, Some(String::from("alice")));

    // Whitespace-only and absent candidates are passed over.
    let after_blanks = first_nonblank_username([Some(" "), None, Some("carol")]);
    assert_eq!(after_blanks, Some(String::from("carol")));

    // Nothing usable at all.
    let nothing_usable = first_nonblank_username([None, Some("\t")]);
    assert_eq!(nothing_usable, None);
}
3788
/// Expanding `~/Documents/file.txt` must drop the leading `~` while keeping
/// the `/Documents/file.txt` tail.
#[test]
fn test_expand_tilde_local_with_tilde_prefix() {
    let resolved = expand_tilde_local("~/Documents/file.txt");

    let still_tilde = resolved.starts_with('~');
    let keeps_tail = resolved.ends_with("/Documents/file.txt");

    assert!(!still_tilde);
    assert!(keeps_tail);
}
3796
/// Expanding a lone `~` must give a non-empty result that no longer starts
/// with `~`.
#[test]
fn test_expand_tilde_local_just_tilde() {
    let resolved = expand_tilde_local("~");

    let still_tilde = resolved.starts_with('~');
    let came_back_empty = resolved.is_empty();

    assert!(!still_tilde);
    assert!(!came_back_empty);
}
3804
/// An absolute path containing no `~` must come back unchanged.
#[test]
fn test_expand_tilde_local_no_tilde() {
    let untouched = "/absolute/path/to/file";

    assert_eq!(expand_tilde_local(untouched), untouched);
}
3811
/// A `~` that is not at the start of the path must be left alone, so the
/// input is returned unchanged.
#[test]
fn test_expand_tilde_local_tilde_in_middle() {
    let untouched = "/path/with/~tilde/inside";

    assert_eq!(expand_tilde_local(untouched), untouched);
}
3819}