orign 0.2.3

A globally distributed container orchestrator
Documentation
use notify::{recommended_watcher, Event, RecursiveMode, Result, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::{fs, thread};

use std::collections::{HashMap, HashSet};
use std::io;
use tokio::time::{self, Duration, Instant};
use walkdir::WalkDir;

fn copy_dir_recursive(src: &Path, dst: &Path) -> io::Result<()> {
    // Ensure the source directory exists
    if !src.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::NotFound,
            format!("Source directory not found: {:?}", src),
        ));
    }

    // Create the destination directory
    fs::create_dir_all(&dst)?;

    // Iterate over the entries in the directory
    for entry in WalkDir::new(src) {
        let entry = entry?;
        let entry_path = entry.path();
        let relative_path = entry_path.strip_prefix(src).unwrap();
        let dest_path = dst.join(relative_path);

        if entry.file_type().is_dir() {
            // Create the directory in the destination
            fs::create_dir_all(&dest_path)?;
        } else {
            // Copy the file to the destination
            fs::copy(entry_path, &dest_path)?;
        }
    }

    Ok(())
}

fn find_checkpoint_dir(path: &Path) -> Option<PathBuf> {
    let mut current_path = path.to_path_buf();
    loop {
        let file_name = current_path.file_name()?.to_string_lossy();
        if file_name.starts_with("checkpoint-") {
            return Some(current_path);
        }
        if !current_path.pop() {
            break;
        }
    }
    None
}

pub async fn execute_report(watch_dir: String, save_dir: String, debounce_secs: u64) -> Result<()> {
    println!("Watching directory: {}", watch_dir);
    println!("Copying new checkpoints to: {}", save_dir);

    let debounce_duration = Duration::from_secs(debounce_secs);

    // Create the save directory if it doesn't exist
    fs::create_dir_all(&save_dir)?;

    // Set up the async channel to receive events
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);

    // Create a watcher object using the recommended watcher for the platform
    let mut watcher = recommended_watcher(move |res: notify::Result<Event>| {
        if let Err(e) = tx.blocking_send(res) {
            println!("Error sending event: {}", e);
        }
    })?;

    // Start watching the watch directory recursively
    watcher.watch(Path::new(&watch_dir), RecursiveMode::Recursive)?;

    println!("Watcher started...");

    // Process events asynchronously with debouncing
    let mut paths_last_event = HashMap::<PathBuf, Instant>::new();

    // A task to process the paths after debounce_duration
    let mut interval = time::interval(Duration::from_millis(500));

    loop {
        tokio::select! {
            Some(event_res) = rx.recv() => {
                match event_res {
                    Ok(event) => {
                        for path in event.paths {
                            // Find the checkpoint directory for the path
                            if let Some(checkpoint_dir) = find_checkpoint_dir(&path) {
                                // Update the last event time for this checkpoint directory
                                paths_last_event.insert(checkpoint_dir, Instant::now());
                            }
                        }
                    }
                    Err(e) => println!("Watch error: {:?}", e),
                }
            }
            _ = interval.tick() => {
                let now = Instant::now();
                let mut to_remove = vec![];

                for (checkpoint_dir, &last_event_time) in paths_last_event.iter() {
                    if now.duration_since(last_event_time) >= debounce_duration {
                        // Debounce duration has passed, process the checkpoint directory
                        println!("Processing checkpoint after debounce: {:?}", checkpoint_dir);

                        if let Err(e) = handle_new_checkpoint(&checkpoint_dir, &save_dir) {
                            println!("Error handling checkpoint: {}", e);
                        }

                        // Mark for removal from the map
                        to_remove.push(checkpoint_dir.clone());
                    }
                }

                // Remove processed paths from the map
                for checkpoint_dir in to_remove {
                    paths_last_event.remove(&checkpoint_dir);
                }
            }
        }
    }
}

fn handle_new_checkpoint(checkpoint_dir: &Path, save_dir: &str) -> std::io::Result<()> {
    // Check if the checkpoint directory exists and is a directory
    if !checkpoint_dir.exists() || !checkpoint_dir.is_dir() {
        println!(
            "Checkpoint directory {:?} does not exist yet or is not a directory",
            checkpoint_dir
        );
        return Ok(());
    } else {
        println!("Checkpoint directory exists: {:?}", checkpoint_dir);
    }

    let file_name = checkpoint_dir
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Invalid path: no filename"))?
        .to_string_lossy()
        .to_string();

    // Create the destination path
    let dest_path = Path::new(save_dir).join(&file_name);

    println!("Copying checkpoint: {:?}", checkpoint_dir);
    println!("Destination path: {:?}", dest_path);

    // Copy the directory recursively using the custom function
    copy_dir_recursive(&checkpoint_dir, &dest_path).map_err(|e| {
        io::Error::new(
            io::ErrorKind::Other,
            format!("Failed to copy checkpoint: {}", e),
        )
    })?;

    println!("Checkpoint copied to {:?}", dest_path);

    Ok(())
}