scud/
transcript_watcher.rs1use anyhow::Result;
7use notify::{Event, RecursiveMode, Watcher};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13use crate::commands::swarm::transcript::{find_claude_project_dir, parse_transcript};
14use crate::db::Database;
15
16pub struct TranscriptWatcher {
17 db: Arc<Database>,
18 project_root: PathBuf,
19 imported_sessions: Arc<Mutex<HashSet<String>>>,
20}
21
22impl TranscriptWatcher {
23 pub fn new(project_root: &Path, db: Arc<Database>) -> Self {
24 Self {
25 db,
26 project_root: project_root.to_path_buf(),
27 imported_sessions: Arc::new(Mutex::new(HashSet::new())),
28 }
29 }
30
31 pub fn import_all(
34 &self,
35 scud_session_id: Option<&str>,
36 task_id: Option<&str>,
37 ) -> Result<usize> {
38 let claude_dir = match find_claude_project_dir(&self.project_root) {
39 Some(dir) => dir,
40 None => return Ok(0),
41 };
42
43 let mut count = 0;
44
45 for entry in std::fs::read_dir(&claude_dir)? {
46 let path = entry?.path();
47 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
48 let session_id = path
49 .file_stem()
50 .and_then(|s| s.to_str())
51 .unwrap_or("")
52 .to_string();
53
54 {
56 let imported = self.imported_sessions.lock().unwrap();
57 if imported.contains(&session_id) {
58 continue;
59 }
60 }
61
62 {
64 let guard = self.db.connection()?;
65 let conn = guard.as_ref().unwrap();
66 let exists: i64 = conn.query_row(
67 "SELECT COUNT(*) FROM transcript_messages WHERE claude_session_id = ?",
68 [&session_id],
69 |r| r.get(0),
70 )?;
71 if exists > 0 {
72 self.imported_sessions.lock().unwrap().insert(session_id);
73 continue;
74 }
75 }
76
77 if let Ok(transcript) = parse_transcript(&path) {
79 let guard = self.db.connection()?;
80 let conn = guard.as_ref().unwrap();
81 crate::db::transcripts::insert_transcript(
82 conn,
83 &transcript,
84 scud_session_id,
85 task_id,
86 )?;
87 self.imported_sessions.lock().unwrap().insert(session_id);
88 count += 1;
89 }
90 }
91 }
92 Ok(count)
93 }
94
95 pub fn watch(&self, scud_session_id: &str) -> Result<()> {
98 let claude_dir = match find_claude_project_dir(&self.project_root) {
99 Some(dir) => dir,
100 None => {
101 return Ok(());
103 }
104 };
105
106 let (tx, rx) = std::sync::mpsc::channel();
107
108 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
109 if let Ok(event) = res {
110 let _ = tx.send(event);
111 }
112 })?;
113
114 watcher.watch(&claude_dir, RecursiveMode::NonRecursive)?;
115
116 while let Ok(event) = rx.recv() {
118 for path in event.paths {
119 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
120 std::thread::sleep(Duration::from_millis(500));
122 let _ = self.import_file(&path, Some(scud_session_id), None);
123 }
124 }
125 }
126 Ok(())
127 }
128
129 fn import_file(
130 &self,
131 path: &Path,
132 scud_session_id: Option<&str>,
133 task_id: Option<&str>,
134 ) -> Result<()> {
135 let session_id = path
136 .file_stem()
137 .and_then(|s| s.to_str())
138 .unwrap_or("")
139 .to_string();
140
141 if let Ok(transcript) = parse_transcript(path) {
142 let guard = self.db.connection()?;
143 let conn = guard.as_ref().unwrap();
144 conn.execute(
147 "DELETE FROM transcript_messages WHERE claude_session_id = ?",
148 [&session_id],
149 )?;
150 crate::db::transcripts::insert_transcript(conn, &transcript, scud_session_id, task_id)?;
151 self.imported_sessions.lock().unwrap().insert(session_id);
152 }
153 Ok(())
154 }
155}