scud/
transcript_watcher.rs1use anyhow::Result;
7use notify::{Event, RecursiveMode, Watcher};
8use std::collections::HashSet;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13use crate::commands::swarm::transcript::{find_claude_project_dir, parse_transcript};
14use crate::db::Database;
15
16pub struct TranscriptWatcher {
17 db: Arc<Database>,
18 project_root: PathBuf,
19 imported_sessions: Arc<Mutex<HashSet<String>>>,
20}
21
22impl TranscriptWatcher {
23 pub fn new(project_root: &Path, db: Arc<Database>) -> Self {
24 Self {
25 db,
26 project_root: project_root.to_path_buf(),
27 imported_sessions: Arc::new(Mutex::new(HashSet::new())),
28 }
29 }
30
31 pub fn import_all(
34 &self,
35 scud_session_id: Option<&str>,
36 task_id: Option<&str>,
37 ) -> Result<usize> {
38 let claude_dir = match find_claude_project_dir(&self.project_root) {
39 Some(dir) => dir,
40 None => return Ok(0),
41 };
42
43 let mut count = 0;
44
45 for entry in std::fs::read_dir(&claude_dir)? {
46 let path = entry?.path();
47 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
48 let session_id = path
49 .file_stem()
50 .and_then(|s| s.to_str())
51 .unwrap_or("")
52 .to_string();
53
54 {
56 let imported = self.imported_sessions.lock().unwrap();
57 if imported.contains(&session_id) {
58 continue;
59 }
60 }
61
62 {
64 let guard = self.db.connection()?;
65 let conn = guard.as_ref().unwrap();
66 let exists: i64 = conn.query_row(
67 "SELECT COUNT(*) FROM transcript_messages WHERE claude_session_id = ?",
68 [&session_id],
69 |r| r.get(0),
70 )?;
71 if exists > 0 {
72 self.imported_sessions
73 .lock()
74 .unwrap()
75 .insert(session_id);
76 continue;
77 }
78 }
79
80 if let Ok(transcript) = parse_transcript(&path) {
82 let guard = self.db.connection()?;
83 let conn = guard.as_ref().unwrap();
84 crate::db::transcripts::insert_transcript(
85 conn,
86 &transcript,
87 scud_session_id,
88 task_id,
89 )?;
90 self.imported_sessions
91 .lock()
92 .unwrap()
93 .insert(session_id);
94 count += 1;
95 }
96 }
97 }
98 Ok(count)
99 }
100
101 pub fn watch(&self, scud_session_id: &str) -> Result<()> {
104 let claude_dir = match find_claude_project_dir(&self.project_root) {
105 Some(dir) => dir,
106 None => {
107 return Ok(());
109 }
110 };
111
112 let (tx, rx) = std::sync::mpsc::channel();
113
114 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
115 if let Ok(event) = res {
116 let _ = tx.send(event);
117 }
118 })?;
119
120 watcher.watch(&claude_dir, RecursiveMode::NonRecursive)?;
121
122 while let Ok(event) = rx.recv() {
124 for path in event.paths {
125 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
126 std::thread::sleep(Duration::from_millis(500));
128 let _ = self.import_file(&path, Some(scud_session_id), None);
129 }
130 }
131 }
132 Ok(())
133 }
134
135 fn import_file(
136 &self,
137 path: &Path,
138 scud_session_id: Option<&str>,
139 task_id: Option<&str>,
140 ) -> Result<()> {
141 let session_id = path
142 .file_stem()
143 .and_then(|s| s.to_str())
144 .unwrap_or("")
145 .to_string();
146
147 if let Ok(transcript) = parse_transcript(path) {
148 let guard = self.db.connection()?;
149 let conn = guard.as_ref().unwrap();
150 conn.execute(
153 "DELETE FROM transcript_messages WHERE claude_session_id = ?",
154 [&session_id],
155 )?;
156 crate::db::transcripts::insert_transcript(
157 conn,
158 &transcript,
159 scud_session_id,
160 task_id,
161 )?;
162 self.imported_sessions
163 .lock()
164 .unwrap()
165 .insert(session_id);
166 }
167 Ok(())
168 }
169}