tmai_core/transcript/
watcher.rs1use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
16pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19pub fn new_transcript_registry() -> TranscriptRegistry {
21 Arc::new(RwLock::new(HashMap::new()))
22}
23
24const MAX_PREVIEW_LINES: usize = 80;
26
27const INITIAL_TAIL_LINES: usize = 100;
29
30pub struct TranscriptWatcher {
32 registry: TranscriptRegistry,
34}
35
36impl TranscriptWatcher {
37 pub fn new(registry: TranscriptRegistry) -> Self {
39 Self { registry }
40 }
41
42 pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
47 {
49 let reg = self.registry.read();
50 if reg.contains_key(pane_id) {
51 return;
52 }
53 }
54
55 debug!(pane_id, path, "Starting transcript watch");
56
57 let mut state = TranscriptState::new(
58 path.to_string(),
59 session_id.to_string(),
60 pane_id.to_string(),
61 );
62
63 if let Err(e) = read_tail_lines(path, &mut state) {
65 debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
66 }
67
68 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
70
71 let mut reg = self.registry.write();
72 reg.insert(pane_id.to_string(), state);
73 }
74
75 pub fn stop_watching(&self, pane_id: &str) {
77 let mut reg = self.registry.write();
78 if reg.remove(pane_id).is_some() {
79 debug!(pane_id, "Stopped transcript watch");
80 }
81 }
82
83 pub fn poll_updates(&self) {
87 let pane_ids: Vec<String> = {
88 let reg = self.registry.read();
89 reg.keys().cloned().collect()
90 };
91
92 for pane_id in pane_ids {
93 let mut reg = self.registry.write();
94 if let Some(state) = reg.get_mut(&pane_id) {
95 if let Err(e) = read_new_lines(state) {
96 debug!(
97 pane_id,
98 path = %state.path,
99 error = %e,
100 "Failed to read transcript updates"
101 );
102 }
103 }
104 }
105 }
106
107 pub fn registry(&self) -> &TranscriptRegistry {
109 &self.registry
110 }
111}
112
113fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
115 let file = std::fs::File::open(path)?;
116 let metadata = file.metadata()?;
117 let file_size = metadata.len();
118
119 let reader = std::io::BufReader::new(&file);
121 let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
122
123 let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
124 let mut records = Vec::new();
125 for line in &all_lines[start..] {
126 if let Some(record) = parse_jsonl_line(line) {
127 records.push(record);
128 }
129 }
130
131 state.push_records(records);
132 state.last_read_pos = file_size;
133
134 Ok(())
135}
136
137fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
139 let path = Path::new(&state.path);
140 if !path.exists() {
141 return Ok(());
142 }
143
144 let file = std::fs::File::open(path)?;
145 let metadata = file.metadata()?;
146 let current_size = metadata.len();
147
148 if current_size <= state.last_read_pos {
150 if current_size < state.last_read_pos {
152 debug!(
153 path = %state.path,
154 "Transcript file truncated, resetting position"
155 );
156 state.last_read_pos = 0;
157 state.recent_records.clear();
158 } else {
159 return Ok(());
160 }
161 }
162
163 let mut reader = std::io::BufReader::new(file);
164 reader.seek(SeekFrom::Start(state.last_read_pos))?;
165
166 let mut new_records = Vec::new();
167 let mut line = String::new();
168 loop {
169 line.clear();
170 let bytes_read = reader.read_line(&mut line)?;
171 if bytes_read == 0 {
172 break;
173 }
174 if let Some(record) = parse_jsonl_line(&line) {
175 new_records.push(record);
176 }
177 }
178
179 if !new_records.is_empty() {
180 debug!(
181 path = %state.path,
182 new_records = new_records.len(),
183 "Read new transcript records"
184 );
185 state.push_records(new_records);
186 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
187 }
188
189 state.last_read_pos = current_size;
190 Ok(())
191}
192
193#[cfg(test)]
194mod tests {
195 use super::*;
196 use std::io::Write;
197
198 #[test]
199 fn test_transcript_watcher_start_stop() {
200 let registry = new_transcript_registry();
201 let watcher = TranscriptWatcher::new(registry.clone());
202
203 watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
205
206 {
207 let reg = registry.read();
208 assert!(reg.contains_key("5"));
209 }
210
211 watcher.stop_watching("5");
212
213 {
214 let reg = registry.read();
215 assert!(!reg.contains_key("5"));
216 }
217 }
218
219 #[test]
220 fn test_transcript_watcher_reads_file() {
221 let tmp = tempfile::NamedTempFile::new().unwrap();
222 let path = tmp.path().to_str().unwrap().to_string();
223
224 {
226 let mut file = std::fs::File::create(&path).unwrap();
227 writeln!(file, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
228 writeln!(file, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
229 }
230
231 let registry = new_transcript_registry();
232 let watcher = TranscriptWatcher::new(registry.clone());
233 watcher.start_watching("5", &path, "sess1");
234
235 {
236 let reg = registry.read();
237 let state = reg.get("5").unwrap();
238 assert_eq!(state.recent_records.len(), 2);
239 assert!(state.preview_text.contains("▶ User: Hello"));
240 assert!(state.preview_text.contains("◀ Hi there!"));
241 }
242 }
243
244 #[test]
245 fn test_transcript_watcher_incremental_read() {
246 let tmp = tempfile::NamedTempFile::new().unwrap();
247 let path = tmp.path().to_str().unwrap().to_string();
248
249 {
251 let mut file = std::fs::File::create(&path).unwrap();
252 writeln!(file, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
253 }
254
255 let registry = new_transcript_registry();
256 let watcher = TranscriptWatcher::new(registry.clone());
257 watcher.start_watching("5", &path, "sess1");
258
259 {
260 let reg = registry.read();
261 assert_eq!(reg.get("5").unwrap().recent_records.len(), 1);
262 }
263
264 {
266 let mut file = std::fs::OpenOptions::new()
267 .append(true)
268 .open(&path)
269 .unwrap();
270 writeln!(
271 file,
272 r#"{{"type":"user","message":{{"content":"Second"}}}}"#
273 )
274 .unwrap();
275 }
276
277 watcher.poll_updates();
279
280 {
281 let reg = registry.read();
282 let state = reg.get("5").unwrap();
283 assert_eq!(state.recent_records.len(), 2);
284 assert!(state.preview_text.contains("Second"));
285 }
286 }
287}