use super::checksumdb::ChecksumDatabase;
use super::scanner::FileEntry;
use crate::error::{Result, SyncError};
use crate::integrity::{Checksum, ChecksumType, IntegrityVerifier};
use crate::transport::{FileInfo, Transport};
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
/// The decision the planner reached for a single path.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SyncAction {
    /// Leave the destination untouched.
    Skip,
    /// Create the destination entry; also chosen when an existing destination
    /// symlink does not match the source symlink.
    Create,
    /// Overwrite an existing destination file.
    Update,
    /// Remove a destination entry that has no counterpart in the source set.
    Delete,
}
/// A single planned operation: which `action` to apply at `dest_path`.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// The source entry being synced; `None` for tasks produced by `plan_deletions`.
    pub source: Option<Arc<FileEntry>>,
    /// Destination path the action applies to.
    pub dest_path: std::path::PathBuf,
    /// What to do at `dest_path`.
    pub action: SyncAction,
    /// Source content checksum, present only when checksum comparison ran.
    // The attribute silences unused-field warnings for this recorded-only value.
    #[allow(dead_code)]
    pub source_checksum: Option<Checksum>,
    /// Destination content checksum, present only when checksum comparison ran.
    // The attribute silences unused-field warnings for this recorded-only value.
    #[allow(dead_code)]
    pub dest_checksum: Option<Checksum>,
}
/// Decides, per source entry, whether the destination must be created,
/// updated, skipped, or deleted, according to rsync-style comparison flags.
#[derive(Clone)]
pub struct StrategyPlanner {
    /// Allowed modification-time difference, compared in whole seconds.
    mtime_tolerance: u64,
    /// `--ignore-times`: always transfer files that already exist.
    ignore_times: bool,
    /// `--size-only`: compare by size alone, ignoring mtime.
    size_only: bool,
    /// `--checksum`: compare existing files by content checksum.
    checksum: bool,
    /// `--update`: skip files whose destination is newer than the source.
    update_only: bool,
    /// `--ignore-existing`: never touch files that already exist at the destination.
    ignore_existing: bool,
    /// Checksum engine; set by the constructor when `checksum` is requested.
    verifier: Option<IntegrityVerifier>,
}
impl StrategyPlanner {
pub fn new() -> Self {
Self {
mtime_tolerance: 1, ignore_times: false,
size_only: false,
checksum: false,
update_only: false,
ignore_existing: false,
verifier: None,
}
}
pub fn with_comparison_flags(ignore_times: bool, size_only: bool, checksum: bool, update_only: bool, ignore_existing: bool) -> Self {
let verifier = if checksum {
Some(IntegrityVerifier::new(ChecksumType::Fast, false))
} else {
None
};
Self { mtime_tolerance: 1, ignore_times, size_only, checksum, update_only, ignore_existing, verifier }
}
pub async fn plan_file_async<T: Transport>(
&self, source: &FileEntry, dest_root: &Path, transport: &T, checksum_db: Option<&ChecksumDatabase>,
) -> Result<SyncTask> {
let dest_path = dest_root.join(&*source.relative_path);
let (action, source_checksum, dest_checksum) = if source.is_dir {
let exists = transport.exists(&dest_path).await.unwrap_or(false);
let action = if exists { SyncAction::Skip } else { SyncAction::Create };
(action, None, None)
} else {
match transport.file_info(&dest_path).await {
Ok(dest_info) => {
if self.ignore_existing {
tracing::debug!("File exists, skipping (--ignore-existing): {}", source.relative_path.display());
return Ok(SyncTask {
source: Some(Arc::new(source.clone())),
dest_path,
action: SyncAction::Skip,
source_checksum: None,
dest_checksum: None,
});
}
if self.update_only && self.dest_is_newer(source, &dest_info) {
tracing::debug!("Destination is newer, skipping (--update): {}", source.relative_path.display());
return Ok(SyncTask {
source: Some(Arc::new(source.clone())),
dest_path,
action: SyncAction::Skip,
source_checksum: None,
dest_checksum: None,
});
}
let (source_cksum, dest_cksum) = if let Some(verifier) = &self.verifier {
self.compute_checksums_local(source, &dest_path, verifier, checksum_db)?
} else {
(None, None)
};
let action = if let (Some(src_cksum), Some(dst_cksum)) = (&source_cksum, &dest_cksum) {
if src_cksum == dst_cksum {
tracing::debug!("Checksums match for {}, skipping transfer", source.relative_path.display());
SyncAction::Skip
} else {
tracing::debug!("Checksums differ for {}, will transfer", source.relative_path.display());
SyncAction::Update
}
} else {
let needs_update = self.needs_update(source, &dest_info);
if needs_update { SyncAction::Update } else { SyncAction::Skip }
};
(action, source_cksum, dest_cksum)
}
Err(_) => (SyncAction::Create, None, None),
}
};
Ok(SyncTask { source: Some(Arc::new(source.clone())), dest_path, action, source_checksum, dest_checksum })
}
fn compute_checksums_local(
&self, source: &FileEntry, dest_path: &Path, verifier: &IntegrityVerifier, checksum_db: Option<&ChecksumDatabase>,
) -> Result<(Option<Checksum>, Option<Checksum>)> {
let checksum_type = match verifier.checksum_type() {
ChecksumType::None => "none",
ChecksumType::Fast => "fast",
ChecksumType::Cryptographic => "cryptographic",
};
let source_checksum = if source.path.exists() {
if let Some(db) = checksum_db {
if let Ok(Some(cached)) = db.get_checksum(&source.path, source.modified, source.size, checksum_type) {
tracing::debug!("Database hit for source: {}", source.path.display());
Some(cached)
} else {
tracing::debug!("Database miss for source: {}, computing", source.path.display());
match verifier.compute_file_checksum(&source.path) {
Ok(cksum) => Some(cksum),
Err(e) => {
tracing::warn!("Failed to compute source checksum for {}: {} (file may have been deleted after scan)", source.path.display(), e);
None
}
}
}
} else {
match verifier.compute_file_checksum(&source.path) {
Ok(cksum) => Some(cksum),
Err(e) => {
tracing::warn!("Failed to compute source checksum for {}: {} (file may have been deleted after scan)", source.path.display(), e);
None
}
}
}
} else {
None
};
let dest_checksum = if dest_path.exists() {
let dest_metadata = std::fs::metadata(dest_path).map_err(|e| {
SyncError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to read metadata for destination file {}: {}. This may indicate a remote path being accessed locally.",
dest_path.display(),
e
),
))
})?;
let dest_mtime = dest_metadata.modified().ok();
let dest_size = Some(dest_metadata.len());
if let (Some(db), Some(mtime), Some(size)) = (checksum_db, dest_mtime, dest_size) {
if let Ok(Some(cached)) = db.get_checksum(dest_path, mtime, size, checksum_type) {
tracing::debug!("Database hit for dest: {}", dest_path.display());
Some(cached)
} else {
tracing::debug!("Database miss for dest: {}, computing", dest_path.display());
match verifier.compute_file_checksum(dest_path) {
Ok(cksum) => Some(cksum),
Err(e) => {
tracing::warn!(
"Failed to compute dest checksum for {}: {} (if this is a remote destination, this is a bug - checksum computation should not access remote paths locally)",
dest_path.display(),
e
);
None
}
}
}
} else {
match verifier.compute_file_checksum(dest_path) {
Ok(cksum) => Some(cksum),
Err(e) => {
tracing::warn!(
"Failed to compute dest checksum for {}: {} (if this is a remote destination, this is a bug - checksum computation should not access remote paths locally)",
dest_path.display(),
e
);
None
}
}
}
} else {
tracing::debug!("Skipping destination checksum for {} (path doesn't exist locally - may be remote)", dest_path.display());
None
};
Ok((source_checksum, dest_checksum))
}
#[allow(dead_code)]
pub fn plan_file(&self, source: &FileEntry, dest_root: &Path) -> SyncTask {
let dest_path = dest_root.join(&*source.relative_path);
let (action, source_checksum, dest_checksum) = if source.is_dir {
let action = if dest_path.exists() { SyncAction::Skip } else { SyncAction::Create };
(action, None, None)
} else {
match std::fs::metadata(&dest_path) {
Ok(dest_meta) => {
let (source_cksum, dest_cksum) = if let Some(verifier) = &self.verifier {
self.compute_checksums_local(source, &dest_path, verifier, None).unwrap_or((None, None))
} else {
(None, None)
};
let action = if let (Some(src_cksum), Some(dst_cksum)) = (&source_cksum, &dest_cksum) {
if src_cksum == dst_cksum {
tracing::debug!("Checksums match for {}, skipping transfer", source.relative_path.display());
SyncAction::Skip
} else {
tracing::debug!("Checksums differ for {}, will transfer", source.relative_path.display());
SyncAction::Update
}
} else {
let dest_info = FileInfo { size: dest_meta.len(), modified: dest_meta.modified().unwrap_or(SystemTime::UNIX_EPOCH) };
let needs_update = self.needs_update(source, &dest_info);
if needs_update { SyncAction::Update } else { SyncAction::Skip }
};
(action, source_cksum, dest_cksum)
}
Err(_) => (SyncAction::Create, None, None),
}
};
SyncTask { source: Some(Arc::new(source.clone())), dest_path, action, source_checksum, dest_checksum }
}
pub fn plan_file_with_dest_map(
&self, source: &FileEntry, dest_root: &Path, dest_map: &std::collections::HashMap<std::path::PathBuf, FileEntry>,
) -> SyncTask {
let dest_path = dest_root.join(&*source.relative_path);
let action = if source.is_dir {
if dest_map.contains_key(&*source.relative_path) { SyncAction::Skip } else { SyncAction::Create }
} else if source.is_symlink {
match dest_map.get(&*source.relative_path) {
Some(dest_file) => {
if dest_file.is_symlink && dest_file.symlink_target == source.symlink_target {
SyncAction::Skip
} else {
SyncAction::Create
}
}
None => SyncAction::Create,
}
} else {
match dest_map.get(&*source.relative_path) {
Some(dest_file) => {
if self.ignore_existing {
tracing::debug!("File exists, skipping (--ignore-existing): {}", source.relative_path.display());
return SyncTask {
source: Some(Arc::new(source.clone())),
dest_path,
action: SyncAction::Skip,
source_checksum: None,
dest_checksum: None,
};
}
let dest_info = FileInfo { size: dest_file.size, modified: dest_file.modified };
if self.update_only && self.dest_is_newer(source, &dest_info) {
tracing::debug!("Destination is newer, skipping (--update): {}", source.relative_path.display());
return SyncTask {
source: Some(Arc::new(source.clone())),
dest_path,
action: SyncAction::Skip,
source_checksum: None,
dest_checksum: None,
};
}
if self.needs_update(source, &dest_info) { SyncAction::Update } else { SyncAction::Skip }
}
None => SyncAction::Create,
}
};
SyncTask { source: Some(Arc::new(source.clone())), dest_path, action, source_checksum: None, dest_checksum: None }
}
fn needs_update(&self, source: &FileEntry, dest_info: &FileInfo) -> bool {
if self.checksum {
return true;
}
if self.ignore_times {
if source.size != dest_info.size {
return true; }
return true; }
if self.size_only {
return source.size != dest_info.size;
}
if source.size != dest_info.size {
return true;
}
if !self.mtime_matches(&source.modified, &dest_info.modified) {
return true;
}
false
}
fn mtime_matches(&self, source_mtime: &SystemTime, dest_mtime: &SystemTime) -> bool {
match source_mtime.duration_since(*dest_mtime) {
Ok(duration) => duration.as_secs() <= self.mtime_tolerance,
Err(e) => e.duration().as_secs() <= self.mtime_tolerance,
}
}
fn dest_is_newer(&self, source: &FileEntry, dest_info: &FileInfo) -> bool {
match dest_info.modified.duration_since(source.modified) {
Ok(duration) => duration.as_secs() > self.mtime_tolerance,
Err(_) => false, }
}
pub fn plan_deletions(&self, source_files: &[FileEntry], dest_root: &Path) -> Vec<SyncTask> {
let mut deletions = Vec::new();
const BLOOM_THRESHOLD: usize = 10_000;
if source_files.len() > BLOOM_THRESHOLD {
use crate::sync::scale::FileSetBloom;
let mut source_bloom = FileSetBloom::new(source_files.len());
for file in source_files {
source_bloom.insert(&file.relative_path);
}
let source_paths: std::collections::HashSet<_> = {
let mut set = std::collections::HashSet::with_capacity(source_files.len());
for f in source_files {
set.insert((*f.relative_path).clone());
}
set
};
if let Ok(dest_scanner) = crate::sync::scanner::Scanner::new(dest_root).scan_streaming() {
for dest_file in dest_scanner.flatten() {
if !source_bloom.contains(&dest_file.relative_path) {
deletions.push(SyncTask {
source: None,
dest_path: (*dest_file.path).clone(),
action: SyncAction::Delete,
source_checksum: None,
dest_checksum: None,
});
} else {
if !source_paths.contains(&**dest_file.relative_path) {
deletions.push(SyncTask {
source: None,
dest_path: (*dest_file.path).clone(),
action: SyncAction::Delete,
source_checksum: None,
dest_checksum: None,
});
}
}
}
}
} else {
let source_paths: std::collections::HashSet<_> = source_files.iter().map(|f| f.relative_path.clone()).collect();
if let Ok(dest_scanner) = crate::sync::scanner::Scanner::new(dest_root).scan_streaming() {
for dest_file in dest_scanner.flatten() {
if !source_paths.contains(&dest_file.relative_path) {
deletions.push(SyncTask {
source: None,
dest_path: (*dest_file.path).clone(),
action: SyncAction::Delete,
source_checksum: None,
dest_checksum: None,
});
}
}
}
}
deletions
}
}
impl Default for StrategyPlanner {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds a plain file entry: not a directory, symlink, or sparse file.
    /// `allocated_size` equals `size`, and the mtime is taken at call time.
    fn file_entry(path: PathBuf, relative: &str, size: u64) -> FileEntry {
        FileEntry {
            path: Arc::new(path),
            relative_path: Arc::new(PathBuf::from(relative)),
            size,
            modified: SystemTime::now(),
            is_dir: false,
            is_symlink: false,
            symlink_target: None,
            is_sparse: false,
            allocated_size: size,
            xattrs: None,
            inode: None,
            nlink: 1,
            acls: None,
            bsd_flags: None,
        }
    }

