Skip to main content

toolpath_claude/
async_watcher.rs

1//! Async Conversation Watcher
2//!
3//! Provides event-driven file watching for Claude conversation JSONL files.
4//! Uses the `notify` crate for filesystem events with a periodic fallback poll.
5
6use crate::error::Result;
7use crate::reader::ConversationReader;
8use crate::types::ConversationEntry;
9use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::sync::mpsc;
15
16/// Configuration for the async watcher
17#[derive(Debug, Clone)]
18pub struct WatcherConfig {
19    /// Fallback poll interval (safety net for missed events)
20    pub poll_interval: Duration,
21    /// Debounce duration for rapid file changes
22    pub debounce: Duration,
23}
24
25impl Default for WatcherConfig {
26    fn default() -> Self {
27        Self {
28            poll_interval: Duration::from_secs(5),
29            debounce: Duration::from_millis(100),
30        }
31    }
32}
33
34/// Async conversation watcher that uses filesystem events
35/// with a periodic fallback poll for reliability.
36pub struct AsyncConversationWatcher {
37    /// Path to the JSONL conversation file
38    file_path: PathBuf,
39    /// Current byte offset in the file
40    byte_offset: Arc<Mutex<u64>>,
41    /// Configuration
42    config: WatcherConfig,
43}
44
45impl AsyncConversationWatcher {
46    /// Create a new async watcher for a conversation file.
47    ///
48    /// # Arguments
49    /// * `file_path` - Path to the JSONL conversation file
50    /// * `config` - Optional configuration (uses defaults if None)
51    pub fn new(file_path: PathBuf, config: Option<WatcherConfig>) -> Self {
52        Self {
53            file_path,
54            byte_offset: Arc::new(Mutex::new(0)),
55            config: config.unwrap_or_default(),
56        }
57    }
58
59    /// Create a watcher starting from a specific byte offset.
60    /// Useful for resuming watching after a restart.
61    pub fn with_offset(file_path: PathBuf, offset: u64, config: Option<WatcherConfig>) -> Self {
62        Self {
63            file_path,
64            byte_offset: Arc::new(Mutex::new(offset)),
65            config: config.unwrap_or_default(),
66        }
67    }
68
69    /// Get the current byte offset
70    pub async fn offset(&self) -> u64 {
71        *self.byte_offset.lock().await
72    }
73
74    /// Check for new entries since last read (non-blocking poll).
75    /// Returns new entries and updates internal offset.
76    pub async fn poll(&self) -> Result<Vec<ConversationEntry>> {
77        let mut offset = self.byte_offset.lock().await;
78        let (entries, new_offset) = ConversationReader::read_from_offset(&self.file_path, *offset)?;
79        *offset = new_offset;
80        Ok(entries)
81    }
82
83    /// Start watching the file and send new entries to the provided channel.
84    /// This spawns a background task that:
85    /// 1. Watches for filesystem modify events
86    /// 2. Polls periodically as a safety fallback
87    ///
88    /// Returns a handle that can be used to stop the watcher.
89    pub async fn start(self, tx: mpsc::Sender<Vec<ConversationEntry>>) -> Result<WatcherHandle> {
90        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
91        let file_path = self.file_path.clone();
92        let byte_offset = self.byte_offset.clone();
93        let poll_interval = self.config.poll_interval;
94        let debounce = self.config.debounce;
95
96        // Channel for filesystem events
97        let (event_tx, mut event_rx) = mpsc::channel::<()>(16);
98
99        // Set up the filesystem watcher
100        let event_tx_clone = event_tx.clone();
101        let file_path_clone = file_path.clone();
102
103        // Create the watcher in a blocking context since notify isn't async
104        let watcher_result: std::result::Result<RecommendedWatcher, notify::Error> =
105            notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
106                if let Ok(event) = res {
107                    // Only trigger on modify events for our file
108                    if event.kind.is_modify() {
109                        for path in &event.paths {
110                            if path == &file_path_clone {
111                                let _ = event_tx_clone.blocking_send(());
112                                break;
113                            }
114                        }
115                    }
116                }
117            });
118
119        // Watch the parent directory (notify works better with directories)
120        let mut watcher = match watcher_result {
121            Ok(mut w) => {
122                if let Some(parent) = file_path.parent() {
123                    let _ = w.watch(parent, RecursiveMode::NonRecursive);
124                }
125                Some(w)
126            }
127            Err(e) => {
128                eprintln!(
129                    "Warning: Failed to create file watcher: {}. Using poll-only mode.",
130                    e
131                );
132                None
133            }
134        };
135
136        // Spawn the main watch loop
137        let handle = tokio::spawn(async move {
138            let mut poll_timer = tokio::time::interval(poll_interval);
139            let mut last_event = std::time::Instant::now();
140
141            loop {
142                tokio::select! {
143                    // Stop signal
144                    _ = stop_rx.recv() => {
145                        break;
146                    }
147
148                    // Filesystem event (debounced)
149                    Some(()) = event_rx.recv() => {
150                        let now = std::time::Instant::now();
151                        if now.duration_since(last_event) >= debounce {
152                            last_event = now;
153                            if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
154                                && !entries.is_empty() && tx.send(entries).await.is_err()
155                            {
156                                break; // Receiver dropped
157                            }
158                        }
159                    }
160
161                    // Periodic fallback poll
162                    _ = poll_timer.tick() => {
163                        if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
164                            && !entries.is_empty() && tx.send(entries).await.is_err()
165                        {
166                            break; // Receiver dropped
167                        }
168                    }
169                }
170            }
171
172            // Clean up watcher
173            drop(watcher.take());
174        });
175
176        Ok(WatcherHandle {
177            stop_tx,
178            _task: handle,
179        })
180    }
181}
182
183/// Read new entries from offset and update the offset
184async fn read_new_entries(
185    file_path: &PathBuf,
186    byte_offset: &Arc<Mutex<u64>>,
187) -> Result<Vec<ConversationEntry>> {
188    let mut offset = byte_offset.lock().await;
189    let (entries, new_offset) = ConversationReader::read_from_offset(file_path, *offset)?;
190    *offset = new_offset;
191    Ok(entries)
192}
193
194/// Handle to control a running watcher
195pub struct WatcherHandle {
196    stop_tx: mpsc::Sender<()>,
197    _task: tokio::task::JoinHandle<()>,
198}
199
200impl WatcherHandle {
201    /// Stop the watcher
202    pub async fn stop(self) {
203        let _ = self.stop_tx.send(()).await;
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use std::io::Write;
211    use tempfile::NamedTempFile;
212    use tokio::time::timeout;
213
214    #[tokio::test]
215    async fn test_poll_basic() {
216        let mut temp = NamedTempFile::new().unwrap();
217        writeln!(
218            temp,
219            r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
220        )
221        .unwrap();
222        temp.flush().unwrap();
223
224        let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
225
226        // First poll should get the entry
227        let entries = watcher.poll().await.unwrap();
228        assert_eq!(entries.len(), 1);
229
230        // Second poll should get nothing
231        let entries = watcher.poll().await.unwrap();
232        assert!(entries.is_empty());
233
234        // Add another entry
235        writeln!(
236            temp,
237            r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
238        )
239        .unwrap();
240        temp.flush().unwrap();
241
242        // Third poll should get the new entry
243        let entries = watcher.poll().await.unwrap();
244        assert_eq!(entries.len(), 1);
245    }
246
247    #[tokio::test]
248    async fn test_watcher_start_and_stop() {
249        let mut temp = NamedTempFile::new().unwrap();
250        writeln!(
251            temp,
252            r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
253        )
254        .unwrap();
255        temp.flush().unwrap();
256
257        let config = WatcherConfig {
258            poll_interval: Duration::from_millis(50),
259            debounce: Duration::from_millis(10),
260        };
261
262        let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), Some(config));
263        let (tx, mut rx) = mpsc::channel(16);
264
265        let handle = watcher.start(tx).await.unwrap();
266
267        // Should receive initial entries from first poll
268        let entries = timeout(Duration::from_secs(1), rx.recv())
269            .await
270            .expect("timeout")
271            .expect("channel closed");
272        assert_eq!(entries.len(), 1);
273
274        // Stop the watcher
275        handle.stop().await;
276    }
277
278    #[tokio::test]
279    async fn test_offset_persistence() {
280        let mut temp = NamedTempFile::new().unwrap();
281        writeln!(
282            temp,
283            r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
284        )
285        .unwrap();
286        temp.flush().unwrap();
287
288        // Read once to get offset
289        let watcher1 = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
290        let _ = watcher1.poll().await.unwrap();
291        let offset = watcher1.offset().await;
292        assert!(offset > 0);
293
294        // Add more content
295        writeln!(
296            temp,
297            r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
298        )
299        .unwrap();
300        temp.flush().unwrap();
301
302        // Create new watcher starting from saved offset
303        let watcher2 =
304            AsyncConversationWatcher::with_offset(temp.path().to_path_buf(), offset, None);
305
306        // Should only get the new entry
307        let entries = watcher2.poll().await.unwrap();
308        assert_eq!(entries.len(), 1);
309        assert_eq!(entries[0].uuid, "456");
310    }
311}