Skip to main content

toolpath_claude/
watcher.rs

1//! Conversation watching/tailing functionality.
2//!
3//! Provides a way to watch a conversation for new entries without re-processing
4//! entries that have already been seen.
5
6use crate::ClaudeConvo;
7use crate::chain;
8use crate::error::Result;
9use crate::types::{Conversation, ConversationEntry, MessageRole};
10use std::collections::HashSet;
11
/// Watches a conversation for new entries.
///
/// Tracks which entries have been seen (by UUID) and only returns new entries
/// on subsequent polls.
///
/// Uses `read_segment` (single-file) internally — the watcher tails
/// individual files and follows rotations via `ChainIndex`.
///
/// # Example
///
/// ```rust,no_run
/// use toolpath_claude::{ClaudeConvo, ConversationWatcher};
///
/// let manager = ClaudeConvo::new();
/// let mut watcher = ConversationWatcher::new(
///     manager,
///     "/path/to/project".to_string(),
///     "session-uuid".to_string(),
/// );
///
/// // First poll returns all existing entries
/// let entries = watcher.poll().unwrap();
/// println!("Initial entries: {}", entries.len());
///
/// // Subsequent polls return only new entries
/// loop {
///     std::thread::sleep(std::time::Duration::from_secs(1));
///     let new_entries = watcher.poll().unwrap();
///     for entry in new_entries {
///         println!("New entry: {:?}", entry.uuid);
///     }
/// }
/// ```
#[derive(Debug)]
pub struct ConversationWatcher {
    /// Conversation manager used to read session segments and to supply the
    /// path resolver when the chain index is refreshed.
    manager: ClaudeConvo,
    /// Project path handed to the manager on every read.
    project: String,
    /// Session currently being tailed. Replaced by the successor's id when a
    /// rotation is followed.
    session_id: String,
    /// UUIDs of every entry already processed, including entries that were
    /// dropped by the role filter or skipped as bridge entries.
    seen_uuids: HashSet<String>,
    /// When set, only entries whose message role equals this value are
    /// returned; entries without a message are dropped as well.
    role_filter: Option<MessageRole>,
    /// Avoids repeated successor scans when the current session is idle.
    ///
    /// Set once a successor lookup has run without finding one; cleared again
    /// when the watcher observes fresh activity, is reset, or follows a
    /// rotation.
    successor_checked: bool,
    /// Rotations detected during the last `poll()`, consumed by
    /// `take_pending_rotations`. Each entry is `(from_session, to_session)`.
    pending_rotations: Vec<(String, String)>,
    /// Cached chain index for incremental successor lookup.
    chain_index: chain::ChainIndex,
}
60
61impl ConversationWatcher {
62    /// Creates a new watcher for the given conversation.
63    pub fn new(manager: ClaudeConvo, project: String, session_id: String) -> Self {
64        Self {
65            manager,
66            project,
67            session_id,
68            seen_uuids: HashSet::new(),
69            role_filter: None,
70            successor_checked: false,
71            pending_rotations: Vec::new(),
72            chain_index: chain::ChainIndex::new(),
73        }
74    }
75
76    /// Sets a role filter - only entries with this role will be returned.
77    pub fn with_role_filter(mut self, role: MessageRole) -> Self {
78        self.role_filter = Some(role);
79        self
80    }
81
82    /// Returns the project path being watched.
83    pub fn project(&self) -> &str {
84        &self.project
85    }
86
87    /// Returns the session ID being watched.
88    pub fn session_id(&self) -> &str {
89        &self.session_id
90    }
91
92    /// Returns the number of entries that have been seen.
93    pub fn seen_count(&self) -> usize {
94        self.seen_uuids.len()
95    }
96
97    /// Polls for new conversation entries.
98    ///
99    /// On the first call, returns all existing entries (optionally filtered by role).
100    /// On subsequent calls, returns only entries that haven't been seen before.
101    ///
102    /// When the current session has been rotated to a successor file, the
103    /// watcher automatically follows the chain and returns entries from the
104    /// new file. Call [`Self::take_pending_rotations`] after poll to check whether
105    /// a rotation occurred.
106    pub fn poll(&mut self) -> Result<Vec<ConversationEntry>> {
107        let convo = self.manager.read_segment(&self.project, &self.session_id)?;
108        let new_entries = self.extract_new_entries(&convo)?;
109
110        if !new_entries.is_empty() {
111            self.successor_checked = false;
112            return Ok(new_entries);
113        }
114
115        // No new entries — check for a successor session
116        if self.follow_rotation()? {
117            return self.poll();
118        }
119
120        Ok(new_entries)
121    }
122
123    /// Polls and returns the full conversation along with just the new entries.
124    ///
125    /// Useful when you need both the full state and the delta.
126    /// Follows session rotations the same way [`Self::poll`] does.
127    pub fn poll_with_full(&mut self) -> Result<(Conversation, Vec<ConversationEntry>)> {
128        let convo = self.manager.read_segment(&self.project, &self.session_id)?;
129        let new_entries = self.extract_new_entries(&convo)?;
130
131        if !new_entries.is_empty() {
132            self.successor_checked = false;
133            return Ok((convo, new_entries));
134        }
135
136        // No new entries — check for rotation
137        if self.follow_rotation()? {
138            return self.poll_with_full();
139        }
140
141        Ok((convo, new_entries))
142    }
143
144    /// Resets the watcher, clearing all seen UUIDs and rotation state.
145    ///
146    /// The next poll will return all entries as if it were the first call.
147    /// Does **not** revert `session_id` — if a rotation was already followed,
148    /// the watcher stays on the current (latest) session.
149    pub fn reset(&mut self) {
150        self.seen_uuids.clear();
151        self.successor_checked = false;
152        self.pending_rotations.clear();
153    }
154
155    /// Pre-marks entries as seen without returning them.
156    ///
157    /// Useful for initializing the watcher to only return future entries.
158    pub fn mark_seen(&mut self, entries: &[ConversationEntry]) {
159        for entry in entries {
160            self.seen_uuids.insert(entry.uuid.clone());
161        }
162    }
163
164    /// Skips existing entries - next poll will only return new entries.
165    pub fn skip_existing(&mut self) -> Result<usize> {
166        let convo = self.manager.read_segment(&self.project, &self.session_id)?;
167        let count = convo.entries.len();
168        for entry in &convo.entries {
169            self.seen_uuids.insert(entry.uuid.clone());
170        }
171        Ok(count)
172    }
173
174    /// Returns all rotations detected during the last `poll()`, consuming
175    /// them. Each entry is `(from_session, to_session)`. Multi-hop chains
176    /// produce multiple entries in traversal order.
177    pub fn take_pending_rotations(&mut self) -> Vec<(String, String)> {
178        std::mem::take(&mut self.pending_rotations)
179    }
180
181    /// Check for and follow a session rotation. Returns `true` if a
182    /// successor was found and the watcher switched to it.
183    fn follow_rotation(&mut self) -> Result<bool> {
184        if self.successor_checked {
185            return Ok(false);
186        }
187        self.successor_checked = true;
188
189        self.chain_index
190            .refresh(self.manager.resolver(), &self.project)?;
191
192        if let Some(successor) = self.chain_index.successor_of(&self.session_id) {
193            let successor = successor.to_string();
194            let old_id = self.session_id.clone();
195            self.pending_rotations.push((old_id, successor.clone()));
196            self.session_id = successor;
197            self.successor_checked = false;
198            return Ok(true);
199        }
200
201        Ok(false)
202    }
203
204    fn extract_new_entries(&mut self, convo: &Conversation) -> Result<Vec<ConversationEntry>> {
205        let mut new_entries = Vec::new();
206
207        for entry in &convo.entries {
208            if self.seen_uuids.contains(&entry.uuid) {
209                continue;
210            }
211
212            // Skip bridge entries — they link sessions, not content
213            if chain::is_bridge_entry(entry, &self.session_id) {
214                self.seen_uuids.insert(entry.uuid.clone());
215                continue;
216            }
217
218            // Apply role filter if set
219            if let Some(role_filter) = self.role_filter {
220                if let Some(msg) = &entry.message {
221                    if msg.role != role_filter {
222                        self.seen_uuids.insert(entry.uuid.clone());
223                        continue;
224                    }
225                } else {
226                    self.seen_uuids.insert(entry.uuid.clone());
227                    continue;
228                }
229            }
230
231            new_entries.push(entry.clone());
232            self.seen_uuids.insert(entry.uuid.clone());
233        }
234
235        Ok(new_entries)
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PathResolver;
    use std::fs;
    use tempfile::TempDir;

    /// User entry shared by the single-session tests.
    const USER_HELLO: &str = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
    /// Assistant entry shared by the mark-seen and role-filter tests.
    const ASSISTANT_HI: &str = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi"}}"#;
    /// Only entry of `session-a` in the rotation tests.
    const SESSION_A_ENTRY: &str = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
    /// First line of `session-b`: still carries `session-a` as its session id.
    const BRIDGE_B0: &str = r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#;

    /// Location of the `.claude` directory inside the temporary directory.
    fn claude_dir_of(temp: &TempDir) -> std::path::PathBuf {
        temp.path().join(".claude")
    }

    /// Writes `entries`, newline-separated, to `<dir>/projects/-test-project/<session_id>.jsonl`.
    fn create_test_jsonl(dir: &std::path::Path, session_id: &str, entries: &[&str]) {
        let project_dir = dir.join("projects/-test-project");
        fs::create_dir_all(&project_dir).unwrap();
        let contents = entries.join("\n");
        fs::write(project_dir.join(format!("{}.jsonl", session_id)), contents).unwrap();
    }

    /// Creates the project directory and a newline-terminated `session-a.jsonl`,
    /// returning the project directory.
    fn seed_session_a(claude_dir: &std::path::Path) -> std::path::PathBuf {
        let project_dir = claude_dir.join("projects/-test-project");
        fs::create_dir_all(&project_dir).unwrap();
        fs::write(
            project_dir.join("session-a.jsonl"),
            format!("{}\n", SESSION_A_ENTRY),
        )
        .unwrap();
        project_dir
    }

    /// Writes `session-b.jsonl` consisting of the bridge line followed by `continuation`.
    fn write_session_b(project_dir: &std::path::Path, continuation: &str) {
        let lines = [BRIDGE_B0, continuation];
        fs::write(project_dir.join("session-b.jsonl"), lines.join("\n")).unwrap();
    }

    /// Builds a watcher for `/test/project` on `session_id`, rooted at the
    /// temporary `.claude` directory.
    fn watcher_for(temp: &TempDir, session_id: &str) -> ConversationWatcher {
        let claude_dir = claude_dir_of(temp);
        let resolver = PathResolver::new().with_claude_dir(&claude_dir);
        ConversationWatcher::new(
            ClaudeConvo::with_resolver(resolver),
            "/test/project".to_string(),
            session_id.to_string(),
        )
    }

    #[test]
    fn test_watcher_tracks_seen() {
        let temp = TempDir::new().unwrap();
        let assistant_reply = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi there"}}"#;
        create_test_jsonl(
            &claude_dir_of(&temp),
            "session-1",
            &[USER_HELLO, assistant_reply],
        );

        let mut watcher = watcher_for(&temp, "session-1");

        // Everything is new on the first poll.
        let first = watcher.poll().unwrap();
        assert_eq!(first.len(), 2);
        assert_eq!(watcher.seen_count(), 2);

        // Nothing was appended, so the second poll is empty.
        let second = watcher.poll().unwrap();
        assert_eq!(second.len(), 0);
    }

    #[test]
    fn test_watcher_skip_existing() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(&claude_dir_of(&temp), "session-1", &[USER_HELLO]);

        let mut watcher = watcher_for(&temp, "session-1");

        // The single existing entry is skipped...
        let skipped = watcher.skip_existing().unwrap();
        assert_eq!(skipped, 1);

        // ...so polling afterwards yields nothing.
        let entries = watcher.poll().unwrap();
        assert_eq!(entries.len(), 0);
    }

    #[test]
    fn test_watcher_accessors() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(&claude_dir_of(&temp), "session-1", &[]);

        let watcher = watcher_for(&temp, "session-1");

        assert_eq!(watcher.project(), "/test/project");
        assert_eq!(watcher.session_id(), "session-1");
        assert_eq!(watcher.seen_count(), 0);
    }

    #[test]
    fn test_watcher_reset() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(&claude_dir_of(&temp), "session-1", &[USER_HELLO]);

        let mut watcher = watcher_for(&temp, "session-1");

        // Initial poll consumes the entry.
        let before_reset = watcher.poll().unwrap();
        assert_eq!(before_reset.len(), 1);
        assert_eq!(watcher.seen_count(), 1);

        // Resetting forgets it again.
        watcher.reset();
        assert_eq!(watcher.seen_count(), 0);

        // The same entry is therefore replayed.
        let after_reset = watcher.poll().unwrap();
        assert_eq!(after_reset.len(), 1);
    }

    #[test]
    fn test_watcher_mark_seen() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(
            &claude_dir_of(&temp),
            "session-1",
            &[USER_HELLO, ASSISTANT_HI],
        );

        let mut watcher = watcher_for(&temp, "session-1");

        // Fetch the entries once, then start over with a clean slate.
        let all_entries = watcher.poll().unwrap();
        watcher.reset();

        // Pre-mark only the first entry.
        watcher.mark_seen(&all_entries[..1]);
        assert_eq!(watcher.seen_count(), 1);

        // Only the second entry is left to report.
        let remaining = watcher.poll().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].uuid, "uuid-2");
    }

    #[test]
    fn test_watcher_with_role_filter() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(
            &claude_dir_of(&temp),
            "session-1",
            &[USER_HELLO, ASSISTANT_HI],
        );

        let mut watcher = watcher_for(&temp, "session-1").with_role_filter(MessageRole::User);

        let entries = watcher.poll().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].uuid, "uuid-1");
        // The filtered-out assistant entry still counts as seen.
        assert_eq!(watcher.seen_count(), 2);
    }

    #[test]
    fn test_watcher_follows_rotation() {
        let temp = TempDir::new().unwrap();
        // Session A: the original conversation.
        let project_dir = seed_session_a(&claude_dir_of(&temp));

        let mut watcher = watcher_for(&temp, "session-a");

        // First poll: the single entry of session-a.
        let entries = watcher.poll().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].uuid, "a1");
        assert_eq!(watcher.session_id(), "session-a");

        // Session B appears, opening with a bridge entry that points at session-a.
        write_session_b(
            &project_dir,
            r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New content"}}"#,
        );

        // Second poll: the watcher moves over to session-b on its own.
        let entries = watcher.poll().unwrap();
        assert_eq!(watcher.session_id(), "session-b");

        // Bridge entry b0 is filtered out, leaving only b1.
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].uuid, "b1");
        assert_eq!(entries[0].text(), "New content");
    }

    #[test]
    fn test_watcher_follows_rotation_with_full() {
        let temp = TempDir::new().unwrap();
        let project_dir = seed_session_a(&claude_dir_of(&temp));

        let mut watcher = watcher_for(&temp, "session-a");

        // Drain session-a through poll_with_full.
        let (convo, new_entries) = watcher.poll_with_full().unwrap();
        assert_eq!(new_entries.len(), 1);
        assert_eq!(convo.session_id, "session-a");

        // The successor shows up.
        write_session_b(
            &project_dir,
            r#"{"uuid":"b1","type":"assistant","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"assistant","content":"Continued"}}"#,
        );

        // poll_with_full follows the rotation as well.
        let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
        assert_eq!(watcher.session_id(), "session-b");
        assert_eq!(convo2.session_id, "session-b");
        // Bridge b0 is dropped; b1 remains.
        assert_eq!(new_entries2.len(), 1);
        assert_eq!(new_entries2[0].uuid, "b1");
    }

    #[test]
    fn test_watcher_reset_clears_rotation_state() {
        let temp = TempDir::new().unwrap();
        let project_dir = seed_session_a(&claude_dir_of(&temp));

        // Session B (successor of A) already exists before the watcher starts.
        write_session_b(
            &project_dir,
            r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New"}}"#,
        );

        let mut watcher = watcher_for(&temp, "session-a");

        // Drain both sessions: a1 first, then b1 after following the rotation.
        let entries = watcher.poll().unwrap();
        assert_eq!(entries.len(), 1); // a1
        let entries = watcher.poll().unwrap();
        assert_eq!(entries.len(), 1); // b1
        assert_eq!(watcher.session_id(), "session-b");

        // Reset wipes the seen UUIDs and the rotation flags.
        watcher.reset();
        assert_eq!(watcher.seen_count(), 0);

        // The watcher stays on session-b and replays it.
        let entries = watcher.poll().unwrap();
        // b0 is a bridge (session_id != session-b), so only b1 comes back.
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].uuid, "b1");
    }

    #[test]
    fn test_watcher_poll_with_full() {
        let temp = TempDir::new().unwrap();
        create_test_jsonl(&claude_dir_of(&temp), "session-1", &[USER_HELLO]);

        let mut watcher = watcher_for(&temp, "session-1");

        let (convo, new_entries) = watcher.poll_with_full().unwrap();
        assert_eq!(convo.entries.len(), 1);
        assert_eq!(new_entries.len(), 1);

        // A repeat call still returns the whole conversation, but no delta.
        let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
        assert_eq!(convo2.entries.len(), 1);
        assert!(new_entries2.is_empty());
    }
}