use crate::streaming::channel::{DELTA_MIN_SIZE, DeltaInfo, DestFileState, DestIndex, FileJob, FileJobSender, GeneratorMessage};
use crate::streaming::protocol::{DestFileEntry, DestFileFlags};
use crate::sync::scanner::Scanner;
use anyhow::Result;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
pub struct GeneratorConfig {
pub root: PathBuf,
pub include_hidden: bool,
pub follow_symlinks: bool,
pub delete_enabled: bool,
}
pub struct Generator {
config: GeneratorConfig,
dest_index: DestIndex,
seen_inodes: HashMap<u64, Arc<PathBuf>>, }
impl Generator {
pub fn new(config: GeneratorConfig) -> Self {
Self { config, dest_index: DestIndex::new(), seen_inodes: HashMap::new() }
}
pub fn add_dest_entry(&mut self, entry: DestFileEntry) {
let delta_info = if entry.flags.contains(DestFileFlags::HAS_CHECKSUMS) {
Some(DeltaInfo { block_size: entry.block_size, file_size: entry.size, checksums: entry.checksums })
} else {
None
};
self.dest_index.insert(
entry.path,
DestFileState { size: entry.size, mtime: entry.mtime, mode: entry.mode, is_dir: entry.flags.contains(DestFileFlags::DIR), delta_info },
);
}
pub fn dest_count(&self) -> usize {
self.dest_index.len()
}
pub async fn run(mut self, tx: FileJobSender) -> Result<(u64, u64)> {
let mut scanner = Scanner::new(&self.config.root);
scanner = scanner.follow_links(self.config.follow_symlinks);
let mut total_files = 0u64;
let mut total_bytes = 0u64;
let entries = tokio::task::spawn_blocking(move || scanner.scan()).await??;
for entry in entries {
let rel_path = entry.relative_path.as_ref().to_path_buf();
let rel_path_str = rel_path.to_string_lossy().to_string();
if rel_path_str.is_empty() {
continue;
}
let dest_state = self.dest_index.remove(&rel_path_str);
let mtime = entry.modified.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs() as i64;
let mode = if entry.is_dir { 0o755 } else { 0o644 };
if !entry.is_dir
&& !entry.is_symlink
&& let Some(ref dest) = dest_state
&& dest.size == entry.size
&& dest.mtime == mtime
{
continue;
}
let msg = if entry.is_dir {
GeneratorMessage::Mkdir { path: Arc::new(rel_path), mode }
} else if entry.is_symlink {
GeneratorMessage::Symlink {
path: Arc::new(rel_path),
target: entry.symlink_target.as_ref().map(|t| t.to_string_lossy().to_string()).unwrap_or_default(),
}
} else {
let inode = entry.inode.unwrap_or(0);
let _link_target = if entry.nlink > 1 {
if let Some(existing) = self.seen_inodes.get(&inode) {
Some(existing.clone())
} else {
self.seen_inodes.insert(inode, Arc::new(rel_path.clone()));
None
}
} else {
None
};
let (need_delta, checksums) = self.check_delta_for_state(dest_state.as_ref(), entry.size);
total_files += 1;
total_bytes += entry.size;
GeneratorMessage::File(FileJob { path: Arc::new(rel_path), size: entry.size, mtime, mode, inode, need_delta, checksums })
};
tx.send(msg).await?;
}
tx.send(GeneratorMessage::FileEnd { total_files, total_bytes }).await?;
if self.config.delete_enabled {
let mut delete_count = 0u64;
let remaining: Vec<_> = self.dest_index.remaining_paths().map(|(path, state)| (path.clone(), state.is_dir)).collect();
for (path, is_dir) in remaining {
tx.send(GeneratorMessage::Delete { path: Arc::new(PathBuf::from(path)), is_dir }).await?;
delete_count += 1;
}
tx.send(GeneratorMessage::DeleteEnd { count: delete_count }).await?;
}
Ok((total_files, total_bytes))
}
fn check_delta_for_state(&self, dest_state: Option<&DestFileState>, size: u64) -> (bool, Option<DeltaInfo>) {
if size < DELTA_MIN_SIZE {
return (false, None);
}
if let Some(state) = dest_state
&& let Some(ref delta_info) = state.delta_info
{
return (true, Some(delta_info.clone()));
}
(false, None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
#[tokio::test]
async fn test_generator_empty_dir() {
let tmp = TempDir::new().unwrap();
let config = GeneratorConfig { root: tmp.path().to_path_buf(), include_hidden: false, follow_symlinks: false, delete_enabled: false };
let (tx, mut rx) = crate::streaming::channel::file_job_channel();
let genr = Generator::new(config);
tokio::spawn(async move {
genr.run(tx).await.unwrap();
});
match rx.recv().await {
Some(GeneratorMessage::FileEnd { total_files, .. }) => {
assert_eq!(total_files, 0);
}
other => panic!("Expected FileEnd, got {:?}", other),
}
}
#[tokio::test]
async fn test_generator_with_files() {
let tmp = TempDir::new().unwrap();
fs::write(tmp.path().join("file1.txt"), "hello").unwrap();
fs::write(tmp.path().join("file2.txt"), "world").unwrap();
fs::create_dir(tmp.path().join("subdir")).unwrap();
let config = GeneratorConfig { root: tmp.path().to_path_buf(), include_hidden: false, follow_symlinks: false, delete_enabled: false };
let (tx, mut rx) = crate::streaming::channel::file_job_channel();
let genr = Generator::new(config);
tokio::spawn(async move {
genr.run(tx).await.unwrap();
});
let mut file_count = 0;
let mut dir_count = 0;
while let Some(msg) = rx.recv().await {
match msg {
GeneratorMessage::File(_) => file_count += 1,
GeneratorMessage::Mkdir { .. } => dir_count += 1,
GeneratorMessage::FileEnd { total_files, .. } => {
assert_eq!(total_files, 2);
break;
}
_ => {}
}
}
assert_eq!(file_count, 2);
assert_eq!(dir_count, 1);
}
#[tokio::test]
async fn test_generator_delete_detection() {
let tmp = TempDir::new().unwrap();
fs::write(tmp.path().join("keep.txt"), "keep").unwrap();
let config = GeneratorConfig { root: tmp.path().to_path_buf(), include_hidden: false, follow_symlinks: false, delete_enabled: true };
let (tx, mut rx) = crate::streaming::channel::file_job_channel();
let mut genr = Generator::new(config);
genr.add_dest_entry(DestFileEntry {
path: "delete_me.txt".to_string(),
size: 100,
mtime: 0,
mode: 0o644,
flags: DestFileFlags::empty(),
block_size: 0,
checksums: vec![],
});
tokio::spawn(async move {
genr.run(tx).await.unwrap();
});
let mut got_delete = false;
while let Some(msg) = rx.recv().await {
match msg {
GeneratorMessage::Delete { path, .. } => {
if path.to_string_lossy() == "delete_me.txt" {
got_delete = true;
}
}
GeneratorMessage::DeleteEnd { .. } => break,
_ => {}
}
}
assert!(got_delete, "Should have received delete for delete_me.txt");
}
}