    /// Final path component of each task's destination path.
    fn dest_names(tasks: &[SyncTask]) -> Vec<&str> {
        tasks
            .iter()
            .map(|task| task.dest_path.file_name().unwrap().to_str().unwrap())
            .collect()
    }

    #[test]
    fn test_plan_create() {
        let temp = TempDir::new().unwrap();
        let source_file = file_entry(PathBuf::from("/source/file.txt"), "file.txt", 100);
        let task = StrategyPlanner::new().plan_file(&source_file, temp.path());
        assert_eq!(task.action, SyncAction::Create);
    }

    #[test]
    fn test_plan_skip_identical() {
        let temp = TempDir::new().unwrap();
        fs::write(temp.path().join("file.txt"), "content").unwrap();
        // Same size (7 bytes); mtime is sampled right after the write.
        let source_file = file_entry(PathBuf::from("/source/file.txt"), "file.txt", 7);
        let task = StrategyPlanner::new().plan_file(&source_file, temp.path());
        assert_eq!(task.action, SyncAction::Skip);
    }

    #[test]
    fn test_plan_update_different_size() {
        let temp = TempDir::new().unwrap();
        fs::write(temp.path().join("file.txt"), "old").unwrap();
        let source_file = file_entry(PathBuf::from("/source/file.txt"), "file.txt", 100);
        let task = StrategyPlanner::new().plan_file(&source_file, temp.path());
        assert_eq!(task.action, SyncAction::Update);
    }

