Skip to main content

tmai_core/transcript/
watcher.rs

1//! Transcript file watcher — monitors JSONL files for changes
2//! and maintains parsed transcript state per agent.
3
4use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
/// Shared transcript registry: pane_id → TranscriptState
///
/// Entries are inserted by `TranscriptWatcher::start_watching`, removed by
/// `TranscriptWatcher::stop_watching`, and updated in place by
/// `TranscriptWatcher::poll_updates`. The map sits behind an
/// `Arc<RwLock<..>>` so that clones of the handle all refer to the same map.
pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19/// Create a new empty transcript registry
20pub fn new_transcript_registry() -> TranscriptRegistry {
21    Arc::new(RwLock::new(HashMap::new()))
22}
23
/// Maximum preview lines to render
///
/// Passed as the second argument to `render_preview` every time a
/// transcript's `preview_text` is regenerated (on the initial read and after
/// each poll that produced new records).
const MAX_PREVIEW_LINES: usize = 80;
26
/// Number of tail lines to read on initial file open.
/// Set high to load full session history for hybrid scrollback preview.
///
/// `read_tail_lines` parses at most this many lines, counted from the end of
/// the file; earlier lines are not parsed.
const INITIAL_TAIL_LINES: usize = 50_000;
30
/// Transcript watcher that monitors JSONL files for changes
///
/// The watcher itself only holds a handle to the shared registry; all
/// per-pane state (file path, read position, parsed records, preview text)
/// lives in the `TranscriptState` values stored there. Nothing is read in
/// the background: new content is only picked up when `poll_updates()` is
/// called.
pub struct TranscriptWatcher {
    /// Shared transcript state, keyed by pane_id
    registry: TranscriptRegistry,
}
36
37impl TranscriptWatcher {
38    /// Create a new TranscriptWatcher
39    pub fn new(registry: TranscriptRegistry) -> Self {
40        Self { registry }
41    }
42
43    /// Start watching a transcript file for a given pane_id
44    ///
45    /// Performs initial tail read, then subsequent calls to `poll_updates()`
46    /// will read new content.
47    pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
48        // Check if already watching
49        {
50            let reg = self.registry.read();
51            if reg.contains_key(pane_id) {
52                return;
53            }
54        }
55
56        debug!(pane_id, path, "Starting transcript watch");
57
58        let mut state = TranscriptState::new(
59            path.to_string(),
60            session_id.to_string(),
61            pane_id.to_string(),
62        );
63
64        // Initial read: tail of file
65        if let Err(e) = read_tail_lines(path, &mut state) {
66            debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
67        }
68
69        // Generate initial preview
70        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
71
72        let mut reg = self.registry.write();
73        reg.insert(pane_id.to_string(), state);
74    }
75
76    /// Stop watching a transcript file
77    pub fn stop_watching(&self, pane_id: &str) {
78        let mut reg = self.registry.write();
79        if reg.remove(pane_id).is_some() {
80            debug!(pane_id, "Stopped transcript watch");
81        }
82    }
83
84    /// Poll for updates on all watched transcripts
85    ///
86    /// Reads new lines since last_read_pos and updates preview text.
87    pub fn poll_updates(&self) {
88        let pane_ids: Vec<String> = {
89            let reg = self.registry.read();
90            reg.keys().cloned().collect()
91        };
92
93        for pane_id in pane_ids {
94            let mut reg = self.registry.write();
95            if let Some(state) = reg.get_mut(&pane_id) {
96                if let Err(e) = read_new_lines(state) {
97                    debug!(
98                        pane_id,
99                        path = %state.path,
100                        error = %e,
101                        "Failed to read transcript updates"
102                    );
103                }
104            }
105        }
106    }
107
108    /// Get the shared registry
109    pub fn registry(&self) -> &TranscriptRegistry {
110        &self.registry
111    }
112}
113
114/// Read tail lines from a file for initial display
115fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
116    let file = std::fs::File::open(path)?;
117    let metadata = file.metadata()?;
118    let file_size = metadata.len();
119
120    // Read last N lines by seeking backward
121    let reader = std::io::BufReader::new(&file);
122    let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
123
124    let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
125    let mut records = Vec::new();
126    for line in &all_lines[start..] {
127        if let Some(record) = parse_jsonl_line(line) {
128            records.push(record);
129        }
130    }
131
132    state.push_records(records);
133    state.last_read_pos = file_size;
134
135    Ok(())
136}
137
138/// Read new lines since last_read_pos
139fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
140    let path = Path::new(&state.path);
141    if !path.exists() {
142        return Ok(());
143    }
144
145    let file = std::fs::File::open(path)?;
146    let metadata = file.metadata()?;
147    let current_size = metadata.len();
148
149    // No new data
150    if current_size <= state.last_read_pos {
151        // File might have been truncated/rotated
152        if current_size < state.last_read_pos {
153            debug!(
154                path = %state.path,
155                "Transcript file truncated, resetting position"
156            );
157            state.last_read_pos = 0;
158            state.recent_records.clear();
159        } else {
160            return Ok(());
161        }
162    }
163
164    let mut reader = std::io::BufReader::new(file);
165    reader.seek(SeekFrom::Start(state.last_read_pos))?;
166
167    let mut new_records = Vec::new();
168    let mut line = String::new();
169    loop {
170        line.clear();
171        let bytes_read = reader.read_line(&mut line)?;
172        if bytes_read == 0 {
173            break;
174        }
175        if let Some(record) = parse_jsonl_line(&line) {
176            new_records.push(record);
177        }
178    }
179
180    if !new_records.is_empty() {
181        debug!(
182            path = %state.path,
183            new_records = new_records.len(),
184            "Read new transcript records"
185        );
186        state.push_records(new_records);
187        state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
188    }
189
190    state.last_read_pos = current_size;
191    Ok(())
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Build an empty registry plus a watcher that writes into it.
    fn registry_and_watcher() -> (TranscriptRegistry, TranscriptWatcher) {
        let registry = new_transcript_registry();
        let watcher = TranscriptWatcher::new(registry.clone());
        (registry, watcher)
    }

    /// A pane is registered even when its file is missing, and is removed
    /// again by `stop_watching`.
    #[test]
    fn test_transcript_watcher_start_stop() {
        let (registry, watcher) = registry_and_watcher();

        // The file does not exist; this must neither panic nor skip registration.
        watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
        assert!(registry.read().contains_key("5"));

        watcher.stop_watching("5");
        assert!(!registry.read().contains_key("5"));
    }

    /// Content already present in the file is parsed and rendered on start.
    #[test]
    fn test_transcript_watcher_reads_file() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Seed the transcript with one user and one assistant record.
        let mut seed = std::fs::File::create(&path).unwrap();
        writeln!(seed, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
        writeln!(seed, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
        drop(seed);

        let (registry, watcher) = registry_and_watcher();
        watcher.start_watching("5", &path, "sess1");

        let reg = registry.read();
        let state = reg.get("5").unwrap();
        assert_eq!(state.recent_records.len(), 2);
        assert!(state.preview_text.contains("▶ User: Hello"));
        assert!(state.preview_text.contains("◀ Hi there!"));
    }

    /// Lines appended after the initial read are picked up by `poll_updates`.
    #[test]
    fn test_transcript_watcher_incremental_read() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Initial content: a single record.
        let mut seed = std::fs::File::create(&path).unwrap();
        writeln!(seed, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
        drop(seed);

        let (registry, watcher) = registry_and_watcher();
        watcher.start_watching("5", &path, "sess1");
        assert_eq!(registry.read().get("5").unwrap().recent_records.len(), 1);

        // Append a second record to the same file.
        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            appender,
            r#"{{"type":"user","message":{{"content":"Second"}}}}"#
        )
        .unwrap();
        drop(appender);

        watcher.poll_updates();

        let reg = registry.read();
        let state = reg.get("5").unwrap();
        assert_eq!(state.recent_records.len(), 2);
        assert!(state.preview_text.contains("Second"));
    }
}