use crate::ClaudeConvo;
use crate::chain;
use crate::error::Result;
use crate::types::{Conversation, ConversationEntry, MessageRole};
use std::collections::HashSet;
#[derive(Debug)]
pub struct ConversationWatcher {
manager: ClaudeConvo,
project: String,
session_id: String,
seen_uuids: HashSet<String>,
role_filter: Option<MessageRole>,
successor_checked: bool,
pending_rotations: Vec<(String, String)>,
chain_index: chain::ChainIndex,
}
impl ConversationWatcher {
pub fn new(manager: ClaudeConvo, project: String, session_id: String) -> Self {
Self {
manager,
project,
session_id,
seen_uuids: HashSet::new(),
role_filter: None,
successor_checked: false,
pending_rotations: Vec::new(),
chain_index: chain::ChainIndex::new(),
}
}
pub fn with_role_filter(mut self, role: MessageRole) -> Self {
self.role_filter = Some(role);
self
}
pub fn project(&self) -> &str {
&self.project
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn seen_count(&self) -> usize {
self.seen_uuids.len()
}
pub fn poll(&mut self) -> Result<Vec<ConversationEntry>> {
let convo = self.manager.read_segment(&self.project, &self.session_id)?;
let new_entries = self.extract_new_entries(&convo)?;
if !new_entries.is_empty() {
self.successor_checked = false;
return Ok(new_entries);
}
if self.follow_rotation()? {
return self.poll();
}
Ok(new_entries)
}
pub fn poll_with_full(&mut self) -> Result<(Conversation, Vec<ConversationEntry>)> {
let convo = self.manager.read_segment(&self.project, &self.session_id)?;
let new_entries = self.extract_new_entries(&convo)?;
if !new_entries.is_empty() {
self.successor_checked = false;
return Ok((convo, new_entries));
}
if self.follow_rotation()? {
return self.poll_with_full();
}
Ok((convo, new_entries))
}
pub fn reset(&mut self) {
self.seen_uuids.clear();
self.successor_checked = false;
self.pending_rotations.clear();
}
pub fn mark_seen(&mut self, entries: &[ConversationEntry]) {
for entry in entries {
self.seen_uuids.insert(entry.uuid.clone());
}
}
pub fn skip_existing(&mut self) -> Result<usize> {
let convo = self.manager.read_segment(&self.project, &self.session_id)?;
let count = convo.entries.len();
for entry in &convo.entries {
self.seen_uuids.insert(entry.uuid.clone());
}
Ok(count)
}
pub fn take_pending_rotations(&mut self) -> Vec<(String, String)> {
std::mem::take(&mut self.pending_rotations)
}
fn follow_rotation(&mut self) -> Result<bool> {
if self.successor_checked {
return Ok(false);
}
self.successor_checked = true;
self.chain_index
.refresh(self.manager.resolver(), &self.project)?;
if let Some(successor) = self.chain_index.successor_of(&self.session_id) {
let successor = successor.to_string();
let old_id = self.session_id.clone();
self.pending_rotations.push((old_id, successor.clone()));
self.session_id = successor;
self.successor_checked = false;
return Ok(true);
}
Ok(false)
}
fn extract_new_entries(&mut self, convo: &Conversation) -> Result<Vec<ConversationEntry>> {
let mut new_entries = Vec::new();
for entry in &convo.entries {
if self.seen_uuids.contains(&entry.uuid) {
continue;
}
if chain::is_bridge_entry(entry, &self.session_id) {
self.seen_uuids.insert(entry.uuid.clone());
continue;
}
if let Some(role_filter) = self.role_filter {
if let Some(msg) = &entry.message {
if msg.role != role_filter {
self.seen_uuids.insert(entry.uuid.clone());
continue;
}
} else {
self.seen_uuids.insert(entry.uuid.clone());
continue;
}
}
new_entries.push(entry.clone());
self.seen_uuids.insert(entry.uuid.clone());
}
Ok(new_entries)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::PathResolver;
use std::fs;
use tempfile::TempDir;
fn create_test_jsonl(dir: &std::path::Path, session_id: &str, entries: &[&str]) {
let project_dir = dir.join("projects/-test-project");
fs::create_dir_all(&project_dir).unwrap();
let file_path = project_dir.join(format!("{}.jsonl", session_id));
fs::write(&file_path, entries.join("\n")).unwrap();
}
#[test]
fn test_watcher_tracks_seen() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi there"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(watcher.seen_count(), 2);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 0);
}
#[test]
fn test_watcher_skip_existing() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
let skipped = watcher.skip_existing().unwrap();
assert_eq!(skipped, 1);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 0);
}
#[test]
fn test_watcher_accessors() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
create_test_jsonl(&claude_dir, "session-1", &[]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
assert_eq!(watcher.project(), "/test/project");
assert_eq!(watcher.session_id(), "session-1");
assert_eq!(watcher.seen_count(), 0);
}
#[test]
fn test_watcher_reset() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(watcher.seen_count(), 1);
watcher.reset();
assert_eq!(watcher.seen_count(), 0);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
}
#[test]
fn test_watcher_mark_seen() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
let convo = watcher.poll().unwrap();
watcher.reset();
watcher.mark_seen(&convo[..1]);
assert_eq!(watcher.seen_count(), 1);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "uuid-2");
}
#[test]
fn test_watcher_with_role_filter() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
let entry2 = r#"{"uuid":"uuid-2","type":"assistant","timestamp":"2024-01-01T00:00:01Z","message":{"role":"assistant","content":"Hi"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1, entry2]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
)
.with_role_filter(MessageRole::User);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "uuid-1");
assert_eq!(watcher.seen_count(), 2);
}
#[test]
fn test_watcher_follows_rotation() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let project_dir = claude_dir.join("projects/-test-project");
fs::create_dir_all(&project_dir).unwrap();
let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
fs::write(
project_dir.join("session-a.jsonl"),
format!("{}\n", entry_a),
)
.unwrap();
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-a".to_string(),
);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "a1");
assert_eq!(watcher.session_id(), "session-a");
let entries_b = [
r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New content"}}"#,
];
fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
let entries = watcher.poll().unwrap();
assert_eq!(watcher.session_id(), "session-b");
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "b1");
assert_eq!(entries[0].text(), "New content");
}
#[test]
fn test_watcher_follows_rotation_with_full() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let project_dir = claude_dir.join("projects/-test-project");
fs::create_dir_all(&project_dir).unwrap();
let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
fs::write(
project_dir.join("session-a.jsonl"),
format!("{}\n", entry_a),
)
.unwrap();
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-a".to_string(),
);
let (convo, new_entries) = watcher.poll_with_full().unwrap();
assert_eq!(new_entries.len(), 1);
assert_eq!(convo.session_id, "session-a");
let entries_b = [
r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
r#"{"uuid":"b1","type":"assistant","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"assistant","content":"Continued"}}"#,
];
fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
assert_eq!(watcher.session_id(), "session-b");
assert_eq!(convo2.session_id, "session-b");
assert_eq!(new_entries2.len(), 1);
assert_eq!(new_entries2[0].uuid, "b1");
}
#[test]
fn test_watcher_reset_clears_rotation_state() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let project_dir = claude_dir.join("projects/-test-project");
fs::create_dir_all(&project_dir).unwrap();
let entry_a = r#"{"uuid":"a1","type":"user","timestamp":"2024-01-01T00:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Hello"}}"#;
fs::write(
project_dir.join("session-a.jsonl"),
format!("{}\n", entry_a),
)
.unwrap();
let entries_b = [
r#"{"uuid":"b0","type":"user","timestamp":"2024-01-01T01:00:00Z","sessionId":"session-a","message":{"role":"user","content":"Bridge"}}"#,
r#"{"uuid":"b1","type":"user","timestamp":"2024-01-01T01:00:01Z","sessionId":"session-b","message":{"role":"user","content":"New"}}"#,
];
fs::write(project_dir.join("session-b.jsonl"), entries_b.join("\n")).unwrap();
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-a".to_string(),
);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1); let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1); assert_eq!(watcher.session_id(), "session-b");
watcher.reset();
assert_eq!(watcher.seen_count(), 0);
let entries = watcher.poll().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "b1");
}
#[test]
fn test_watcher_poll_with_full() {
let temp = TempDir::new().unwrap();
let claude_dir = temp.path().join(".claude");
let entry1 = r#"{"uuid":"uuid-1","type":"user","timestamp":"2024-01-01T00:00:00Z","message":{"role":"user","content":"Hello"}}"#;
create_test_jsonl(&claude_dir, "session-1", &[entry1]);
let resolver = PathResolver::new().with_claude_dir(&claude_dir);
let manager = ClaudeConvo::with_resolver(resolver);
let mut watcher = ConversationWatcher::new(
manager,
"/test/project".to_string(),
"session-1".to_string(),
);
let (convo, new_entries) = watcher.poll_with_full().unwrap();
assert_eq!(convo.entries.len(), 1);
assert_eq!(new_entries.len(), 1);
let (convo2, new_entries2) = watcher.poll_with_full().unwrap();
assert_eq!(convo2.entries.len(), 1);
assert!(new_entries2.is_empty());
}
}