use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use notify::RecursiveMode;
use notify::Watcher;
use crate::project;
use crate::project::RustProject;
use crate::scan;
use crate::scan::BackgroundMsg;
/// Quiet period after the most recent event inside a project before its disk
/// usage is recomputed; every further event pushes this deadline back.
const DEBOUNCE_DURATION: Duration = Duration::from_millis(500);
/// Upper bound, measured from the first pending event, on how long a disk-usage
/// update can be postponed by a continuous stream of events.
const MAX_WAIT: Duration = Duration::from_secs(1);
/// Delay between the first event seen on a direct child of the scan root and
/// probing that path as a possible new project.
const NEW_PROJECT_DEBOUNCE: Duration = Duration::from_secs(2);
/// Sleep between two iterations of the watcher loop.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Request, sent to the watcher thread, to start tracking one project.
pub struct WatchRequest {
/// Identifier of the project; used as the `path` of the `DiskUsage` messages
/// emitted for it.
pub project_path: String,
/// Root directory of the project: events on paths under it are attributed to
/// the project, and its size is what gets measured.
pub abs_path: PathBuf,
}
/// Starts the filesystem watcher on a dedicated thread.
///
/// The thread runs [`watcher_loop`] over `scan_root` and reports through
/// `bg_tx`. The returned sender is how callers register projects to track:
/// every [`WatchRequest`] sent on it is picked up by the loop. The loop stops
/// once all senders for that channel have been dropped.
///
/// `ci_run_count` and `include_non_rust` are forwarded unchanged to the loop.
pub fn spawn_watcher(
    scan_root: PathBuf,
    bg_tx: mpsc::Sender<BackgroundMsg>,
    ci_run_count: u32,
    include_non_rust: bool,
) -> mpsc::Sender<WatchRequest> {
    let (request_tx, request_rx) = mpsc::channel::<WatchRequest>();
    let run = move || {
        watcher_loop(scan_root, bg_tx, request_rx, ci_run_count, include_non_rust);
    };
    // The join handle is intentionally discarded: the thread is detached.
    thread::spawn(run);
    request_tx
}
/// A project tracked by the watcher, stored under its absolute root path.
struct ProjectEntry {
/// Identifier reported as `path` in `DiskUsage` messages and used as the key
/// of the pending disk-update map.
project_path: String,
/// Directory whose size is measured when a disk update fires.
abs_path: PathBuf,
}
fn watcher_loop(
scan_root: PathBuf,
bg_tx: mpsc::Sender<BackgroundMsg>,
watch_rx: mpsc::Receiver<WatchRequest>,
ci_run_count: u32,
include_non_rust: bool,
) {
let (notify_tx, notify_rx) = mpsc::channel();
let handler = move |res| {
let _ = notify_tx.send(res);
};
let Ok(mut watcher) = notify::recommended_watcher(handler) else {
return;
};
if watcher.watch(&scan_root, RecursiveMode::Recursive).is_err() {
return;
}
let mut projects: HashMap<PathBuf, ProjectEntry> = HashMap::new();
let mut pending_disk: HashMap<String, (Instant, Instant)> = HashMap::new();
let mut pending_new: HashMap<PathBuf, Instant> = HashMap::new();
let mut discovered: HashSet<PathBuf> = HashSet::new();
loop {
loop {
match watch_rx.try_recv() {
Ok(req) => {
projects.insert(
req.abs_path.clone(),
ProjectEntry {
project_path: req.project_path,
abs_path: req.abs_path,
},
);
},
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return,
}
}
while let Ok(result) = notify_rx.try_recv() {
let Ok(event) = result else {
continue;
};
for event_path in &event.paths {
handle_event(
event_path,
&scan_root,
&projects,
&discovered,
&mut pending_disk,
&mut pending_new,
);
}
}
fire_disk_updates(&bg_tx, &projects, &mut pending_disk);
probe_new_projects(
&bg_tx,
&mut pending_new,
&mut discovered,
ci_run_count,
include_non_rust,
);
thread::sleep(POLL_INTERVAL);
}
}
/// Classifies one changed path and schedules the follow-up work.
///
/// * If `event_path` lies under one or more registered project roots, a
///   disk-usage refresh is scheduled for **every** such project. Roots may be
///   nested, and `HashMap` iteration order is arbitrary, so choosing only the
///   first match would refresh an unpredictable project and leave the others
///   stale. The debounce deadline is reset to `now + DEBOUNCE_DURATION` on
///   each event; the hard deadline (`now + MAX_WAIT`) is fixed by the first
///   pending event and never pushed back.
/// * Otherwise, if `event_path` is a direct child of `scan_root`, it is queued
///   for project probing after `NEW_PROJECT_DEBOUNCE`, unless it is an
///   existing directory that was already discovered. An entry that is already
///   queued keeps its original deadline.
/// * Any other path is ignored.
fn handle_event(
    event_path: &Path,
    scan_root: &Path,
    projects: &HashMap<PathBuf, ProjectEntry>,
    discovered: &HashSet<PathBuf>,
    pending_disk: &mut HashMap<String, (Instant, Instant)>,
    pending_new: &mut HashMap<PathBuf, Instant>,
) {
    let now = Instant::now();

    let mut inside_project = false;
    for (root, entry) in projects {
        if !event_path.starts_with(root) {
            continue;
        }
        inside_project = true;
        pending_disk
            .entry(entry.project_path.clone())
            // Already pending: only the debounce deadline moves.
            .and_modify(|(debounce, _)| *debounce = now + DEBOUNCE_DURATION)
            .or_insert((now + DEBOUNCE_DURATION, now + MAX_WAIT));
    }
    if inside_project {
        return;
    }

    // Only direct children of the scan root are candidates for new projects.
    if event_path.parent() != Some(scan_root) {
        return;
    }

    // A known directory that still exists needs no re-probe; anything else
    // (new entry, or a path that is no longer a directory) does.
    let already_known = event_path.is_dir() && discovered.contains(event_path);
    if !already_known {
        pending_new
            .entry(event_path.to_path_buf())
            .or_insert_with(|| now + NEW_PROJECT_DEBOUNCE);
    }
}
/// Emits a `DiskUsage` message for every pending project whose deadline passed.
///
/// An entry is due once either of its two deadlines (debounce or hard limit)
/// has been reached. Due entries are removed from `pending_disk` one at a
/// time; entries whose project is no longer registered are dropped without a
/// message. If the receiving side of `bg_tx` is gone, the function returns
/// immediately and the remaining due entries stay pending.
fn fire_disk_updates(
    bg_tx: &mpsc::Sender<BackgroundMsg>,
    projects: &HashMap<PathBuf, ProjectEntry>,
    pending_disk: &mut HashMap<String, (Instant, Instant)>,
) {
    let now = Instant::now();

    // Collect keys first so the map can be mutated while processing them.
    let mut due: Vec<String> = Vec::new();
    for (project_path, (debounce, max)) in pending_disk.iter() {
        // Due as soon as the earlier of the two deadlines is reached.
        if now >= (*debounce).min(*max) {
            due.push(project_path.clone());
        }
    }

    for project_path in due {
        pending_disk.remove(&project_path);

        let target = projects
            .values()
            .find(|candidate| candidate.project_path == project_path);
        if let Some(entry) = target {
            let bytes = scan::dir_size(&entry.abs_path);
            let message = BackgroundMsg::DiskUsage {
                path: project_path,
                bytes,
            };
            if bg_tx.send(message).is_err() {
                return;
            }
        }
    }
}
/// Processes queued top-level entries whose probe deadline has passed.
///
/// For each due path:
/// * not a directory (any more): it is forgotten in `discovered` and a
///   `DiskUsage` message with 0 bytes is sent for its home-relative path;
/// * already discovered: nothing happens;
/// * otherwise it is probed with [`probe_project`]; on success the directory
///   is recorded, a `ProjectDiscovered` message is sent, and
///   `scan::fetch_project_details` is started on the rayon pool.
///
/// Send failures on `bg_tx` are ignored here.
fn probe_new_projects(
    bg_tx: &mpsc::Sender<BackgroundMsg>,
    pending_new: &mut HashMap<PathBuf, Instant>,
    discovered: &mut HashSet<PathBuf>,
    ci_run_count: u32,
    include_non_rust: bool,
) {
    let now = Instant::now();

    // Snapshot the due paths so the map can be modified below.
    let mut due: Vec<PathBuf> = Vec::new();
    for (candidate, deadline) in pending_new.iter() {
        if *deadline <= now {
            due.push(candidate.clone());
        }
    }

    for dir in due {
        pending_new.remove(&dir);

        if !dir.is_dir() {
            discovered.remove(&dir);
            let _ = bg_tx.send(BackgroundMsg::DiskUsage {
                path: project::home_relative_path(&dir),
                bytes: 0,
            });
            continue;
        }

        if discovered.contains(&dir) {
            continue;
        }

        let Some(project) = probe_project(&dir, include_non_rust) else {
            continue;
        };
        discovered.insert(dir);

        let abs_path = PathBuf::from(&project.abs_path);
        let has_git = abs_path.join(".git").exists();
        let _ = bg_tx.send(BackgroundMsg::ProjectDiscovered {
            project: project.clone(),
        });

        // Everything the worker needs is moved into the closure.
        let details_tx = bg_tx.clone();
        let project_path = project.path.clone();
        let project_name = project.name.clone();
        rayon::spawn(move || {
            scan::fetch_project_details(
                &details_tx,
                &project_path,
                &abs_path,
                project_name.as_ref(),
                has_git,
                ci_run_count,
            );
        });
    }
}
/// Tries to interpret `dir` as a project.
///
/// A `Cargo.toml` directly inside `dir` takes precedence: the result is
/// whatever `RustProject::from_cargo_toml` yields, with a failure mapped to
/// `None` (there is no fallback to the git check in that case). Without a
/// manifest, a project is built from the directory only when
/// `include_non_rust` is set and `dir/.git` is a directory.
fn probe_project(dir: &Path, include_non_rust: bool) -> Option<RustProject> {
    let manifest = dir.join("Cargo.toml");
    if manifest.exists() {
        RustProject::from_cargo_toml(&manifest).ok()
    } else {
        // `include_non_rust` is tested first so the filesystem is only
        // touched when non-Rust projects are wanted.
        (include_non_rust && dir.join(".git").is_dir()).then(|| RustProject::from_git_dir(dir))
    }
}