#![cfg(target_os = "windows")]
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::time::{Duration, UNIX_EPOCH};
use crate::config::Config;
use crate::core_service::CoreService;
use crate::discovery::DiscoveryExclusionPolicy;
use crate::file_watcher::{DirectoryWatcher, WatcherConfig, WatcherEvent, WatcherEventKind};
use crate::model::SearchItem;
/// Longest a consumer thread blocks in `recv_timeout` waiting for the next
/// batch of watcher events before it loops again.
const CONSUMER_FLUSH_INTERVAL: Duration = Duration::from_millis(500);
/// Upper bound on the number of buffered paths (pending adds plus pending
/// removes). Events that arrive while the buffer is at this size are counted
/// and dropped instead of being buffered.
const CONSUMER_BATCH_CAP: usize = 4096;
/// One successfully started root: the directory watcher plus the consumer
/// thread that drains the watcher's event channel.
struct WatcherEntry {
/// Watcher for the root. It is dropped explicitly before the consumer is
/// joined when the owning handle is dropped.
_watcher: DirectoryWatcher,
/// Join handle of the consumer thread spawned for this root.
_consumer: JoinHandle<()>,
}
/// Owns every running watcher/consumer pair created by
/// `FileWatcherHandle::start`. Dropping the handle drops each watcher and
/// joins its consumer thread.
pub(crate) struct FileWatcherHandle {
/// One entry per root whose watcher started successfully.
entries: Vec<WatcherEntry>,
}
impl FileWatcherHandle {
    /// Starts one directory watcher and one consumer thread per entry in `roots`.
    ///
    /// `excluded_roots` is handed both to the watcher configuration and to the
    /// consumer. `service` is shared with every consumer thread. A root whose
    /// watcher fails to start is logged as a warning and skipped, so the
    /// returned handle may cover fewer roots than were requested.
    pub(crate) fn start(
        roots: Vec<PathBuf>,
        excluded_roots: Vec<PathBuf>,
        service: Arc<RwLock<CoreService>>,
    ) -> Self {
        let mut entries = Vec::new();
        for root in roots {
            let config = WatcherConfig::new(root.clone(), excluded_roots.clone());
            match DirectoryWatcher::start(config) {
                Ok((watcher, rx)) => {
                    let consumer =
                        spawn_consumer(root, rx, Arc::clone(&service), excluded_roots.clone());
                    entries.push(WatcherEntry {
                        _watcher: watcher,
                        _consumer: consumer,
                    });
                }
                Err(error) => {
                    // Best effort: one failing root must not prevent the others from starting.
                    crate::runtime::log_warn(&format!(
                        "[nex] directory_watcher root=\"{}\" start failed: {error}",
                        root.display()
                    ));
                }
            }
        }
        Self { entries }
    }

