tmai_core/transcript/
watcher.rs1use std::collections::HashMap;
5use std::io::{BufRead, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use tracing::debug;
11
12use super::parser::parse_jsonl_line;
13use super::renderer::render_preview;
14use super::types::TranscriptState;
15
16pub type TranscriptRegistry = Arc<RwLock<HashMap<String, TranscriptState>>>;
18
19pub fn new_transcript_registry() -> TranscriptRegistry {
21 Arc::new(RwLock::new(HashMap::new()))
22}
23
24const MAX_PREVIEW_LINES: usize = 80;
26
27const INITIAL_TAIL_LINES: usize = 50_000;
30
31pub struct TranscriptWatcher {
33 registry: TranscriptRegistry,
35}
36
37impl TranscriptWatcher {
38 pub fn new(registry: TranscriptRegistry) -> Self {
40 Self { registry }
41 }
42
43 pub fn start_watching(&self, pane_id: &str, path: &str, session_id: &str) {
48 {
50 let reg = self.registry.read();
51 if reg.contains_key(pane_id) {
52 return;
53 }
54 }
55
56 debug!(pane_id, path, "Starting transcript watch");
57
58 let mut state = TranscriptState::new(
59 path.to_string(),
60 session_id.to_string(),
61 pane_id.to_string(),
62 );
63
64 if let Err(e) = read_tail_lines(path, &mut state) {
66 debug!(path, error = %e, "Failed initial transcript read (file may not exist yet)");
67 }
68
69 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
71
72 let mut reg = self.registry.write();
73 reg.insert(pane_id.to_string(), state);
74 }
75
76 pub fn stop_watching(&self, pane_id: &str) {
78 let mut reg = self.registry.write();
79 if reg.remove(pane_id).is_some() {
80 debug!(pane_id, "Stopped transcript watch");
81 }
82 }
83
84 pub fn poll_updates(&self) {
88 let pane_ids: Vec<String> = {
89 let reg = self.registry.read();
90 reg.keys().cloned().collect()
91 };
92
93 for pane_id in pane_ids {
94 let mut reg = self.registry.write();
95 if let Some(state) = reg.get_mut(&pane_id) {
96 if let Err(e) = read_new_lines(state) {
97 debug!(
98 pane_id,
99 path = %state.path,
100 error = %e,
101 "Failed to read transcript updates"
102 );
103 }
104 }
105 }
106 }
107
108 pub fn registry(&self) -> &TranscriptRegistry {
110 &self.registry
111 }
112}
113
114fn read_tail_lines(path: &str, state: &mut TranscriptState) -> std::io::Result<()> {
116 let file = std::fs::File::open(path)?;
117 let metadata = file.metadata()?;
118 let file_size = metadata.len();
119
120 let reader = std::io::BufReader::new(&file);
122 let all_lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
123
124 let start = all_lines.len().saturating_sub(INITIAL_TAIL_LINES);
125 let mut records = Vec::new();
126 for line in &all_lines[start..] {
127 records.extend(parse_jsonl_line(line));
128 }
129
130 state.push_records(records);
131 state.last_read_pos = file_size;
132
133 Ok(())
134}
135
136fn read_new_lines(state: &mut TranscriptState) -> std::io::Result<()> {
138 let path = Path::new(&state.path);
139 if !path.exists() {
140 return Ok(());
141 }
142
143 let file = std::fs::File::open(path)?;
144 let metadata = file.metadata()?;
145 let current_size = metadata.len();
146
147 if current_size <= state.last_read_pos {
149 if current_size < state.last_read_pos {
151 debug!(
152 path = %state.path,
153 "Transcript file truncated, resetting position"
154 );
155 state.last_read_pos = 0;
156 state.recent_records.clear();
157 } else {
158 return Ok(());
159 }
160 }
161
162 let mut reader = std::io::BufReader::new(file);
163 reader.seek(SeekFrom::Start(state.last_read_pos))?;
164
165 let mut new_records = Vec::new();
166 let mut line = String::new();
167 loop {
168 line.clear();
169 let bytes_read = reader.read_line(&mut line)?;
170 if bytes_read == 0 {
171 break;
172 }
173 new_records.extend(parse_jsonl_line(&line));
174 }
175
176 if !new_records.is_empty() {
177 debug!(
178 path = %state.path,
179 new_records = new_records.len(),
180 "Read new transcript records"
181 );
182 state.push_records(new_records);
183 state.preview_text = render_preview(&state.recent_records, MAX_PREVIEW_LINES);
184 }
185
186 state.last_read_pos = current_size;
187 Ok(())
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193 use std::io::Write;
194
195 #[test]
196 fn test_transcript_watcher_start_stop() {
197 let registry = new_transcript_registry();
198 let watcher = TranscriptWatcher::new(registry.clone());
199
200 watcher.start_watching("5", "/tmp/nonexistent_transcript.jsonl", "sess1");
202
203 {
204 let reg = registry.read();
205 assert!(reg.contains_key("5"));
206 }
207
208 watcher.stop_watching("5");
209
210 {
211 let reg = registry.read();
212 assert!(!reg.contains_key("5"));
213 }
214 }
215
216 #[test]
217 fn test_transcript_watcher_reads_file() {
218 let tmp = tempfile::NamedTempFile::new().unwrap();
219 let path = tmp.path().to_str().unwrap().to_string();
220
221 {
223 let mut file = std::fs::File::create(&path).unwrap();
224 writeln!(file, r#"{{"type":"user","message":{{"content":"Hello"}}}}"#).unwrap();
225 writeln!(file, r#"{{"type":"assistant","message":{{"content":[{{"type":"text","text":"Hi there!"}}]}}}}"#).unwrap();
226 }
227
228 let registry = new_transcript_registry();
229 let watcher = TranscriptWatcher::new(registry.clone());
230 watcher.start_watching("5", &path, "sess1");
231
232 {
233 let reg = registry.read();
234 let state = reg.get("5").unwrap();
235 assert_eq!(state.recent_records.len(), 2);
236 assert!(state.preview_text.contains("▶ User: Hello"));
237 assert!(state.preview_text.contains("◀ Hi there!"));
238 }
239 }
240
241 #[test]
242 fn test_transcript_watcher_incremental_read() {
243 let tmp = tempfile::NamedTempFile::new().unwrap();
244 let path = tmp.path().to_str().unwrap().to_string();
245
246 {
248 let mut file = std::fs::File::create(&path).unwrap();
249 writeln!(file, r#"{{"type":"user","message":{{"content":"First"}}}}"#).unwrap();
250 }
251
252 let registry = new_transcript_registry();
253 let watcher = TranscriptWatcher::new(registry.clone());
254 watcher.start_watching("5", &path, "sess1");
255
256 {
257 let reg = registry.read();
258 assert_eq!(reg.get("5").unwrap().recent_records.len(), 1);
259 }
260
261 {
263 let mut file = std::fs::OpenOptions::new()
264 .append(true)
265 .open(&path)
266 .unwrap();
267 writeln!(
268 file,
269 r#"{{"type":"user","message":{{"content":"Second"}}}}"#
270 )
271 .unwrap();
272 }
273
274 watcher.poll_updates();
276
277 {
278 let reg = registry.read();
279 let state = reg.get("5").unwrap();
280 assert_eq!(state.recent_records.len(), 2);
281 assert!(state.preview_text.contains("Second"));
282 }
283 }
284}