use notify::{recommended_watcher, Event, RecursiveMode, Result, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::{fs, thread};
use std::collections::{HashMap, HashSet};
use std::io;
use tokio::time::{self, Duration, Instant};
use walkdir::WalkDir;
/// Recursively copies the directory tree rooted at `src` into `dst`.
///
/// `dst` (and any missing parents) is created first. Every entry produced by
/// walking `src` is then mirrored at the same relative location below `dst`:
/// directories are created, everything else is copied with [`fs::copy`].
///
/// # Errors
///
/// Returns an [`io::ErrorKind::NotFound`] error when `src` is not a directory,
/// and otherwise the first error reported by the directory walk or by any
/// create/copy operation.
fn copy_dir_recursive(src: &Path, dst: &Path) -> io::Result<()> {
    if src.is_dir() {
        fs::create_dir_all(dst)?;
        for walked in WalkDir::new(src) {
            let walked = walked?;
            let source = walked.path();
            // Entries come from walking `src`, so their paths are expected to
            // start with `src`; the remainder is the location to mirror.
            let target = dst.join(source.strip_prefix(src).unwrap());
            if walked.file_type().is_dir() {
                fs::create_dir_all(&target)?;
                continue;
            }
            fs::copy(source, &target)?;
        }
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::NotFound,
            format!("Source directory not found: {:?}", src),
        ))
    }
}
/// Finds the closest path component whose name starts with `checkpoint-`.
///
/// The search begins with `path` itself and moves outwards through its
/// parents, returning the first (deepest) match as an owned path.
///
/// Returns `None` when no component matches. The search also stops with
/// `None` as soon as it reaches a path without a final file name, such as the
/// root, an empty path, or a path ending in `..`.
fn find_checkpoint_dir(path: &Path) -> Option<PathBuf> {
    for candidate in path.ancestors() {
        // A candidate without a file name ends the search immediately.
        let name = candidate.file_name()?;
        if name.to_string_lossy().starts_with("checkpoint-") {
            return Some(candidate.to_path_buf());
        }
    }
    None
}
/// Watches `watch_dir` recursively and copies each `checkpoint-*` directory
/// into `save_dir` once it has been quiet for `debounce_secs` seconds.
///
/// Every filesystem event is mapped to its enclosing checkpoint directory and
/// the time of the latest event is remembered per directory. Twice a second
/// the remembered directories are checked; those whose last event is at least
/// `debounce_secs` old are copied and forgotten. Failures while copying are
/// printed and do not stop the watcher.
///
/// The copy is synchronous filesystem work, so it is run through
/// `tokio::task::spawn_blocking` instead of directly on the async task.
///
/// # Errors
///
/// Returns an error if `save_dir` cannot be created, the watcher cannot be
/// constructed, or `watch_dir` cannot be watched. After start-up the function
/// loops forever and does not return.
pub async fn execute_report(watch_dir: String, save_dir: String, debounce_secs: u64) -> Result<()> {
    println!("Watching directory: {}", watch_dir);
    println!("Copying new checkpoints to: {}", save_dir);
    let debounce_duration = Duration::from_secs(debounce_secs);
    fs::create_dir_all(&save_dir)?;

    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    // The callback is a plain (non-async) closure, so it hands events over
    // with the blocking variant of `send`.
    let mut watcher = recommended_watcher(move |res: notify::Result<Event>| {
        if let Err(e) = tx.blocking_send(res) {
            println!("Error sending event: {}", e);
        }
    })?;
    watcher.watch(Path::new(&watch_dir), RecursiveMode::Recursive)?;
    println!("Watcher started...");

    // Checkpoint directory -> time of the most recent event seen below it.
    let mut paths_last_event = HashMap::<PathBuf, Instant>::new();
    let mut interval = time::interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            Some(event_res) = rx.recv() => {
                match event_res {
                    Ok(event) => {
                        for path in event.paths {
                            if let Some(checkpoint_dir) = find_checkpoint_dir(&path) {
                                // Re-inserting refreshes the timestamp and so
                                // restarts the debounce window.
                                paths_last_event.insert(checkpoint_dir, Instant::now());
                            }
                        }
                    }
                    Err(e) => println!("Watch error: {:?}", e),
                }
            }
            _ = interval.tick() => {
                let now = Instant::now();
                // Move every settled directory out of the map in one pass.
                let mut due = Vec::new();
                paths_last_event.retain(|dir, last_event_time| {
                    let settled = now.duration_since(*last_event_time) >= debounce_duration;
                    if settled {
                        due.push(dir.clone());
                    }
                    !settled
                });
                for checkpoint_dir in due {
                    println!("Processing checkpoint after debounce: {:?}", checkpoint_dir);
                    let dest_root = save_dir.clone();
                    // Run the blocking copy off the async worker thread.
                    let outcome = tokio::task::spawn_blocking(move || {
                        handle_new_checkpoint(&checkpoint_dir, &dest_root)
                    })
                    .await;
                    match outcome {
                        Ok(Ok(())) => {}
                        Ok(Err(e)) => println!("Error handling checkpoint: {}", e),
                        // The blocking task itself failed (e.g. it panicked).
                        Err(e) => println!("Checkpoint copy task failed: {}", e),
                    }
                }
            }
        }
    }
}
fn handle_new_checkpoint(checkpoint_dir: &Path, save_dir: &str) -> std::io::Result<()> {
if !checkpoint_dir.exists() || !checkpoint_dir.is_dir() {
println!(
"Checkpoint directory {:?} does not exist yet or is not a directory",
checkpoint_dir
);
return Ok(());
} else {
println!("Checkpoint directory exists: {:?}", checkpoint_dir);
}
let file_name = checkpoint_dir
.file_name()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Invalid path: no filename"))?
.to_string_lossy()
.to_string();
let dest_path = Path::new(save_dir).join(&file_name);
println!("Copying checkpoint: {:?}", checkpoint_dir);
println!("Destination path: {:?}", dest_path);
copy_dir_recursive(&checkpoint_dir, &dest_path).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to copy checkpoint: {}", e),
)
})?;
println!("Checkpoint copied to {:?}", dest_path);
Ok(())
}