    /// Number of roots whose watcher started successfully.
    pub(crate) fn active_roots(&self) -> usize {
        self.entries.len()
    }
}
impl Drop for FileWatcherHandle {
fn drop(&mut self) {
for entry in self.entries.drain(..) {
drop(entry._watcher);
let _ = entry._consumer.join();
}
}
}
/// Spawns the named consumer thread for `root`.
///
/// The thread runs `run_consumer` inside `catch_unwind`, so a panic in the
/// consumer is logged as a warning rather than propagating out of the thread.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn spawn_consumer(
    root: PathBuf,
    rx: Receiver<Vec<WatcherEvent>>,
    service: Arc<RwLock<CoreService>>,
    excluded_roots: Vec<PathBuf>,
) -> JoinHandle<()> {
    let thread_name = format!("nex-watcher-consumer[{}]", root.display());
    let thread_body = move || {
        let guarded =
            std::panic::AssertUnwindSafe(|| run_consumer(root, rx, service, excluded_roots));
        match std::panic::catch_unwind(guarded) {
            Ok(()) => {}
            Err(payload) => {
                let message = panic_message_to_string(&payload);
                crate::runtime::log_warn(&format!(
                    "[nex] directory_watcher consumer panicked: {message}"
                ));
            }
        }
    };
    std::thread::Builder::new()
        .name(thread_name)
        .spawn(thread_body)
        .expect("watcher consumer thread should spawn")
}
/// Consumer loop for one root: receives batches of watcher events, folds them
/// into pending add/remove sets and flushes those sets into the index.
///
/// Each loop iteration waits up to `CONSUMER_FLUSH_INTERVAL` for a batch.
/// Events that arrive while `CONSUMER_BATCH_CAP` paths are already buffered
/// are counted and dropped. The loop ends when the channel disconnects: the
/// pending sets are flushed one last time and, if any events were dropped, the
/// total is logged as a warning.
fn run_consumer(
    root: PathBuf,
    rx: Receiver<Vec<WatcherEvent>>,
    service: Arc<RwLock<CoreService>>,
    excluded_roots: Vec<PathBuf>,
) {
    let mut pending_added: HashMap<PathBuf, ()> = HashMap::new();
    let mut pending_removed: HashSet<PathBuf> = HashSet::new();
    let mut total_dropped: usize = 0;
    loop {
        let disconnected = match rx.recv_timeout(CONSUMER_FLUSH_INTERVAL) {
            Ok(batch) => {
                for event in batch {
                    // The cap is re-evaluated per event because collapsing
                    // events can shrink the buffer again.
                    let buffered = pending_added.len() + pending_removed.len();
                    if buffered < CONSUMER_BATCH_CAP {
                        apply_event_to_pending(event, &mut pending_added, &mut pending_removed);
                    } else {
                        total_dropped = total_dropped.saturating_add(1);
                    }
                }
                false
            }
            Err(RecvTimeoutError::Timeout) => false,
            Err(RecvTimeoutError::Disconnected) => true,
        };
        let has_pending = !pending_added.is_empty() || !pending_removed.is_empty();
        if disconnected || has_pending {
            flush_pending(
                &root,
                &service,
                &excluded_roots,
                &mut pending_added,
                &mut pending_removed,
            );
        }
        if disconnected {
            if total_dropped > 0 {
                crate::runtime::log_warn(&format!(
                    "[nex] directory_watcher root=\"{}\" dropped {total_dropped} events due to batch cap",
                    root.display()
                ));
            }
            return;
        }
    }
}
fn apply_event_to_pending(
event: WatcherEvent,
pending_added: &mut HashMap<PathBuf, ()>,
pending_removed: &mut HashSet<PathBuf>,
) {
match event.kind {
WatcherEventKind::Added | WatcherEventKind::Modified => {
if pending_removed.remove(&event.path) {
pending_added.remove(&event.path);
} else {
pending_added.insert(event.path, ());
}
}
WatcherEventKind::Removed => {
if pending_added.remove(&event.path).is_none() {
pending_removed.insert(event.path);
}
}
WatcherEventKind::Renamed => {
pending_added.insert(event.path, ());
}
}
}
/// Applies the buffered watcher changes to the shared index and empties both
/// pending sets.
///
/// Added paths are kept only if they lie under `root`, are not rejected by the
/// exclusion policy built from `excluded_roots`, and `path_to_search_item`
/// yields an item for them. These checks run under a read lock on `service`.
/// Removed paths are kept only if they lie under `root`. Deletions and then
/// upserts are applied under a single write lock. A poisoned lock is recovered
/// rather than treated as fatal. When at least one change succeeded, a summary
/// is logged.
///
/// Does nothing when both pending sets are empty.
fn flush_pending(
    root: &Path,
    service: &Arc<RwLock<CoreService>>,
    excluded_roots: &[PathBuf],
    pending_added: &mut HashMap<PathBuf, ()>,
    pending_removed: &mut HashSet<PathBuf>,
) {
    if pending_added.is_empty() && pending_removed.is_empty() {
        return;
    }
    let added: Vec<PathBuf> = pending_added.drain().map(|(p, ())| p).collect();
    let removed: Vec<PathBuf> = pending_removed.drain().collect();
    let exclusion = DiscoveryExclusionPolicy::new(excluded_roots);
    let mut upsert_items: Vec<SearchItem> = Vec::with_capacity(added.len());
    {
        let guard = service
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let cfg = guard.config_snapshot();
        let show_files = cfg.show_files;
        let show_folders = cfg.show_folders;
        for path in &added {
            if !is_under_root(path, root) {
                continue;
            }
            if exclusion.should_exclude_path_under_root(path, root) {
                continue;
            }
            if let Some(item) = path_to_search_item(path, root, &cfg, show_files, show_folders) {
                upsert_items.push(item);
            }
        }
    }
    // A removed path normally no longer exists, so `id_for_path` cannot tell
    // whether it was a file or a folder and falls back to the "file:" prefix.
    // Queue the id for the other kind as well, otherwise a deleted directory
    // indexed under "folder:" would never be removed.
    let removed_ids: Vec<[String; 2]> = removed
        .iter()
        .filter(|path| is_under_root(path, root))
        .map(|path| {
            let primary = id_for_path(path);
            let other_kind = if primary.starts_with("folder:") {
                "file"
            } else {
                "folder"
            };
            let alternate = format!("{other_kind}:{}", path.to_string_lossy());
            [primary, alternate]
        })
        .collect();
    let mut deleted = 0usize;
    let mut upserted = 0usize;
    {
        let guard = service
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for candidates in &removed_ids {
            // Try every candidate id (no short-circuit), but count each
            // removed path at most once so the logged total stays per-path.
            let mut any_deleted = false;
            for id in candidates {
                if guard.delete_item_by_id(id).is_ok() {
                    any_deleted = true;
                }
            }
            if any_deleted {
                deleted += 1;
            }
        }
        for item in &upsert_items {
            if guard.upsert_item(item).is_ok() {
                upserted += 1;
            }
        }
    }
    if upserted > 0 || deleted > 0 {
        crate::runtime::log_info(&format!(
            "[nex] directory_watcher root=\"{}\" flush added={} removed={}",
            root.display(),
            upserted,
            deleted
        ));
    }
}
/// Returns `true` when `path` is `root` itself or lies beneath it.
///
/// The comparison works on whole path components, so `/ab` is not considered
/// to be under `/a`.
fn is_under_root(path: &Path, root: &Path) -> bool {
    path.starts_with(root)
}
/// Builds the index id for `path`: `folder:<path>` when the path is currently
/// a directory on disk, `file:<path>` otherwise.
///
/// The path text is the lossy UTF-8 rendering of `path`; its case is left
/// untouched. A path that does not exist gets the `file:` prefix.
fn id_for_path(path: &Path) -> String {
    let path_text = path.to_string_lossy();
    if path.is_dir() {
        format!("folder:{path_text}")
    } else {
        format!("file:{path_text}")
    }
}
/// Converts an on-disk path into a `SearchItem`, or returns `None` when the
/// path should not be indexed.
///
/// `None` is returned when `path` equals `root`, when its metadata cannot be
/// read, when it is neither a regular file nor a directory, or when its kind
/// is disabled through `show_files` / `show_folders`.
///
/// The item id is `<kind>:<path>`, the title is the final path component
/// (falling back to the whole path), and the usage timestamp is taken from the
/// file's modification time in whole seconds since the Unix epoch (0 when it
/// is unavailable or precedes the epoch). `config` is currently unused.
fn path_to_search_item(
    path: &Path,
    root: &Path,
    config: &Config,
    show_files: bool,
    show_folders: bool,
) -> Option<SearchItem> {
    let _ = config;
    if path == root {
        return None;
    }
    let metadata = std::fs::metadata(path).ok()?;
    let kind = if metadata.is_dir() {
        if !show_folders {
            return None;
        }
        "folder"
    } else if metadata.is_file() {
        if !show_files {
            return None;
        }
        "file"
    } else {
        return None;
    };
    let path_text = path.to_string_lossy();
    let title = match path.file_name() {
        Some(name) => name.to_string_lossy().to_string(),
        None => path_text.to_string(),
    };
    let id = format!("{kind}:{path_text}");
    let last_accessed_epoch_secs = match metadata.modified() {
        Ok(mtime) => match mtime.duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs() as i64,
            Err(_) => 0,
        },
        Err(_) => 0,
    };
    Some(SearchItem::new(&id, kind, &title, &path_text).with_usage(0, last_accessed_epoch_secs))
}
/// Extracts a readable message from a panic payload.
///
/// Returns the payload text when it is a `&'static str` or a `String`, and a
/// fixed placeholder for any other payload type.
fn panic_message_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "non-string panic payload".to_string())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file_watcher::{WatcherEvent, WatcherEventKind};

    /// Builds a watcher event of `kind` for the path `p`.
    fn evt(kind: WatcherEventKind, p: &str) -> WatcherEvent {
        let path = PathBuf::from(p);
        WatcherEvent { kind, path }
    }

    /// Feeds `events` in order into fresh pending sets and returns the result
    /// as `(pending_added, pending_removed)`.
    fn replay(events: Vec<WatcherEvent>) -> (HashMap<PathBuf, ()>, HashSet<PathBuf>) {
        let mut added = HashMap::new();
        let mut removed = HashSet::new();
        for event in events {
            apply_event_to_pending(event, &mut added, &mut removed);
        }
        (added, removed)
    }

    #[test]
    fn added_then_removed_collapses_to_noop() {
        let (added, removed) = replay(vec![
            evt(WatcherEventKind::Added, "/a"),
            evt(WatcherEventKind::Removed, "/a"),
        ]);
        assert!(added.is_empty());
        assert!(removed.is_empty());
    }

    #[test]
    fn removed_then_added_collapses_to_noop() {
        let (added, removed) = replay(vec![
            evt(WatcherEventKind::Removed, "/a"),
            evt(WatcherEventKind::Added, "/a"),
        ]);
        assert!(added.is_empty());
        assert!(removed.is_empty());
    }

    #[test]
    fn added_only_keeps_add() {
        let (added, removed) = replay(vec![evt(WatcherEventKind::Added, "/a")]);
        assert!(added.contains_key(&PathBuf::from("/a")));
        assert!(removed.is_empty());
    }

    #[test]
    fn removed_only_keeps_remove() {
        let (added, removed) = replay(vec![evt(WatcherEventKind::Removed, "/a")]);
        assert!(added.is_empty());
        assert!(removed.contains(&PathBuf::from("/a")));
    }

    #[test]
    fn modified_treated_as_added() {
        let (added, _removed) = replay(vec![evt(WatcherEventKind::Modified, "/a")]);
        assert!(added.contains_key(&PathBuf::from("/a")));
    }

    #[test]
    fn renamed_inserts_as_added() {
        let (added, _removed) = replay(vec![evt(WatcherEventKind::Renamed, "/a/new")]);
        assert!(added.contains_key(&PathBuf::from("/a/new")));
    }

    #[test]
    fn id_for_path_uses_kind_prefix() {
        let id = id_for_path(&PathBuf::from("/some/file.txt"));
        assert_eq!(id, "file:/some/file.txt");
    }

    #[test]
    fn is_under_root_accepts_descendants() {
        let root = PathBuf::from("/a");
        let descendant = PathBuf::from("/a/b/c");
        let same = PathBuf::from("/a");
        let sibling = PathBuf::from("/b");
        assert!(is_under_root(&descendant, &root));
        assert!(is_under_root(&same, &root));
        assert!(!is_under_root(&sibling, &root));
    }

    /// Mirrors the cap check performed per event in `run_consumer`.
    #[test]
    fn batch_cap_drops_excess_events() {
        let mut added: HashMap<PathBuf, ()> = HashMap::new();
        let mut removed: HashSet<PathBuf> = HashSet::new();
        let mut total_dropped = 0usize;
        for i in 0..(CONSUMER_BATCH_CAP + 5) {
            let buffered = added.len() + removed.len();
            if buffered < CONSUMER_BATCH_CAP {
                let path = format!("/p/{i}");
                apply_event_to_pending(
                    evt(WatcherEventKind::Added, &path),
                    &mut added,
                    &mut removed,
                );
            } else {
                total_dropped += 1;
            }
        }
        assert_eq!(added.len() + removed.len(), CONSUMER_BATCH_CAP);
        assert_eq!(total_dropped, 5);
    }

    /// A timestamp before the Unix epoch must map to 0 under the same
    /// conversion that `path_to_search_item` applies to modification times.
    #[test]
    fn time_to_unix_handles_pre_epoch() {
        let before_epoch = std::time::SystemTime::UNIX_EPOCH
            .checked_sub(Duration::from_secs(60))
            .expect("system time before epoch");
        let secs = match before_epoch.duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs() as i64,
            Err(_) => 0,
        };
        assert_eq!(secs, 0);
    }
}