Skip to main content

tmai_core/transcript/
watcher.rs

1//! Transcript file watcher — monitors JSONL files for changes
2//! and maintains parsed transcript state per agent.
3
4use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
16/// Shared transcript registry: pane_id → TranscriptState
17pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19/// Create a new empty transcript registry
20pub fn new_transcript_registry() -> TranscriptRegistry {
21    Arc::new(RwLock::new(HashMap::new()))
22}
23
24/// Maximum preview lines to render
25const MAX_PREVIEW_LINES: usize = 80;
26
27/// Number of tail lines to read on initial file open.
28/// Set high to load full session history for hybrid scrollback preview.
29const INITIAL_TAIL_LINES: usize = 50_000;
30
31/// Transcript watcher that monitors JSONL files for changes
32pub struct TranscriptWatcher {
33    /// Shared transcript state
34    registry: TranscriptRegistry,
35}
36
37impl TranscriptWatcher {
38    /// Create a new TranscriptWatcher
39    pub fn new(registry: TranscriptRegistry) -> Self {
40        Self { registry }
41    }
42
43    /// Start watching a transcript file for a given pane_id
44    ///
45    /// Performs initial tail read, then subsequent calls to `poll_updates()`
46    /// will read new content.
47    pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
48        // Check if already watching
49        {
50            let reg = self.registry.read();
51            if reg.contains_key(pane_id) {
52                return;
53            }
54        }
55
56        debug!(pane_id, path, "Starting transcript watch");
57
58        let mut state = TranscriptState::new(
59            path.to_string(),
60            session_id.to_string(),
61            pane_id.to_string(),
62        );
63
64        // Initial read: tail of file
65        if let Err(e) = read_tail_lines(path, &mut state) {
66            debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
67        }
68
69        // Generate initial preview
70        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
71
72        let mut reg = self.registry.write();
73        reg.insert(pane_id.to_string(), state);
74    }
75
76    /// Stop watching a transcript file
77    pub fn stop_watching(&self, pane_id: &str) {
78        let mut reg = self.registry.write();
79        if reg.remove(pane_id).is_some() {
80            debug!(pane_id, "Stopped transcript watch");
81        }
82    }
83
84    /// Poll for updates on all watched transcripts
85    ///
86    /// Reads new lines since last_read_pos and updates preview text.
87    pub fn poll_updates(&self) {
88        let pane_ids: Vec<String> = {
89            let reg = self.registry.read();
90            reg.keys().cloned().collect()
91        };
92
93        for pane_id in pane_ids {
94            let mut reg = self.registry.write();
95            if let Some(state) = reg.get_mut(&pane_id) {
96                if let Err(e) = read_new_lines(state) {
97                    debug!(
98                        pane_id,
99                        path = %state.path,
100                        error = %e,
101                        "Failed to read transcript updates"
102                    );
103                }
104            }
105        }
106    }
107
108    /// Get the shared registry
109    pub fn registry(&self) -> &TranscriptRegistry {
110        &self.registry
111    }
112}
113
114/// Read tail lines from a file for initial display
115fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
116    let file = std::fs::File::open(path)?;
117    let metadata = file.metadata()?;
118    let file_size = metadata.len();
119
120    // Read last N lines by seeking backward
121    let reader = std::io::BufReader::new(&file);
122    let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
123
124    let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
125    let mut records = Vec::new();
126    for line in &all_lines[start..] {
127        records.extend(parse_jsonl_line(line));
128    }
129
130    state.push_records(records);
131    state.last_read_pos = file_size;
132
133    Ok(())
134}
135
136/// Read new lines since last_read_pos
137fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
138    let path = Path::new(&state.path);
139    if !path.exists() {
140        return Ok(());
141    }
142
143    let file = std::fs::File::open(path)?;
144    let metadata = file.metadata()?;
145    let current_size = metadata.len();
146
147    // No new data
148    if current_size <= state.last_read_pos {
149        // File might have been truncated/rotated
150        if current_size < state.last_read_pos {
151            debug!(
152                path = %state.path,
153                "Transcript file truncated, resetting position"
154            );
155            state.last_read_pos = 0;
156            state.recent_records.clear();
157        } else {
158            return Ok(());
159        }
160    }
161
162    let mut reader = std::io::BufReader::new(file);
163    reader.seek(SeekFrom::Start(state.last_read_pos))?;
164
165    let mut new_records = Vec::new();
166    let mut line = String::new();
167    loop {
168        line.clear();
169        let bytes_read = reader.read_line(&mut line)?;
170        if bytes_read == 0 {
171            break;
172        }
173        new_records.extend(parse_jsonl_line(&line));
174    }
175
176    if !new_records.is_empty() {
177        debug!(
178            path = %state.path,
179            new_records = new_records.len(),
180            "Read new transcript records"
181        );
182        state.push_records(new_records);
183        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
184    }
185
186    state.last_read_pos = current_size;
187    Ok(())
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193    use std::io::Write;
194
195    #[test]
196    fn test_transcript_watcher_start_stop() {
197        let registry = new_transcript_registry();
198        let watcher = TranscriptWatcher::new(registry.clone());
199
200        // Start watching a non-existent file (should not crash)
201        watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
202
203        {
204            let reg = registry.read();
205            assert!(reg.contains_key("5"));
206        }
207
208        watcher.stop_watching("5");
209
210        {
211            let reg = registry.read();
212            assert!(!reg.contains_key("5"));
213        }
214    }
215
216    #[test]
217    fn test_transcript_watcher_reads_file() {
218        let tmp = tempfile::NamedTempFile::new().unwrap();
219        let path = tmp.path().to_str().unwrap().to_string();
220
221        // Write some JSONL content
222        {
223            let mut file = std::fs::File::create(&path).unwrap();
224            writeln!(file, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
225            writeln!(file, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
226        }
227
228        let registry = new_transcript_registry();
229        let watcher = TranscriptWatcher::new(registry.clone());
230        watcher.start_watching("5", &path, "sess1");
231
232        {
233            let reg = registry.read();
234            let state = reg.get("5").unwrap();
235            assert_eq!(state.recent_records.len(), 2);
236            assert!(state.preview_text.contains("▶ User: Hello"));
237            assert!(state.preview_text.contains("◀ Hi there!"));
238        }
239    }
240
241    #[test]
242    fn test_transcript_watcher_incremental_read() {
243        let tmp = tempfile::NamedTempFile::new().unwrap();
244        let path = tmp.path().to_str().unwrap().to_string();
245
246        // Write initial content
247        {
248            let mut file = std::fs::File::create(&path).unwrap();
249            writeln!(file, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
250        }
251
252        let registry = new_transcript_registry();
253        let watcher = TranscriptWatcher::new(registry.clone());
254        watcher.start_watching("5", &path, "sess1");
255
256        {
257            let reg = registry.read();
258            assert_eq!(reg.get("5").unwrap().recent_records.len(), 1);
259        }
260
261        // Append new content
262        {
263            let mut file = std::fs::OpenOptions::new()
264                .append(true)
265                .open(&path)
266                .unwrap();
267            writeln!(
268                file,
269                r#"{{"type":"user","message":{{"content":"Second"}}}}"#
270            )
271            .unwrap();
272        }
273
274        // Poll for updates
275        watcher.poll_updates();
276
277        {
278            let reg = registry.read();
279            let state = reg.get("5").unwrap();
280            assert_eq!(state.recent_records.len(), 2);
281            assert!(state.preview_text.contains("Second"));
282        }
283    }
284}