Skip to main content

flow_server/
watcher.rs

1use notify::{RecommendedWatcher, RecursiveMode, Watcher};
2use serde::Serialize;
3use std::path::Path;
4use tokio::sync::broadcast;
5use tracing::{error, info, warn};
6
7#[derive(Debug, Clone, Serialize)]
8struct SsePayload {
9    #[serde(rename = "type")]
10    event_type: String,
11    #[serde(skip_serializing_if = "Option::is_none")]
12    event: Option<String>,
13    #[serde(skip_serializing_if = "Option::is_none", rename = "sessionId")]
14    session_id: Option<String>,
15    #[serde(skip_serializing_if = "Option::is_none")]
16    file: Option<String>,
17}
18
19/// Set up file watchers for tasks and projects directories
20///
21/// # Panics
22///
23/// May panic if the fallback dummy watcher cannot be created.
24#[allow(clippy::cognitive_complexity)]
25pub fn setup_file_watcher(
26    tasks_dir: &Path,
27    projects_dir: &Path,
28    tx: broadcast::Sender<String>,
29) -> Option<(RecommendedWatcher, RecommendedWatcher)> {
30    let tasks_dir_clone = tasks_dir.to_path_buf();
31
32    // Tasks watcher
33    let tx_tasks = tx.clone();
34    let tasks_watcher_result =
35        notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
36            if let Ok(event) = res {
37                for path in &event.paths {
38                    if path.extension().is_some_and(|e| e == "json") {
39                        let relative = path.strip_prefix(&tasks_dir_clone).unwrap_or(path);
40                        let session_id = relative
41                            .components()
42                            .next()
43                            .map(|c| c.as_os_str().to_string_lossy().to_string())
44                            .unwrap_or_default();
45                        let file_name = path
46                            .file_name()
47                            .map(|f| f.to_string_lossy().to_string())
48                            .unwrap_or_default();
49
50                        let event_name = if matches!(event.kind, notify::EventKind::Create(_)) {
51                            "add"
52                        } else {
53                            "change"
54                        };
55
56                        let payload = SsePayload {
57                            event_type: "update".to_string(),
58                            event: Some(event_name.to_string()),
59                            session_id: Some(session_id),
60                            file: Some(file_name),
61                        };
62
63                        if let Ok(json) = serde_json::to_string(&payload) {
64                            let _ = tx_tasks.send(json);
65                        }
66                    }
67                }
68            }
69        });
70
71    let mut tasks_watcher = match tasks_watcher_result {
72        Ok(w) => w,
73        Err(e) => {
74            error!("Failed to create tasks watcher: {e}");
75            return None;
76        }
77    };
78
79    // Ensure tasks dir exists before watching
80    if tasks_dir.exists() {
81        if let Err(e) = tasks_watcher.watch(tasks_dir, RecursiveMode::Recursive) {
82            warn!("Failed to watch tasks dir (will retry): {e}");
83        }
84    } else {
85        info!("Tasks directory doesn't exist yet, will watch parent");
86        // Watch the parent (.claude) so we catch when tasks/ is created
87        if let Some(parent) = tasks_dir.parent() {
88            if parent.exists() {
89                let _ = tasks_watcher.watch(parent, RecursiveMode::NonRecursive);
90            }
91        }
92    }
93
94    // Projects watcher (for metadata changes)
95    let tx_projects = tx;
96    let projects_watcher_result =
97        notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
98            if let Ok(event) = res {
99                for path in &event.paths {
100                    if path.extension().is_some_and(|e| e == "jsonl") {
101                        let payload = serde_json::json!({ "type": "metadata-update" });
102                        let _ = tx_projects.send(payload.to_string());
103                    }
104                }
105            }
106        });
107
108    let mut projects_watcher = match projects_watcher_result {
109        Ok(w) => w,
110        Err(e) => {
111            error!("Failed to create projects watcher: {e}");
112            return Some((tasks_watcher, notify::recommended_watcher(|_| {}).unwrap()));
113        }
114    };
115
116    if projects_dir.exists() {
117        if let Err(e) = projects_watcher.watch(projects_dir, RecursiveMode::Recursive) {
118            warn!("Failed to watch projects dir: {e}");
119        }
120    }
121
122    Some((tasks_watcher, projects_watcher))
123}