use anyhow::Result;
use notify::{Event, RecursiveMode, Watcher};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::commands::swarm::transcript::{find_claude_project_dir, parse_transcript};
use crate::db::Database;
pub struct TranscriptWatcher {
db: Arc<Database>,
project_root: PathBuf,
imported_sessions: Arc<Mutex<HashSet<String>>>,
}
impl TranscriptWatcher {
pub fn new(project_root: &Path, db: Arc<Database>) -> Self {
Self {
db,
project_root: project_root.to_path_buf(),
imported_sessions: Arc::new(Mutex::new(HashSet::new())),
}
}
pub fn import_all(
&self,
scud_session_id: Option<&str>,
task_id: Option<&str>,
) -> Result<usize> {
let claude_dir = match find_claude_project_dir(&self.project_root) {
Some(dir) => dir,
None => return Ok(0),
};
let mut count = 0;
for entry in std::fs::read_dir(&claude_dir)? {
let path = entry?.path();
if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
let session_id = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
{
let imported = self.imported_sessions.lock().unwrap();
if imported.contains(&session_id) {
continue;
}
}
{
let guard = self.db.connection()?;
let conn = guard.as_ref().unwrap();
let exists: i64 = conn.query_row(
"SELECT COUNT(*) FROM transcript_messages WHERE claude_session_id = ?",
[&session_id],
|r| r.get(0),
)?;
if exists > 0 {
self.imported_sessions.lock().unwrap().insert(session_id);
continue;
}
}
if let Ok(transcript) = parse_transcript(&path) {
let guard = self.db.connection()?;
let conn = guard.as_ref().unwrap();
crate::db::transcripts::insert_transcript(
conn,
&transcript,
scud_session_id,
task_id,
)?;
self.imported_sessions.lock().unwrap().insert(session_id);
count += 1;
}
}
}
Ok(count)
}
pub fn watch(&self, scud_session_id: &str) -> Result<()> {
let claude_dir = match find_claude_project_dir(&self.project_root) {
Some(dir) => dir,
None => {
return Ok(());
}
};
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
if let Ok(event) = res {
let _ = tx.send(event);
}
})?;
watcher.watch(&claude_dir, RecursiveMode::NonRecursive)?;
while let Ok(event) = rx.recv() {
for path in event.paths {
if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
std::thread::sleep(Duration::from_millis(500));
let _ = self.import_file(&path, Some(scud_session_id), None);
}
}
}
Ok(())
}
fn import_file(
&self,
path: &Path,
scud_session_id: Option<&str>,
task_id: Option<&str>,
) -> Result<()> {
let session_id = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if let Ok(transcript) = parse_transcript(path) {
let guard = self.db.connection()?;
let conn = guard.as_ref().unwrap();
conn.execute(
"DELETE FROM transcript_messages WHERE claude_session_id = ?",
[&session_id],
)?;
crate::db::transcripts::insert_transcript(conn, &transcript, scud_session_id, task_id)?;
self.imported_sessions.lock().unwrap().insert(session_id);
}
Ok(())
}
}