    #[test]
    fn test_plan_deletions_small_set() {
        let temp_dest = TempDir::new().unwrap();
        let dest_root = temp_dest.path();
        fs::write(dest_root.join("keep.txt"), "keep").unwrap();
        fs::write(dest_root.join("delete1.txt"), "delete").unwrap();
        fs::write(dest_root.join("delete2.txt"), "delete").unwrap();

        let source_files = [file_entry(PathBuf::from("/source/keep.txt"), "keep.txt", 4)];
        let deletions = StrategyPlanner::new().plan_deletions(&source_files, dest_root);

        assert_eq!(deletions.len(), 2);
        assert!(deletions.iter().all(|t| t.action == SyncAction::Delete));
        let names = dest_names(&deletions);
        assert!(names.contains(&"delete1.txt"));
        assert!(names.contains(&"delete2.txt"));
    }

    #[test]
    fn test_plan_deletions_large_set_with_bloom() {
        let temp_dest = TempDir::new().unwrap();
        let dest_root = temp_dest.path();
        for i in 0..100 {
            fs::write(dest_root.join(format!("file{}.txt", i)), "content").unwrap();
        }
        fs::write(dest_root.join("delete1.txt"), "delete").unwrap();
        fs::write(dest_root.join("delete2.txt"), "delete").unwrap();

        // More than 10_000 source entries, to exercise the large-set path.
        let source_files: Vec<FileEntry> = (0..11_000)
            .map(|i| file_entry(PathBuf::from(format!("/source/file{}.txt", i)), &format!("file{}.txt", i), 7))
            .collect();
        let deletions = StrategyPlanner::new().plan_deletions(&source_files, dest_root);

        assert_eq!(deletions.len(), 2);
        assert!(deletions.iter().all(|t| t.action == SyncAction::Delete));
        let names = dest_names(&deletions);
        assert!(names.contains(&"delete1.txt"));
        assert!(names.contains(&"delete2.txt"));
    }

