use crate::utils::error::{Error, Result};
use notify::event::{ModifyKind, RenameMode};
use notify::{Event, EventKind, RecursiveMode};
use notify_debouncer_full::{new_debouncer, DebounceEventResult};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;
pub struct FileWatcher {
watch_paths: Vec<PathBuf>,
pub debounce_ms: u64,
pub queue_capacity: usize,
}
#[derive(Debug, Clone)]
pub struct WatchEvent {
pub path: PathBuf,
pub timestamp: std::time::Instant,
pub kind: WatchEventKind,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchEventKind {
Created,
Modified,
Removed,
Renamed,
}
impl FileWatcher {
pub fn new<P: AsRef<Path>>(watch_paths: Vec<P>) -> Self {
Self {
watch_paths: watch_paths
.iter()
.map(|p| p.as_ref().to_path_buf())
.collect(),
debounce_ms: 500,
queue_capacity: 10,
}
}
pub fn with_debounce_ms(mut self, debounce_ms: u64) -> Self {
self.debounce_ms = debounce_ms;
self
}
pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
self.queue_capacity = capacity;
self
}
pub fn start(self) -> Result<Receiver<WatchEvent>> {
let (tx, rx) = channel();
for path in &self.watch_paths {
if !path.exists() {
return Err(Error::new(&format!(
"error[E0009]: Watch path does not exist\n --> path: '{}'\n |\n = help: Create the directory or update ggen.toml to remove from watch list\n = help: Watch paths are collected from: ontology.source, ontology.imports, and generation.rules[].query",
path.display()
)));
}
}
let debounce_duration = Duration::from_millis(self.debounce_ms);
let watch_paths = self.watch_paths.clone();
std::thread::spawn(move || {
if let Err(e) = Self::watch_loop(watch_paths, debounce_duration, tx) {
eprintln!("Watch error: {}", e);
}
});
Ok(rx)
}
fn watch_loop(
watch_paths: Vec<PathBuf>, debounce_duration: Duration, tx: Sender<WatchEvent>,
) -> Result<()> {
let tx_clone = tx.clone();
let mut debouncer = new_debouncer(
debounce_duration,
None,
move |result: DebounceEventResult| {
match result {
Ok(events) => {
for event in events {
if let Some(watch_event) = Self::convert_event(event.event) {
let _ = tx_clone.send(watch_event);
}
}
}
Err(errors) => {
for error in errors {
eprintln!("Watch error: {:?}", error);
}
}
}
},
)
.map_err(|e| Error::new(&format!("Failed to create file watcher: {}", e)))?;
for path in &watch_paths {
let watch_mode = if path.is_dir() {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
debouncer
.watch(path, watch_mode)
.map_err(|e| Error::new(&format!("Failed to watch {}: {}", path.display(), e)))?;
}
loop {
std::thread::sleep(Duration::from_secs(1));
}
}
fn convert_event(event: Event) -> Option<WatchEvent> {
let kind = match event.kind {
EventKind::Create(_) => WatchEventKind::Created,
EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Any) => WatchEventKind::Modified,
EventKind::Remove(_) => WatchEventKind::Removed,
EventKind::Modify(ModifyKind::Name(RenameMode::Both | RenameMode::To)) => {
WatchEventKind::Renamed
}
_ => return None, };
event.paths.into_iter().next().map(|path| WatchEvent {
path,
timestamp: std::time::Instant::now(),
kind,
})
}
pub fn wait_for_change(
rx: &Receiver<WatchEvent>, timeout: Duration,
) -> Result<Option<WatchEvent>> {
match rx.recv_timeout(timeout) {
Ok(event) => Ok(Some(event)),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Ok(None),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
Err(Error::new(
"error[E0007]: Watch system stopped unexpectedly\n |\n = help: This usually indicates a crash in the watch thread\n = help: Check logs above for panic or error messages\n = help: Try restarting: ggen sync --watch\n = help: If issue persists, run without --watch to debug",
))
}
}
}
}
pub fn collect_watch_paths(
manifest_path: &Path, manifest: &crate::manifest::GgenManifest, base_path: &Path,
) -> Vec<PathBuf> {
use crate::manifest::{QuerySource, TemplateSource};
let mut paths = Vec::new();
paths.push(manifest_path.to_path_buf());
paths.push(base_path.join(&manifest.ontology.source));
for import in &manifest.ontology.imports {
paths.push(base_path.join(import));
}
for rule in &manifest.generation.rules {
if let QuerySource::File { file } = &rule.query {
paths.push(base_path.join(file));
}
}
for rule in &manifest.generation.rules {
if let TemplateSource::File { file } = &rule.template {
paths.push(base_path.join(file));
}
}
paths
}
pub fn install_shutdown_handler() -> Result<Arc<std::sync::atomic::AtomicBool>> {
use signal_hook::consts::SIGINT;
use signal_hook::flag;
use std::sync::atomic::AtomicBool;
let shutdown = Arc::new(AtomicBool::new(false));
flag::register(SIGINT, Arc::clone(&shutdown))
.map_err(|e| Error::new(&format!("Failed to register SIGINT handler: {}", e)))?;
Ok(shutdown)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_file_watcher_creation() {
let paths = vec![PathBuf::from(".")];
let watcher = FileWatcher::new(paths);
assert_eq!(watcher.debounce_ms, 500); assert_eq!(watcher.queue_capacity, 10);
}
#[test]
fn test_file_watcher_configuration() {
let paths = vec![PathBuf::from(".")];
let watcher = FileWatcher::new(paths)
.with_debounce_ms(1000)
.with_queue_capacity(20);
assert_eq!(watcher.debounce_ms, 1000);
assert_eq!(watcher.queue_capacity, 20);
}
#[test]
fn test_watch_event_kind() {
let event = WatchEvent {
path: PathBuf::from("test.txt"),
timestamp: std::time::Instant::now(),
kind: WatchEventKind::Modified,
};
assert_eq!(event.kind, WatchEventKind::Modified);
}
#[test]
fn test_collect_watch_paths_empty() {
use crate::manifest::{
GenerationConfig, GgenManifest, InferenceConfig, OntologyConfig, ProjectConfig,
ValidationConfig,
};
use std::collections::BTreeMap;
use std::path::PathBuf;
let manifest = GgenManifest {
project: ProjectConfig {
name: "test".to_string(),
version: "1.0.0".to_string(),
description: None,
},
ontology: OntologyConfig {
source: PathBuf::from("ontology.ttl"),
imports: vec![],
base_iri: None,
prefixes: BTreeMap::new(),
},
inference: InferenceConfig {
rules: vec![],
max_reasoning_timeout_ms: 5000,
},
generation: GenerationConfig {
rules: vec![],
max_sparql_timeout_ms: 5000,
require_audit_trail: false,
determinism_salt: None,
output_dir: PathBuf::from("."),
enable_llm: false,
llm_model: None,
llm_provider: None,
},
validation: ValidationConfig::default(),
};
let manifest_path = Path::new("ggen.toml");
let base_path = Path::new(".");
let paths = collect_watch_paths(manifest_path, &manifest, base_path);
assert!(paths.len() >= 2);
assert!(paths.contains(&PathBuf::from("ggen.toml")));
assert!(
paths
.iter()
.any(|p| p.to_string_lossy().ends_with("ontology.ttl")),
"Should contain ontology.ttl path (possibly joined with base_path)"
);
}
}