toolpath_claude/
async_watcher.rs1use crate::error::Result;
7use crate::reader::ConversationReader;
8use crate::types::ConversationEntry;
9use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::sync::mpsc;
15
16#[derive(Debug, Clone)]
18pub struct WatcherConfig {
19 pub poll_interval: Duration,
21 pub debounce: Duration,
23}
24
25impl Default for WatcherConfig {
26 fn default() -> Self {
27 Self {
28 poll_interval: Duration::from_secs(5),
29 debounce: Duration::from_millis(100),
30 }
31 }
32}
33
34pub struct AsyncConversationWatcher {
37 file_path: PathBuf,
39 byte_offset: Arc<Mutex<u64>>,
41 config: WatcherConfig,
43}
44
45impl AsyncConversationWatcher {
46 pub fn new(file_path: PathBuf, config: Option<WatcherConfig>) -> Self {
52 Self {
53 file_path,
54 byte_offset: Arc::new(Mutex::new(0)),
55 config: config.unwrap_or_default(),
56 }
57 }
58
59 pub fn with_offset(file_path: PathBuf, offset: u64, config: Option<WatcherConfig>) -> Self {
62 Self {
63 file_path,
64 byte_offset: Arc::new(Mutex::new(offset)),
65 config: config.unwrap_or_default(),
66 }
67 }
68
69 pub async fn offset(&self) -> u64 {
71 *self.byte_offset.lock().await
72 }
73
74 pub async fn poll(&self) -> Result<Vec<ConversationEntry>> {
77 let mut offset = self.byte_offset.lock().await;
78 let (entries, new_offset) = ConversationReader::read_from_offset(&self.file_path, *offset)?;
79 *offset = new_offset;
80 Ok(entries)
81 }
82
83 pub async fn start(self, tx: mpsc::Sender<Vec<ConversationEntry>>) -> Result<WatcherHandle> {
90 let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
91 let file_path = self.file_path.clone();
92 let byte_offset = self.byte_offset.clone();
93 let poll_interval = self.config.poll_interval;
94 let debounce = self.config.debounce;
95
96 let (event_tx, mut event_rx) = mpsc::channel::<()>(16);
98
99 let event_tx_clone = event_tx.clone();
101 let file_path_clone = file_path.clone();
102
103 let watcher_result: std::result::Result<RecommendedWatcher, notify::Error> =
105 notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
106 if let Ok(event) = res {
107 if event.kind.is_modify() {
109 for path in &event.paths {
110 if path == &file_path_clone {
111 let _ = event_tx_clone.blocking_send(());
112 break;
113 }
114 }
115 }
116 }
117 });
118
119 let mut watcher = match watcher_result {
121 Ok(mut w) => {
122 if let Some(parent) = file_path.parent() {
123 let _ = w.watch(parent, RecursiveMode::NonRecursive);
124 }
125 Some(w)
126 }
127 Err(e) => {
128 eprintln!(
129 "Warning: Failed to create file watcher: {}. Using poll-only mode.",
130 e
131 );
132 None
133 }
134 };
135
136 let handle = tokio::spawn(async move {
138 let mut poll_timer = tokio::time::interval(poll_interval);
139 let mut last_event = std::time::Instant::now();
140
141 loop {
142 tokio::select! {
143 _ = stop_rx.recv() => {
145 break;
146 }
147
148 Some(()) = event_rx.recv() => {
150 let now = std::time::Instant::now();
151 if now.duration_since(last_event) >= debounce {
152 last_event = now;
153 if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
154 && !entries.is_empty() && tx.send(entries).await.is_err()
155 {
156 break; }
158 }
159 }
160
161 _ = poll_timer.tick() => {
163 if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
164 && !entries.is_empty() && tx.send(entries).await.is_err()
165 {
166 break; }
168 }
169 }
170 }
171
172 drop(watcher.take());
174 });
175
176 Ok(WatcherHandle {
177 stop_tx,
178 _task: handle,
179 })
180 }
181}
182
183async fn read_new_entries(
185 file_path: &PathBuf,
186 byte_offset: &Arc<Mutex<u64>>,
187) -> Result<Vec<ConversationEntry>> {
188 let mut offset = byte_offset.lock().await;
189 let (entries, new_offset) = ConversationReader::read_from_offset(file_path, *offset)?;
190 *offset = new_offset;
191 Ok(entries)
192}
193
194pub struct WatcherHandle {
196 stop_tx: mpsc::Sender<()>,
197 _task: tokio::task::JoinHandle<()>,
198}
199
200impl WatcherHandle {
201 pub async fn stop(self) {
203 let _ = self.stop_tx.send(()).await;
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use std::io::Write;
211 use tempfile::NamedTempFile;
212 use tokio::time::timeout;
213
214 #[tokio::test]
215 async fn test_poll_basic() {
216 let mut temp = NamedTempFile::new().unwrap();
217 writeln!(
218 temp,
219 r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
220 )
221 .unwrap();
222 temp.flush().unwrap();
223
224 let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
225
226 let entries = watcher.poll().await.unwrap();
228 assert_eq!(entries.len(), 1);
229
230 let entries = watcher.poll().await.unwrap();
232 assert!(entries.is_empty());
233
234 writeln!(
236 temp,
237 r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
238 )
239 .unwrap();
240 temp.flush().unwrap();
241
242 let entries = watcher.poll().await.unwrap();
244 assert_eq!(entries.len(), 1);
245 }
246
247 #[tokio::test]
248 async fn test_watcher_start_and_stop() {
249 let mut temp = NamedTempFile::new().unwrap();
250 writeln!(
251 temp,
252 r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
253 )
254 .unwrap();
255 temp.flush().unwrap();
256
257 let config = WatcherConfig {
258 poll_interval: Duration::from_millis(50),
259 debounce: Duration::from_millis(10),
260 };
261
262 let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), Some(config));
263 let (tx, mut rx) = mpsc::channel(16);
264
265 let handle = watcher.start(tx).await.unwrap();
266
267 let entries = timeout(Duration::from_secs(1), rx.recv())
269 .await
270 .expect("timeout")
271 .expect("channel closed");
272 assert_eq!(entries.len(), 1);
273
274 handle.stop().await;
276 }
277
278 #[tokio::test]
279 async fn test_offset_persistence() {
280 let mut temp = NamedTempFile::new().unwrap();
281 writeln!(
282 temp,
283 r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
284 )
285 .unwrap();
286 temp.flush().unwrap();
287
288 let watcher1 = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
290 let _ = watcher1.poll().await.unwrap();
291 let offset = watcher1.offset().await;
292 assert!(offset > 0);
293
294 writeln!(
296 temp,
297 r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
298 )
299 .unwrap();
300 temp.flush().unwrap();
301
302 let watcher2 =
304 AsyncConversationWatcher::with_offset(temp.path().to_path_buf(), offset, None);
305
306 let entries = watcher2.poll().await.unwrap();
308 assert_eq!(entries.len(), 1);
309 assert_eq!(entries[0].uuid, "456");
310 }
311}