1use crate::ClaudeConvo;
7use crate::chain;
8use crate::error::Result;
9use crate::types::{Conversation, ConversationEntry, MessageRole};
10use std::collections::HashSet;
11
12#[derive(Debug)]
46pub struct ConversationWatcher {
47 manager: ClaudeConvo,
48 project: String,
49 session_id: String,
50 seen_uuids: HashSet<String>,
51 role_filter: Option<MessageRole>,
52 successor_checked: bool,
54 pending_rotations: Vec<(String, String)>,
57 chain_index: chain::ChainIndex,
59}
60
61impl ConversationWatcher {
62 pub fn new(manager: ClaudeConvo, project: String, session_id: String) -> Self {
64 Self {
65 manager,
66 project,
67 session_id,
68 seen_uuids: HashSet::new(),
69 role_filter: None,
70 successor_checked: false,
71 pending_rotations: Vec::new(),
72 chain_index: chain::ChainIndex::new(),
73 }
74 }
75
76 pub fn with_role_filter(mut self, role: MessageRole) -> Self {
78 self.role_filter = Some(role);
79 self
80 }
81
82 pub fn project(&self) -> &str {
84 &self.project
85 }
86
87 pub fn session_id(&self) -> &str {
89 &self.session_id
90 }
91
92 pub fn seen_count(&self) -> usize {
94 self.seen_uuids.len()
95 }
96
97 pub fn poll(&mut self) -> Result<Vec<ConversationEntry>> {
107 let convo = self.manager.read_segment(&self.project, &self.session_id)?;
108 let new_entries = self.extract_new_entries(&convo)?;
109
110 if !new_entries.is_empty() {
111 self.successor_checked = false;
112 return Ok(new_entries);
113 }
114
115 if self.follow_rotation()? {
117 return self.poll();
118 }
119
120 Ok(new_entries)
121 }
122
123 pub fn poll_with_full(&mut self) -> Result<(Conversation, Vec<ConversationEntry>)> {
128 let convo = self.manager.read_segment(&self.project, &self.session_id)?;
129 let new_entries = self.extract_new_entries(&convo)?;
130
131 if !new_entries.is_empty() {
132 self.successor_checked = false;
133 return Ok((convo, new_entries));
134 }
135
136 if self.follow_rotation()? {
138 return self.poll_with_full();
139 }
140
141 Ok((convo, new_entries))
142 }
143
144 pub fn reset(&mut self) {
150 self.seen_uuids.clear();
151 self.successor_checked = false;
152 self.pending_rotations.clear();
153 }
154
155 pub fn mark_seen(&mut self, entries: &[ConversationEntry]) {
159 for entry in entries {
160 self.seen_uuids.insert(entry.uuid.clone());
161 }
162 }
163
164 pub fn skip_existing(&mut self) -> Result<usize> {
166 let convo = self.manager.read_segment(&self.project, &self.session_id)?;
167 let count = convo.entries.len();
168 for entry in &convo.entries {
169 self.seen_uuids.insert(entry.uuid.clone());
170 }
171 Ok(count)
172 }
173
174 pub fn take_pending_rotations(&mut self) -> Vec<(String, String)> {
178 std::mem::take(&mut self.pending_rotations)
179 }
180
181 fn follow_rotation(&mut self) -> Result<bool> {
184 if self.successor_checked {
185 return Ok(false);
186 }
187 self.successor_checked = true;
188
189 self.chain_index
190 .refresh(self.manager.resolver(), &self.project)?;
191
192 if let Some(successor) = self.chain_index.successor_of(&self.session_id) {
193 let successor = successor.to_string();
194 let old_id = self.session_id.clone();
195 self.pending_rotations.push((old_id, successor.clone()));
196 self.session_id = successor;
197 self.successor_checked = false;
198 return Ok(true);
199 }
200
201 Ok(false)
202 }
203
204 fn extract_new_entries(&mut self, convo: &Conversation) -> Result<Vec<ConversationEntry>> {
205 let mut new_entries = Vec::new();
206
207 for entry in &convo.entries {
208 if self.seen_uuids.contains(&entry.uuid) {
209 continue;
210 }
211
212 if chain::is_bridge_entry(entry, &self.session_id) {
214 self.seen_uuids.insert(entry.uuid.clone());
215 continue;
216 }
217
218 if let Some(role_filter) = self.role_filter {
220 if let Some(msg) = &entry.message {
221 if msg.role != role_filter {
222 self.seen_uuids.insert(entry.uuid.clone());
223 continue;
224 }
225 } else {
226 self.seen_uuids.insert(entry.uuid.clone());
227 continue;
228 }
229 }
230
231 new_entries.push(entry.clone());
232 self.seen_uuids.insert(entry.uuid.clone());
233 }
234
235 Ok(new_entries)
236 }
237}
238
239#[cfg(test)]
240mod tests {
241 use super::*;
242 use crate::PathResolver;
243 use std::fs;
244 use tempfile::TempDir;
245
246 fn create_test_jsonl(dir: &std::path::Path, session_id: &str, entries: &[&str]) {
247 let project_dir = dir.join("projects/-test-project");
248 fs::create_dir_all(&project_dir).unwrap();
249 let file_path = project_dir.join(format!("{}.jsonl", session_id));
250 fs::write(&file_path, entries.join("\n")).unwrap();
251 }
252
253 #[test]
254 fn test_watcher_tracks_seen() {
255 let temp = TempDir::new().unwrap();
256 let claude_dir = temp.path().join(".claude");
257
258 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
259 let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi there"}}"#;
260
261 create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
262
263 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
264 let manager = ClaudeConvo::with_resolver(resolver);
265
266 let mut watcher = ConversationWatcher::new(
267 manager,
268 "/test/project".to_string(),
269 "session-1".to_string(),
270 );
271
272 let entries = watcher.poll().unwrap();
274 assert_eq!(entries.len(), 2);
275 assert_eq!(watcher.seen_count(), 2);
276
277 let entries = watcher.poll().unwrap();
279 assert_eq!(entries.len(), 0);
280 }
281
282 #[test]
283 fn test_watcher_skip_existing() {
284 let temp = TempDir::new().unwrap();
285 let claude_dir = temp.path().join(".claude");
286
287 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
288
289 create_test_jsonl(&claude_dir, "session-1", &[entry1]);
290
291 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
292 let manager = ClaudeConvo::with_resolver(resolver);
293
294 let mut watcher = ConversationWatcher::new(
295 manager,
296 "/test/project".to_string(),
297 "session-1".to_string(),
298 );
299
300 let skipped = watcher.skip_existing().unwrap();
302 assert_eq!(skipped, 1);
303
304 let entries = watcher.poll().unwrap();
306 assert_eq!(entries.len(), 0);
307 }
308
309 #[test]
310 fn test_watcher_accessors() {
311 let temp = TempDir::new().unwrap();
312 let claude_dir = temp.path().join(".claude");
313 create_test_jsonl(&claude_dir, "session-1", &[]);
314
315 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
316 let manager = ClaudeConvo::with_resolver(resolver);
317
318 let watcher = ConversationWatcher::new(
319 manager,
320 "/test/project".to_string(),
321 "session-1".to_string(),
322 );
323
324 assert_eq!(watcher.project(), "/test/project");
325 assert_eq!(watcher.session_id(), "session-1");
326 assert_eq!(watcher.seen_count(), 0);
327 }
328
329 #[test]
330 fn test_watcher_reset() {
331 let temp = TempDir::new().unwrap();
332 let claude_dir = temp.path().join(".claude");
333
334 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
335 create_test_jsonl(&claude_dir, "session-1", &[entry1]);
336
337 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
338 let manager = ClaudeConvo::with_resolver(resolver);
339
340 let mut watcher = ConversationWatcher::new(
341 manager,
342 "/test/project".to_string(),
343 "session-1".to_string(),
344 );
345
346 let entries = watcher.poll().unwrap();
348 assert_eq!(entries.len(), 1);
349 assert_eq!(watcher.seen_count(), 1);
350
351 watcher.reset();
353 assert_eq!(watcher.seen_count(), 0);
354
355 let entries = watcher.poll().unwrap();
357 assert_eq!(entries.len(), 1);
358 }
359
360 #[test]
361 fn test_watcher_mark_seen() {
362 let temp = TempDir::new().unwrap();
363 let claude_dir = temp.path().join(".claude");
364
365 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
366 let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi"}}"#;
367 create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
368
369 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
370 let manager = ClaudeConvo::with_resolver(resolver);
371
372 let mut watcher = ConversationWatcher::new(
373 manager,
374 "/test/project".to_string(),
375 "session-1".to_string(),
376 );
377
378 let convo = watcher.poll().unwrap();
380 watcher.reset();
381
382 watcher.mark_seen(&convo[..1]);
384 assert_eq!(watcher.seen_count(), 1);
385
386 let entries = watcher.poll().unwrap();
388 assert_eq!(entries.len(), 1);
389 assert_eq!(entries[0].uuid, "uuid-2");
390 }
391
392 #[test]
393 fn test_watcher_with_role_filter() {
394 let temp = TempDir::new().unwrap();
395 let claude_dir = temp.path().join(".claude");
396
397 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
398 let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi"}}"#;
399 create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
400
401 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
402 let manager = ClaudeConvo::with_resolver(resolver);
403
404 let mut watcher = ConversationWatcher::new(
405 manager,
406 "/test/project".to_string(),
407 "session-1".to_string(),
408 )
409 .with_role_filter(MessageRole::User);
410
411 let entries = watcher.poll().unwrap();
412 assert_eq!(entries.len(), 1);
413 assert_eq!(entries[0].uuid, "uuid-1");
414 assert_eq!(watcher.seen_count(), 2);
416 }
417
418 #[test]
419 fn test_watcher_follows_rotation() {
420 let temp = TempDir::new().unwrap();
421 let claude_dir = temp.path().join(".claude");
422 let project_dir = claude_dir.join("projects/-test-project");
423 fs::create_dir_all(&project_dir).unwrap();
424
425 let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
427 fs::write(
428 project_dir.join("session-a.jsonl"),
429 format!("{}\n", entry_a),
430 )
431 .unwrap();
432
433 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
434 let manager = ClaudeConvo::with_resolver(resolver);
435
436 let mut watcher = ConversationWatcher::new(
437 manager,
438 "/test/project".to_string(),
439 "session-a".to_string(),
440 );
441
442 let entries = watcher.poll().unwrap();
444 assert_eq!(entries.len(), 1);
445 assert_eq!(entries[0].uuid, "a1");
446 assert_eq!(watcher.session_id(), "session-a");
447
448 let entries_b = vec![
450 r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
451 r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New content"}}"#,
452 ];
453 fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
454
455 let entries = watcher.poll().unwrap();
457 assert_eq!(watcher.session_id(), "session-b");
458
459 assert_eq!(entries.len(), 1);
461 assert_eq!(entries[0].uuid, "b1");
462 assert_eq!(entries[0].text(), "New content");
463 }
464
465 #[test]
466 fn test_watcher_follows_rotation_with_full() {
467 let temp = TempDir::new().unwrap();
468 let claude_dir = temp.path().join(".claude");
469 let project_dir = claude_dir.join("projects/-test-project");
470 fs::create_dir_all(&project_dir).unwrap();
471
472 let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
473 fs::write(
474 project_dir.join("session-a.jsonl"),
475 format!("{}\n", entry_a),
476 )
477 .unwrap();
478
479 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
480 let manager = ClaudeConvo::with_resolver(resolver);
481
482 let mut watcher = ConversationWatcher::new(
483 manager,
484 "/test/project".to_string(),
485 "session-a".to_string(),
486 );
487
488 let (convo, new_entries) = watcher.poll_with_full().unwrap();
490 assert_eq!(new_entries.len(), 1);
491 assert_eq!(convo.session_id, "session-a");
492
493 let entries_b = vec![
495 r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
496 r#"{"uuid":"b1","type":"assistant","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"assistant","content":"Continued"}}"#,
497 ];
498 fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
499
500 let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
502 assert_eq!(watcher.session_id(), "session-b");
503 assert_eq!(convo2.session_id, "session-b");
504 assert_eq!(new_entries2.len(), 1);
506 assert_eq!(new_entries2[0].uuid, "b1");
507 }
508
509 #[test]
510 fn test_watcher_reset_clears_rotation_state() {
511 let temp = TempDir::new().unwrap();
512 let claude_dir = temp.path().join(".claude");
513 let project_dir = claude_dir.join("projects/-test-project");
514 fs::create_dir_all(&project_dir).unwrap();
515
516 let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
517 fs::write(
518 project_dir.join("session-a.jsonl"),
519 format!("{}\n", entry_a),
520 )
521 .unwrap();
522
523 let entries_b = vec![
525 r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
526 r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New"}}"#,
527 ];
528 fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
529
530 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
531 let manager = ClaudeConvo::with_resolver(resolver);
532
533 let mut watcher = ConversationWatcher::new(
534 manager,
535 "/test/project".to_string(),
536 "session-a".to_string(),
537 );
538
539 let entries = watcher.poll().unwrap();
541 assert_eq!(entries.len(), 1); let entries = watcher.poll().unwrap();
543 assert_eq!(entries.len(), 1); assert_eq!(watcher.session_id(), "session-b");
545
546 watcher.reset();
548 assert_eq!(watcher.seen_count(), 0);
549
550 let entries = watcher.poll().unwrap();
552 assert_eq!(entries.len(), 1);
554 assert_eq!(entries[0].uuid, "b1");
555 }
556
557 #[test]
558 fn test_watcher_poll_with_full() {
559 let temp = TempDir::new().unwrap();
560 let claude_dir = temp.path().join(".claude");
561
562 let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
563 create_test_jsonl(&claude_dir, "session-1", &[entry1]);
564
565 let resolver = PathResolver::new().with_claude_dir(&claude_dir);
566 let manager = ClaudeConvo::with_resolver(resolver);
567
568 let mut watcher = ConversationWatcher::new(
569 manager,
570 "/test/project".to_string(),
571 "session-1".to_string(),
572 );
573
574 let (convo, new_entries) = watcher.poll_with_full().unwrap();
575 assert_eq!(convo.entries.len(), 1);
576 assert_eq!(new_entries.len(), 1);
577
578 let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
580 assert_eq!(convo2.entries.len(), 1);
581 assert!(new_entries2.is_empty());
582 }
583}