Skip to main content

scud/
transcript_watcher.rs

1//! Background file watcher for Claude Code transcripts.
2//!
3//! Monitors `~/.claude/projects/<project>/` for new JSONL transcript files
4//! and imports them into the SQLite database in real-time during swarm execution.
5
6use anyhow::Result;
7use notify::{Event, RecursiveMode, Watcher};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13use crate::commands::swarm::transcript::{find_claude_project_dir, parse_transcript};
14use crate::db::Database;
15
16pub struct TranscriptWatcher {
17    db: Arc<Database>,
18    project_root: PathBuf,
19    imported_sessions: Arc<Mutex<HashSet<String>>>,
20}
21
22impl TranscriptWatcher {
23    pub fn new(project_root: &Path, db: Arc<Database>) -> Self {
24        Self {
25            db,
26            project_root: project_root.to_path_buf(),
27            imported_sessions: Arc::new(Mutex::new(HashSet::new())),
28        }
29    }
30
31    /// Import all existing transcripts for the current project.
32    /// Returns the number of newly imported sessions.
33    pub fn import_all(
34        &self,
35        scud_session_id: Option<&str>,
36        task_id: Option<&str>,
37    ) -> Result<usize> {
38        let claude_dir = match find_claude_project_dir(&self.project_root) {
39            Some(dir) => dir,
40            None => return Ok(0),
41        };
42
43        let mut count = 0;
44
45        for entry in std::fs::read_dir(&claude_dir)? {
46            let path = entry?.path();
47            if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
48                let session_id = path
49                    .file_stem()
50                    .and_then(|s| s.to_str())
51                    .unwrap_or("")
52                    .to_string();
53
54                // Skip if already imported this session
55                {
56                    let imported = self.imported_sessions.lock().unwrap();
57                    if imported.contains(&session_id) {
58                        continue;
59                    }
60                }
61
62                // Check if already in database
63                {
64                    let guard = self.db.connection()?;
65                    let conn = guard.as_ref().unwrap();
66                    let exists: i64 = conn.query_row(
67                        "SELECT COUNT(*) FROM transcript_messages WHERE claude_session_id = ?",
68                        [&session_id],
69                        |r| r.get(0),
70                    )?;
71                    if exists > 0 {
72                        self.imported_sessions
73                            .lock()
74                            .unwrap()
75                            .insert(session_id);
76                        continue;
77                    }
78                }
79
80                // Parse and import
81                if let Ok(transcript) = parse_transcript(&path) {
82                    let guard = self.db.connection()?;
83                    let conn = guard.as_ref().unwrap();
84                    crate::db::transcripts::insert_transcript(
85                        conn,
86                        &transcript,
87                        scud_session_id,
88                        task_id,
89                    )?;
90                    self.imported_sessions
91                        .lock()
92                        .unwrap()
93                        .insert(session_id);
94                    count += 1;
95                }
96            }
97        }
98        Ok(count)
99    }
100
101    /// Watch for new transcript files and import them as they appear.
102    /// This blocks the calling thread until the channel is closed.
103    pub fn watch(&self, scud_session_id: &str) -> Result<()> {
104        let claude_dir = match find_claude_project_dir(&self.project_root) {
105            Some(dir) => dir,
106            None => {
107                // No Claude project dir found - nothing to watch
108                return Ok(());
109            }
110        };
111
112        let (tx, rx) = std::sync::mpsc::channel();
113
114        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
115            if let Ok(event) = res {
116                let _ = tx.send(event);
117            }
118        })?;
119
120        watcher.watch(&claude_dir, RecursiveMode::NonRecursive)?;
121
122        // Process file change events
123        while let Ok(event) = rx.recv() {
124            for path in event.paths {
125                if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
126                    // Small delay to let the file finish writing
127                    std::thread::sleep(Duration::from_millis(500));
128                    let _ = self.import_file(&path, Some(scud_session_id), None);
129                }
130            }
131        }
132        Ok(())
133    }
134
135    fn import_file(
136        &self,
137        path: &Path,
138        scud_session_id: Option<&str>,
139        task_id: Option<&str>,
140    ) -> Result<()> {
141        let session_id = path
142            .file_stem()
143            .and_then(|s| s.to_str())
144            .unwrap_or("")
145            .to_string();
146
147        if let Ok(transcript) = parse_transcript(path) {
148            let guard = self.db.connection()?;
149            let conn = guard.as_ref().unwrap();
150            // Delete old records for this session before re-importing
151            // (transcript files are append-only, so we re-import the full file)
152            conn.execute(
153                "DELETE FROM transcript_messages WHERE claude_session_id = ?",
154                [&session_id],
155            )?;
156            crate::db::transcripts::insert_transcript(
157                conn,
158                &transcript,
159                scud_session_id,
160                task_id,
161            )?;
162            self.imported_sessions
163                .lock()
164                .unwrap()
165                .insert(session_id);
166        }
167        Ok(())
168    }
169}