    #[test]
    fn test_plan_deletions_empty_source() {
        let temp_dest = TempDir::new().unwrap();
        let dest_root = temp_dest.path();
        fs::write(dest_root.join("file1.txt"), "content").unwrap();
        fs::write(dest_root.join("file2.txt"), "content").unwrap();

        let source_files: Vec<FileEntry> = Vec::new();
        let deletions = StrategyPlanner::new().plan_deletions(&source_files, dest_root);

        assert_eq!(deletions.len(), 2);
        assert!(deletions.iter().all(|t| t.action == SyncAction::Delete));
    }

    #[test]
    fn test_checksum_mode_skip_identical_files() {
        let temp = TempDir::new().unwrap();
        let dest_root = temp.path();
        let content = b"Hello, world!";
        fs::write(dest_root.join("file.txt"), content).unwrap();

        // Source and destination are literally the same file, so checksums agree.
        let source_file = file_entry(dest_root.join("file.txt"), "file.txt", content.len() as u64);
        let planner = StrategyPlanner::with_comparison_flags(false, false, true, false, false);
        let task = planner.plan_file(&source_file, dest_root);

        assert_eq!(task.action, SyncAction::Skip);
        assert!(task.source_checksum.is_some());
        assert!(task.dest_checksum.is_some());
        assert_eq!(task.source_checksum, task.dest_checksum);
    }

