tmai_core/transcript/
watcher.rs1use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
/// Shared, lock-protected map from pane id to that pane's [`TranscriptState`].
///
/// The `Arc` lets the watcher and other holders share one map; the `RwLock`
/// allows concurrent readers while updates take the write lock.
pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19pub fn new_transcript_registry() -> TranscriptRegistry {
21 Arc::new(RwLock::new(HashMap::new()))
22}
23
/// Line budget passed to `render_preview` whenever a pane's preview text is rebuilt.
const MAX_PREVIEW_LINES: usize = 80;
26
/// Maximum number of trailing transcript lines parsed by the initial read
/// performed when a pane starts being watched.
const INITIAL_TAIL_LINES: usize = 50_000;
30
/// Reads transcript files for registered panes and stores the parsed state
/// in a shared [`TranscriptRegistry`].
pub struct TranscriptWatcher {
    /// Registry keyed by pane id; shared with whoever holds a clone of the `Arc`.
    registry: TranscriptRegistry,
}
36
37impl TranscriptWatcher {
38 pub fn new(registry: TranscriptRegistry) -> Self {
40 Self { registry }
41 }
42
43 pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
48 {
50 let reg = self.registry.read();
51 if reg.contains_key(pane_id) {
52 return;
53 }
54 }
55
56 debug!(pane_id, path, "Starting transcript watch");
57
58 let mut state = TranscriptState::new(
59 path.to_string(),
60 session_id.to_string(),
61 pane_id.to_string(),
62 );
63
64 if let Err(e) = read_tail_lines(path, &mut state) {
66 debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
67 }
68
69 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
71
72 let mut reg = self.registry.write();
73 reg.insert(pane_id.to_string(), state);
74 }
75
76 pub fn stop_watching(&self, pane_id: &str) {
78 let mut reg = self.registry.write();
79 if reg.remove(pane_id).is_some() {
80 debug!(pane_id, "Stopped transcript watch");
81 }
82 }
83
84 pub fn poll_updates(&self) {
88 let pane_ids: Vec<String> = {
89 let reg = self.registry.read();
90 reg.keys().cloned().collect()
91 };
92
93 for pane_id in pane_ids {
94 let mut reg = self.registry.write();
95 if let Some(state) = reg.get_mut(&pane_id) {
96 if let Err(e) = read_new_lines(state) {
97 debug!(
98 pane_id,
99 path = %state.path,
100 error = %e,
101 "Failed to read transcript updates"
102 );
103 }
104 }
105 }
106 }
107
108 pub fn registry(&self) -> &TranscriptRegistry {
110 &self.registry
111 }
112}
113
114fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
116 let file = std::fs::File::open(path)?;
117 let metadata = file.metadata()?;
118 let file_size = metadata.len();
119
120 let reader = std::io::BufReader::new(&file);
122 let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
123
124 let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
125 let mut records = Vec::new();
126 for line in &all_lines[start..] {
127 if let Some(record) = parse_jsonl_line(line) {
128 records.push(record);
129 }
130 }
131
132 state.push_records(records);
133 state.last_read_pos = file_size;
134
135 Ok(())
136}
137
138fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
140 let path = Path::new(&state.path);
141 if !path.exists() {
142 return Ok(());
143 }
144
145 let file = std::fs::File::open(path)?;
146 let metadata = file.metadata()?;
147 let current_size = metadata.len();
148
149 if current_size <= state.last_read_pos {
151 if current_size < state.last_read_pos {
153 debug!(
154 path = %state.path,
155 "Transcript file truncated, resetting position"
156 );
157 state.last_read_pos = 0;
158 state.recent_records.clear();
159 } else {
160 return Ok(());
161 }
162 }
163
164 let mut reader = std::io::BufReader::new(file);
165 reader.seek(SeekFrom::Start(state.last_read_pos))?;
166
167 let mut new_records = Vec::new();
168 let mut line = String::new();
169 loop {
170 line.clear();
171 let bytes_read = reader.read_line(&mut line)?;
172 if bytes_read == 0 {
173 break;
174 }
175 if let Some(record) = parse_jsonl_line(&line) {
176 new_records.push(record);
177 }
178 }
179
180 if !new_records.is_empty() {
181 debug!(
182 path = %state.path,
183 new_records = new_records.len(),
184 "Read new transcript records"
185 );
186 state.push_records(new_records);
187 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
188 }
189
190 state.last_read_pos = current_size;
191 Ok(())
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Builds an empty registry together with a watcher that shares it.
    fn registry_and_watcher() -> (TranscriptRegistry, TranscriptWatcher) {
        let registry = new_transcript_registry();
        let watcher = TranscriptWatcher::new(registry.clone());
        (registry, watcher)
    }

    /// A pane is registered even when its transcript file does not exist,
    /// and is removed again by `stop_watching`.
    #[test]
    fn test_transcript_watcher_start_stop() {
        let (registry, watcher) = registry_and_watcher();

        watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
        assert!(registry.read().contains_key("5"));

        watcher.stop_watching("5");
        assert!(!registry.read().contains_key("5"));
    }

    /// The initial read parses existing records and renders them into the preview.
    #[test]
    fn test_transcript_watcher_reads_file() {
        let transcript = tempfile::NamedTempFile::new().unwrap();
        let transcript_path = transcript.path().to_str().unwrap().to_string();

        let mut writer = std::fs::File::create(&transcript_path).unwrap();
        writeln!(writer, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
        writeln!(writer, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
        drop(writer);

        let (registry, watcher) = registry_and_watcher();
        watcher.start_watching("5", &transcript_path, "sess1");

        let panes = registry.read();
        let pane_state = panes.get("5").unwrap();
        assert_eq!(state_len(pane_state), 2);
        assert!(pane_state.preview_text.contains("▶ User: Hello"));
        assert!(pane_state.preview_text.contains("◀ Hi there!"));
    }

    /// Records appended after the initial read are picked up by `poll_updates`.
    #[test]
    fn test_transcript_watcher_incremental_read() {
        let transcript = tempfile::NamedTempFile::new().unwrap();
        let transcript_path = transcript.path().to_str().unwrap().to_string();

        let mut writer = std::fs::File::create(&transcript_path).unwrap();
        writeln!(writer, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
        drop(writer);

        let (registry, watcher) = registry_and_watcher();
        watcher.start_watching("5", &transcript_path, "sess1");
        assert_eq!(state_len(registry.read().get("5").unwrap()), 1);

        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&transcript_path)
            .unwrap();
        writeln!(
            appender,
            r#"{{"type":"user","message":{{"content":"Second"}}}}"#
        )
        .unwrap();
        drop(appender);

        watcher.poll_updates();

        let panes = registry.read();
        let pane_state = panes.get("5").unwrap();
        assert_eq!(state_len(pane_state), 2);
        assert!(pane_state.preview_text.contains("Second"));
    }

    /// Number of records currently buffered for a pane.
    fn state_len(state: &TranscriptState) -> usize {
        state.recent_records.len()
    }
}