1use notify::{RecommendedWatcher, RecursiveMode, Watcher};
2use serde::Serialize;
3use std::path::Path;
4use tokio::sync::broadcast;
5use tracing::{error, info, warn};
6
7#[derive(Debug, Clone, Serialize)]
8struct SsePayload {
9 #[serde(rename = "type")]
10 event_type: String,
11 #[serde(skip_serializing_if = "Option::is_none")]
12 event: Option<String>,
13 #[serde(skip_serializing_if = "Option::is_none", rename = "sessionId")]
14 session_id: Option<String>,
15 #[serde(skip_serializing_if = "Option::is_none")]
16 file: Option<String>,
17}
18
19#[allow(clippy::cognitive_complexity)]
25pub fn setup_file_watcher(
26 tasks_dir: &Path,
27 projects_dir: &Path,
28 tx: broadcast::Sender<String>,
29) -> Option<(RecommendedWatcher, RecommendedWatcher)> {
30 let tasks_dir_clone = tasks_dir.to_path_buf();
31
32 let tx_tasks = tx.clone();
34 let tasks_watcher_result = notify::recommended_watcher(
35 move |res: Result<notify::Event, notify::Error>| {
36 if let Ok(event) = res {
37 for path in &event.paths {
38 if path.extension().is_some_and(|e| e == "json") {
39 let relative = path
40 .strip_prefix(&tasks_dir_clone)
41 .unwrap_or(path);
42 let session_id = relative
43 .components()
44 .next()
45 .map(|c| c.as_os_str().to_string_lossy().to_string())
46 .unwrap_or_default();
47 let file_name = path
48 .file_name()
49 .map(|f| f.to_string_lossy().to_string())
50 .unwrap_or_default();
51
52 let event_name = if matches!(event.kind, notify::EventKind::Create(_)) {
53 "add"
54 } else {
55 "change"
56 };
57
58 let payload = SsePayload {
59 event_type: "update".to_string(),
60 event: Some(event_name.to_string()),
61 session_id: Some(session_id),
62 file: Some(file_name),
63 };
64
65 if let Ok(json) = serde_json::to_string(&payload) {
66 let _ = tx_tasks.send(json);
67 }
68 }
69 }
70 }
71 },
72 );
73
74 let mut tasks_watcher = match tasks_watcher_result {
75 Ok(w) => w,
76 Err(e) => {
77 error!("Failed to create tasks watcher: {e}");
78 return None;
79 }
80 };
81
82 if tasks_dir.exists() {
84 if let Err(e) = tasks_watcher.watch(tasks_dir, RecursiveMode::Recursive) {
85 warn!("Failed to watch tasks dir (will retry): {e}");
86 }
87 } else {
88 info!("Tasks directory doesn't exist yet, will watch parent");
89 if let Some(parent) = tasks_dir.parent() {
91 if parent.exists() {
92 let _ = tasks_watcher.watch(parent, RecursiveMode::NonRecursive);
93 }
94 }
95 }
96
97 let tx_projects = tx;
99 let projects_watcher_result = notify::recommended_watcher(
100 move |res: Result<notify::Event, notify::Error>| {
101 if let Ok(event) = res {
102 for path in &event.paths {
103 if path.extension().is_some_and(|e| e == "jsonl") {
104 let payload = serde_json::json!({ "type": "metadata-update" });
105 let _ = tx_projects.send(payload.to_string());
106 }
107 }
108 }
109 },
110 );
111
112 let mut projects_watcher = match projects_watcher_result {
113 Ok(w) => w,
114 Err(e) => {
115 error!("Failed to create projects watcher: {e}");
116 return Some((tasks_watcher, notify::recommended_watcher(|_| {}).unwrap()));
117 }
118 };
119
120 if projects_dir.exists() {
121 if let Err(e) = projects_watcher.watch(projects_dir, RecursiveMode::Recursive) {
122 warn!("Failed to watch projects dir: {e}");
123 }
124 }
125
126 Some((tasks_watcher, projects_watcher))
127}