Skip to main content

flow_server/
watcher.rs

1use notify::{RecommendedWatcher, RecursiveMode, Watcher};
2use serde::Serialize;
3use std::path::Path;
4use tokio::sync::broadcast;
5use tracing::{error, info, warn};
6
7#[derive(Debug, Clone, Serialize)]
8struct SsePayload {
9    #[serde(rename = "type")]
10    event_type: String,
11    #[serde(skip_serializing_if = "Option::is_none")]
12    event: Option<String>,
13    #[serde(skip_serializing_if = "Option::is_none", rename = "sessionId")]
14    session_id: Option<String>,
15    #[serde(skip_serializing_if = "Option::is_none")]
16    file: Option<String>,
17}
18
19/// Set up file watchers for tasks and projects directories
20///
21/// # Panics
22///
23/// May panic if the fallback dummy watcher cannot be created.
24#[allow(clippy::cognitive_complexity)]
25pub fn setup_file_watcher(
26    tasks_dir: &Path,
27    projects_dir: &Path,
28    tx: broadcast::Sender<String>,
29) -> Option<(RecommendedWatcher, RecommendedWatcher)> {
30    let tasks_dir_clone = tasks_dir.to_path_buf();
31
32    // Tasks watcher
33    let tx_tasks = tx.clone();
34    let tasks_watcher_result = notify::recommended_watcher(
35        move |res: Result<notify::Event, notify::Error>| {
36            if let Ok(event) = res {
37                for path in &event.paths {
38                    if path.extension().is_some_and(|e| e == "json") {
39                        let relative = path
40                            .strip_prefix(&tasks_dir_clone)
41                            .unwrap_or(path);
42                        let session_id = relative
43                            .components()
44                            .next()
45                            .map(|c| c.as_os_str().to_string_lossy().to_string())
46                            .unwrap_or_default();
47                        let file_name = path
48                            .file_name()
49                            .map(|f| f.to_string_lossy().to_string())
50                            .unwrap_or_default();
51
52                        let event_name = if matches!(event.kind, notify::EventKind::Create(_)) {
53                            "add"
54                        } else {
55                            "change"
56                        };
57
58                        let payload = SsePayload {
59                            event_type: "update".to_string(),
60                            event: Some(event_name.to_string()),
61                            session_id: Some(session_id),
62                            file: Some(file_name),
63                        };
64
65                        if let Ok(json) = serde_json::to_string(&payload) {
66                            let _ = tx_tasks.send(json);
67                        }
68                    }
69                }
70            }
71        },
72    );
73
74    let mut tasks_watcher = match tasks_watcher_result {
75        Ok(w) => w,
76        Err(e) => {
77            error!("Failed to create tasks watcher: {e}");
78            return None;
79        }
80    };
81
82    // Ensure tasks dir exists before watching
83    if tasks_dir.exists() {
84        if let Err(e) = tasks_watcher.watch(tasks_dir, RecursiveMode::Recursive) {
85            warn!("Failed to watch tasks dir (will retry): {e}");
86        }
87    } else {
88        info!("Tasks directory doesn't exist yet, will watch parent");
89        // Watch the parent (.claude) so we catch when tasks/ is created
90        if let Some(parent) = tasks_dir.parent() {
91            if parent.exists() {
92                let _ = tasks_watcher.watch(parent, RecursiveMode::NonRecursive);
93            }
94        }
95    }
96
97    // Projects watcher (for metadata changes)
98    let tx_projects = tx;
99    let projects_watcher_result = notify::recommended_watcher(
100        move |res: Result<notify::Event, notify::Error>| {
101            if let Ok(event) = res {
102                for path in &event.paths {
103                    if path.extension().is_some_and(|e| e == "jsonl") {
104                        let payload = serde_json::json!({ "type": "metadata-update" });
105                        let _ = tx_projects.send(payload.to_string());
106                    }
107                }
108            }
109        },
110    );
111
112    let mut projects_watcher = match projects_watcher_result {
113        Ok(w) => w,
114        Err(e) => {
115            error!("Failed to create projects watcher: {e}");
116            return Some((tasks_watcher, notify::recommended_watcher(|_| {}).unwrap()));
117        }
118    };
119
120    if projects_dir.exists() {
121        if let Err(e) = projects_watcher.watch(projects_dir, RecursiveMode::Recursive) {
122            warn!("Failed to watch projects dir: {e}");
123        }
124    }
125
126    Some((tasks_watcher, projects_watcher))
127}