    #[test]
    fn test_checksum_mode_transfer_different_files() {
        let temp = TempDir::new().unwrap();
        let source_dir = temp.path().join("source");
        let dest_dir = temp.path().join("dest");
        fs::create_dir_all(&source_dir).unwrap();
        fs::create_dir_all(&dest_dir).unwrap();
        fs::write(source_dir.join("file.txt"), b"Source content").unwrap();
        fs::write(dest_dir.join("file.txt"), b"Dest content (different)").unwrap();

        let source_file = file_entry(source_dir.join("file.txt"), "file.txt", 14);
        let planner = StrategyPlanner::with_comparison_flags(false, false, true, false, false);
        let task = planner.plan_file(&source_file, &dest_dir);

        assert_eq!(task.action, SyncAction::Update);
        assert!(task.source_checksum.is_some());
        assert!(task.dest_checksum.is_some());
        assert_ne!(task.source_checksum, task.dest_checksum);
    }

    #[test]
    fn test_checksum_mode_create_new_file() {
        let temp = TempDir::new().unwrap();
        let source_dir = temp.path().join("source");
        let dest_dir = temp.path().join("dest");
        fs::create_dir_all(&source_dir).unwrap();
        fs::create_dir_all(&dest_dir).unwrap();
        fs::write(source_dir.join("file.txt"), b"New file content").unwrap();

        let source_file = file_entry(source_dir.join("file.txt"), "file.txt", 16);
        let planner = StrategyPlanner::with_comparison_flags(false, false, true, false, false);
        let task = planner.plan_file(&source_file, &dest_dir);

        assert_eq!(task.action, SyncAction::Create);
        assert!(task.source_checksum.is_none());
        assert!(task.dest_checksum.is_none());
    }

    #[test]
    fn test_plan_deletions_no_deletions_needed() {
        let temp_dest = TempDir::new().unwrap();
        let dest_root = temp_dest.path();
        fs::write(dest_root.join("file1.txt"), "content").unwrap();
        fs::write(dest_root.join("file2.txt"), "content").unwrap();

        let source_files = [
            file_entry(PathBuf::from("/source/file1.txt"), "file1.txt", 7),
            file_entry(PathBuf::from("/source/file2.txt"), "file2.txt", 7),
        ];
        let deletions = StrategyPlanner::new().plan_deletions(&source_files, dest_root);

        assert_eq!(deletions.len(), 0);
    }
}