Skip to main content

scud/
transcript_watcher.rs

1//! Background file watcher for Claude Code transcripts.
2//!
3//! Monitors `~/.claude/projects/<project>/` for new JSONL transcript files
4//! and imports them into the SQLite database in real-time during swarm execution.
5
6use anyhow::Result;
7use notify::{Event, RecursiveMode, Watcher};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13use crate::commands::swarm::transcript::{find_claude_project_dir, parse_transcript};
14use crate::db::Database;
15
/// Imports Claude Code transcript files for one project into the database.
///
/// Remembers which session ids have already been handled so that `import_all`
/// does not import the same transcript twice.
pub struct TranscriptWatcher {
    /// Shared database handle; a connection is obtained from it for every query or insert.
    db: Arc<Database>,
    /// Project root handed to `find_claude_project_dir` to locate the transcript directory.
    project_root: PathBuf,
    /// Session ids (transcript file stems) already imported or already found in the database.
    imported_sessions: Arc<Mutex<HashSet<String>>>,
}
21
22impl TranscriptWatcher {
23    pub fn new(project_root: &Path, db: Arc<Database>) -> Self {
24        Self {
25            db,
26            project_root: project_root.to_path_buf(),
27            imported_sessions: Arc::new(Mutex::new(HashSet::new())),
28        }
29    }
30
31    /// Import all existing transcripts for the current project.
32    /// Returns the number of newly imported sessions.
33    pub fn import_all(
34        &self,
35        scud_session_id: Option<&str>,
36        task_id: Option<&str>,
37    ) -> Result<usize> {
38        let claude_dir = match find_claude_project_dir(&self.project_root) {
39            Some(dir) => dir,
40            None => return Ok(0),
41        };
42
43        let mut count = 0;
44
45        for entry in std::fs::read_dir(&claude_dir)? {
46            let path = entry?.path();
47            if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
48                let session_id = path
49                    .file_stem()
50                    .and_then(|s| s.to_str())
51                    .unwrap_or("")
52                    .to_string();
53
54                // Skip if already imported this session
55                {
56                    let imported = self.imported_sessions.lock().unwrap();
57                    if imported.contains(&session_id) {
58                        continue;
59                    }
60                }
61
62                // Check if already in database
63                {
64                    let guard = self.db.connection()?;
65                    let conn = guard.as_ref().unwrap();
66                    let exists: i64 = conn.query_row(
67                        "SELECT COUNT(*) FROM transcript_messages WHERE claude_session_id = ?",
68                        [&session_id],
69                        |r| r.get(0),
70                    )?;
71                    if exists > 0 {
72                        self.imported_sessions.lock().unwrap().insert(session_id);
73                        continue;
74                    }
75                }
76
77                // Parse and import
78                if let Ok(transcript) = parse_transcript(&path) {
79                    let guard = self.db.connection()?;
80                    let conn = guard.as_ref().unwrap();
81                    crate::db::transcripts::insert_transcript(
82                        conn,
83                        &transcript,
84                        scud_session_id,
85                        task_id,
86                    )?;
87                    self.imported_sessions.lock().unwrap().insert(session_id);
88                    count += 1;
89                }
90            }
91        }
92        Ok(count)
93    }
94
95    /// Watch for new transcript files and import them as they appear.
96    /// This blocks the calling thread until the channel is closed.
97    pub fn watch(&self, scud_session_id: &str) -> Result<()> {
98        let claude_dir = match find_claude_project_dir(&self.project_root) {
99            Some(dir) => dir,
100            None => {
101                // No Claude project dir found - nothing to watch
102                return Ok(());
103            }
104        };
105
106        let (tx, rx) = std::sync::mpsc::channel();
107
108        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
109            if let Ok(event) = res {
110                let _ = tx.send(event);
111            }
112        })?;
113
114        watcher.watch(&claude_dir, RecursiveMode::NonRecursive)?;
115
116        // Process file change events
117        while let Ok(event) = rx.recv() {
118            for path in event.paths {
119                if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
120                    // Small delay to let the file finish writing
121                    std::thread::sleep(Duration::from_millis(500));
122                    let _ = self.import_file(&path, Some(scud_session_id), None);
123                }
124            }
125        }
126        Ok(())
127    }
128
129    fn import_file(
130        &self,
131        path: &Path,
132        scud_session_id: Option<&str>,
133        task_id: Option<&str>,
134    ) -> Result<()> {
135        let session_id = path
136            .file_stem()
137            .and_then(|s| s.to_str())
138            .unwrap_or("")
139            .to_string();
140
141        if let Ok(transcript) = parse_transcript(path) {
142            let guard = self.db.connection()?;
143            let conn = guard.as_ref().unwrap();
144            // Delete old records for this session before re-importing
145            // (transcript files are append-only, so we re-import the full file)
146            conn.execute(
147                "DELETE FROM transcript_messages WHERE claude_session_id = ?",
148                [&session_id],
149            )?;
150            crate::db::transcripts::insert_transcript(conn, &transcript, scud_session_id, task_id)?;
151            self.imported_sessions.lock().unwrap().insert(session_id);
152        }
153        Ok(())
154    }
155}