Skip to main content

tmai_core/transcript/
watcher.rs

//! Transcript file watcher — monitors JSONL files for changes
//! and maintains parsed transcript state per agent.
3
4use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
16/// Shared transcript registry: pane_id → TranscriptState
17pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19/// Create a new empty transcript registry
20pub fn new_transcript_registry() -> TranscriptRegistry {
21    Arc::new(RwLock::new(HashMap::new()))
22}
23
/// Upper bound on the number of lines passed to `render_preview` when the
/// preview text of a transcript is (re)generated.
const MAX_PREVIEW_LINES: usize = 80;

/// How many trailing lines of a transcript file are parsed when the file is
/// first opened by `read_tail_lines`.
const INITIAL_TAIL_LINES: usize = 100;
29
30/// Transcript watcher that monitors JSONL files for changes
31pub struct TranscriptWatcher {
32    /// Shared transcript state
33    registry: TranscriptRegistry,
34}
35
36impl TranscriptWatcher {
37    /// Create a new TranscriptWatcher
38    pub fn new(registry: TranscriptRegistry) -> Self {
39        Self { registry }
40    }
41
42    /// Start watching a transcript file for a given pane_id
43    ///
44    /// Performs initial tail read, then subsequent calls to `poll_updates()`
45    /// will read new content.
46    pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
47        // Check if already watching
48        {
49            let reg = self.registry.read();
50            if reg.contains_key(pane_id) {
51                return;
52            }
53        }
54
55        debug!(pane_id, path, "Starting transcript watch");
56
57        let mut state = TranscriptState::new(
58            path.to_string(),
59            session_id.to_string(),
60            pane_id.to_string(),
61        );
62
63        // Initial read: tail of file
64        if let Err(e) = read_tail_lines(path, &mut state) {
65            debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
66        }
67
68        // Generate initial preview
69        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
70
71        let mut reg = self.registry.write();
72        reg.insert(pane_id.to_string(), state);
73    }
74
75    /// Stop watching a transcript file
76    pub fn stop_watching(&self, pane_id: &str) {
77        let mut reg = self.registry.write();
78        if reg.remove(pane_id).is_some() {
79            debug!(pane_id, "Stopped transcript watch");
80        }
81    }
82
83    /// Poll for updates on all watched transcripts
84    ///
85    /// Reads new lines since last_read_pos and updates preview text.
86    pub fn poll_updates(&self) {
87        let pane_ids: Vec<String> = {
88            let reg = self.registry.read();
89            reg.keys().cloned().collect()
90        };
91
92        for pane_id in pane_ids {
93            let mut reg = self.registry.write();
94            if let Some(state) = reg.get_mut(&pane_id) {
95                if let Err(e) = read_new_lines(state) {
96                    debug!(
97                        pane_id,
98                        path = %state.path,
99                        error = %e,
100                        "Failed to read transcript updates"
101                    );
102                }
103            }
104        }
105    }
106
107    /// Get the shared registry
108    pub fn registry(&self) -> &TranscriptRegistry {
109        &self.registry
110    }
111}
112
113/// Read tail lines from a file for initial display
114fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
115    let file = std::fs::File::open(path)?;
116    let metadata = file.metadata()?;
117    let file_size = metadata.len();
118
119    // Read last N lines by seeking backward
120    let reader = std::io::BufReader::new(&file);
121    let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
122
123    let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
124    let mut records = Vec::new();
125    for line in &all_lines[start..] {
126        if let Some(record) = parse_jsonl_line(line) {
127            records.push(record);
128        }
129    }
130
131    state.push_records(records);
132    state.last_read_pos = file_size;
133
134    Ok(())
135}
136
137/// Read new lines since last_read_pos
138fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
139    let path = Path::new(&state.path);
140    if !path.exists() {
141        return Ok(());
142    }
143
144    let file = std::fs::File::open(path)?;
145    let metadata = file.metadata()?;
146    let current_size = metadata.len();
147
148    // No new data
149    if current_size <= state.last_read_pos {
150        // File might have been truncated/rotated
151        if current_size < state.last_read_pos {
152            debug!(
153                path = %state.path,
154                "Transcript file truncated, resetting position"
155            );
156            state.last_read_pos = 0;
157            state.recent_records.clear();
158        } else {
159            return Ok(());
160        }
161    }
162
163    let mut reader = std::io::BufReader::new(file);
164    reader.seek(SeekFrom::Start(state.last_read_pos))?;
165
166    let mut new_records = Vec::new();
167    let mut line = String::new();
168    loop {
169        line.clear();
170        let bytes_read = reader.read_line(&mut line)?;
171        if bytes_read == 0 {
172            break;
173        }
174        if let Some(record) = parse_jsonl_line(&line) {
175            new_records.push(record);
176        }
177    }
178
179    if !new_records.is_empty() {
180        debug!(
181            path = %state.path,
182            new_records = new_records.len(),
183            "Read new transcript records"
184        );
185        state.push_records(new_records);
186        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
187    }
188
189    state.last_read_pos = current_size;
190    Ok(())
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Registering and unregistering a pane works even when the transcript
    /// file does not exist.
    #[test]
    fn test_transcript_watcher_start_stop() {
        let shared = new_transcript_registry();
        let watcher = TranscriptWatcher::new(shared.clone());

        // A missing file must not make start_watching panic.
        watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
        // The read guard is a temporary, released at the end of the statement.
        assert!(shared.read().contains_key("5"));

        watcher.stop_watching("5");
        assert!(!shared.read().contains_key("5"));
    }

    /// The initial read parses existing JSONL content and renders a preview.
    #[test]
    fn test_transcript_watcher_reads_file() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Seed the file with two JSONL records; the handle is closed when
        // `out` is dropped at the end of this block.
        {
            let mut out = std::fs::File::create(&path).unwrap();
            writeln!(out, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
            writeln!(out, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
        }

        let shared = new_transcript_registry();
        let watcher = TranscriptWatcher::new(shared.clone());
        watcher.start_watching("5", &path, "sess1");

        let guard = shared.read();
        let state = guard.get("5").unwrap();
        assert_eq!(state.recent_records.len(), 2);
        assert!(state.preview_text.contains("▶ User: Hello"));
        assert!(state.preview_text.contains("◀ Hi there!"));
    }

    /// Content appended after the initial read is picked up by poll_updates.
    #[test]
    fn test_transcript_watcher_incremental_read() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Initial content: a single record.
        {
            let mut out = std::fs::File::create(&path).unwrap();
            writeln!(out, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
        }

        let shared = new_transcript_registry();
        let watcher = TranscriptWatcher::new(shared.clone());
        watcher.start_watching("5", &path, "sess1");

        assert_eq!(shared.read().get("5").unwrap().recent_records.len(), 1);

        // Append a second record to the same file.
        {
            let mut out = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            writeln!(
                out,
                r#"{{"type":"user","message":{{"content":"Second"}}}}"#
            )
            .unwrap();
        }

        // Poll for updates
        watcher.poll_updates();

        let guard = shared.read();
        let state = guard.get("5").unwrap();
        assert_eq!(state.recent_records.len(), 2);
        assert!(state.preview_text.contains("Second"));
    }
}