use crate::event::{ConfigScope, DataEvent, EventBus};
use crate::store::DataStore;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
#[derive(Debug, Clone)]
pub struct WatcherConfig {
pub debounce_delay: Duration,
pub max_debounce_delay: Duration,
pub burst_threshold: u32,
pub extra_watch_paths: Vec<PathBuf>,
}
impl Default for WatcherConfig {
fn default() -> Self {
let extra_watch_paths = dirs::home_dir()
.map(|h| {
let ccboard_dir = h.join(".ccboard");
if ccboard_dir.exists() {
vec![ccboard_dir]
} else {
vec![]
}
})
.unwrap_or_default();
Self {
debounce_delay: Duration::from_millis(500),
max_debounce_delay: Duration::from_secs(3),
burst_threshold: 10,
extra_watch_paths,
}
}
}
pub struct FileWatcher {
_watcher: RecommendedWatcher,
shutdown_tx: mpsc::Sender<()>,
}
impl FileWatcher {
pub async fn start(
claude_home: PathBuf,
project_path: Option<PathBuf>,
store: Arc<DataStore>,
config: WatcherConfig,
) -> Result<Self, notify::Error> {
let (event_tx, mut event_rx) = mpsc::channel::<notify::Result<Event>>(100);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let watcher = RecommendedWatcher::new(
move |res| {
let _ = event_tx.blocking_send(res);
},
Config::default().with_poll_interval(Duration::from_secs(2)),
)?;
let mut file_watcher = Self {
_watcher: watcher,
shutdown_tx,
};
file_watcher.watch_path(&claude_home, RecursiveMode::NonRecursive)?;
let projects_dir = claude_home.join("projects");
if projects_dir.exists() {
file_watcher.watch_path(&projects_dir, RecursiveMode::NonRecursive)?;
if let Ok(entries) = std::fs::read_dir(&projects_dir) {
for entry in entries.flatten() {
if entry.path().is_dir() {
let _ = file_watcher.watch_path(&entry.path(), RecursiveMode::NonRecursive);
}
}
}
}
let cache_dir = claude_home.join("cache");
if cache_dir.exists() {
file_watcher.watch_path(&cache_dir, RecursiveMode::NonRecursive)?;
}
if let Some(ref proj) = project_path {
let claude_dir = proj.join(".claude");
if claude_dir.exists() {
file_watcher.watch_path(&claude_dir, RecursiveMode::NonRecursive)?;
for subdir in ["agents", "commands", "skills", "hooks"].iter() {
let path = claude_dir.join(subdir);
if path.exists() {
let _ = file_watcher.watch_path(&path, RecursiveMode::Recursive);
}
}
}
}
for extra_path in &config.extra_watch_paths {
if extra_path.exists() {
let _ = file_watcher.watch_path(extra_path, RecursiveMode::NonRecursive);
debug!(path = %extra_path.display(), "Watching extra path for live sessions");
}
}
info!(claude_home = %claude_home.display(), "File watcher started");
let event_bus = store.event_bus().clone();
tokio::spawn(async move {
let mut debounce_state = DebounceState::new(config);
loop {
tokio::select! {
Some(result) = event_rx.recv() => {
match result {
Ok(event) => {
if let Some((data_event, path)) = Self::process_event(&event, &claude_home, project_path.as_deref()) {
if debounce_state.should_emit(&data_event) {
debug!(?data_event, "Emitting file change event");
Self::handle_event(data_event, Some(&path), &store, &event_bus).await;
}
}
}
Err(e) => {
error!(error = %e, "File watcher error");
event_bus.publish(DataEvent::WatcherError(e.to_string()));
}
}
}
_ = shutdown_rx.recv() => {
info!("File watcher shutting down");
break;
}
}
}
});
Ok(file_watcher)
}
fn watch_path(&mut self, path: &Path, mode: RecursiveMode) -> Result<(), notify::Error> {
self._watcher.watch(path, mode)?;
debug!(path = %path.display(), "Watching path");
Ok(())
}
fn process_event(
event: &Event,
claude_home: &Path,
project_path: Option<&Path>,
) -> Option<(DataEvent, PathBuf)> {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {}
_ => return None,
}
let path = event.paths.first()?;
let path_str = path.to_string_lossy();
trace!(path = %path_str, "Processing file event");
if path
.file_name()
.map(|n| n == "stats-cache.json")
.unwrap_or(false)
{
return Some((DataEvent::StatsUpdated, path.clone()));
}
if path
.file_name()
.map(|n| n == "live-sessions.json")
.unwrap_or(false)
{
return Some((DataEvent::LiveSessionStatusChanged, path.clone()));
}
if path.extension().map(|e| e == "jsonl").unwrap_or(false) && path_str.contains("projects")
{
let session_id = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
return Some((DataEvent::SessionUpdated(session_id.into()), path.clone()));
}
if *path == claude_home.join("settings.json") {
return Some((DataEvent::ConfigChanged(ConfigScope::Global), path.clone()));
}
if let Some(proj) = project_path {
if *path == proj.join(".claude").join("settings.json") {
return Some((
DataEvent::ConfigChanged(ConfigScope::Project(
proj.to_string_lossy().to_string(),
)),
path.clone(),
));
}
if *path == proj.join(".claude").join("settings.local.json") {
return Some((
DataEvent::ConfigChanged(ConfigScope::Local(
proj.to_string_lossy().to_string(),
)),
path.clone(),
));
}
}
if path
.file_name()
.map(|n| n == "claude_desktop_config.json")
.unwrap_or(false)
{
return Some((DataEvent::ConfigChanged(ConfigScope::Mcp), path.clone()));
}
None
}
async fn handle_event(
event: DataEvent,
path: Option<&Path>,
store: &DataStore,
event_bus: &EventBus,
) {
match &event {
DataEvent::StatsUpdated => {
store.reload_stats().await;
}
DataEvent::SessionUpdated(_id) | DataEvent::SessionCreated(_id) => {
if let Some(p) = path {
store.update_session(p).await;
}
}
DataEvent::ConfigChanged(_scope) => {
store.reload_settings().await;
}
DataEvent::LiveSessionStatusChanged => {
if let Some(p) = path {
store.reload_live_hook_sessions(p).await;
}
}
_ => {}
}
event_bus.publish(event);
}
pub async fn stop(&self) {
let _ = self.shutdown_tx.send(()).await;
}
}
struct DebounceState {
config: WatcherConfig,
last_events: std::collections::HashMap<String, std::time::Instant>,
event_count_window: std::collections::VecDeque<std::time::Instant>,
}
impl DebounceState {
fn new(config: WatcherConfig) -> Self {
Self {
config,
last_events: std::collections::HashMap::new(),
event_count_window: std::collections::VecDeque::new(),
}
}
fn should_emit(&mut self, event: &DataEvent) -> bool {
let now = std::time::Instant::now();
let key = Self::event_key(event);
self.event_count_window.push_back(now);
while self
.event_count_window
.front()
.map(|t| now.duration_since(*t) > Duration::from_secs(1))
.unwrap_or(false)
{
self.event_count_window.pop_front();
}
let delay = if self.event_count_window.len() as u32 > self.config.burst_threshold {
self.config.max_debounce_delay
} else {
self.config.debounce_delay
};
if let Some(last) = self.last_events.get(&key) {
if now.duration_since(*last) < delay {
trace!(key = %key, "Debouncing event");
return false;
}
}
self.last_events.insert(key, now);
true
}
fn event_key(event: &DataEvent) -> String {
match event {
DataEvent::StatsUpdated => "stats".to_string(),
DataEvent::SessionCreated(id) | DataEvent::SessionUpdated(id) => {
format!("session:{}", id)
}
DataEvent::ConfigChanged(scope) => format!("config:{:?}", scope),
DataEvent::AnalyticsUpdated => "analytics".to_string(),
DataEvent::LoadCompleted => "load".to_string(),
DataEvent::WatcherError(_) => "error".to_string(),
DataEvent::LiveSessionStatusChanged => "live_sessions".to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_debounce_state_basic() {
let config = WatcherConfig {
debounce_delay: Duration::from_millis(100),
max_debounce_delay: Duration::from_millis(500),
burst_threshold: 5,
extra_watch_paths: vec![],
};
let mut state = DebounceState::new(config);
assert!(state.should_emit(&DataEvent::StatsUpdated));
assert!(!state.should_emit(&DataEvent::StatsUpdated));
assert!(state.should_emit(&DataEvent::SessionUpdated("test".into())));
}
#[test]
fn test_process_event_stats() {
let claude_home = PathBuf::from("/home/user/.claude");
let event = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![PathBuf::from("/home/user/.claude/stats-cache.json")],
..Default::default()
};
let result = FileWatcher::process_event(&event, &claude_home, None);
assert!(matches!(result, Some((DataEvent::StatsUpdated, _))));
}
#[test]
fn test_process_event_session() {
let claude_home = PathBuf::from("/home/user/.claude");
let event = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![PathBuf::from(
"/home/user/.claude/projects/-test/abc123.jsonl",
)],
..Default::default()
};
let result = FileWatcher::process_event(&event, &claude_home, None);
assert!(matches!(result, Some((DataEvent::SessionUpdated(id), _)) if id == "abc123"));
}
}