1use crate::core::events::RuntimeOutputStream;
8use std::collections::BTreeMap;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub enum ProjectedRuntimeObservation {
12 CommandObserved {
13 stream: RuntimeOutputStream,
14 command: String,
15 },
16 ToolCallObserved {
17 stream: RuntimeOutputStream,
18 tool_name: String,
19 details: String,
20 },
21 TodoSnapshotUpdated {
22 stream: RuntimeOutputStream,
23 items: Vec<String>,
24 },
25 NarrativeOutputObserved {
26 stream: RuntimeOutputStream,
27 content: String,
28 },
29}
30
31#[derive(Debug, Default)]
32pub struct RuntimeEventProjector {
33 stdout_buffer: String,
34 stderr_buffer: String,
35 todo_items: BTreeMap<String, bool>,
36}
37
38impl RuntimeEventProjector {
39 #[must_use]
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn observe_chunk(
45 &mut self,
46 stream: RuntimeOutputStream,
47 chunk: &str,
48 ) -> Vec<ProjectedRuntimeObservation> {
49 {
50 let buffer = self.buffer_for_stream(stream);
51 buffer.push_str(chunk);
52 }
53
54 let mut lines = Vec::new();
55 {
56 let buffer = self.buffer_for_stream(stream);
57 while let Some(line) = Self::drain_next_line(buffer) {
58 lines.push(line);
59 }
60 }
61
62 let mut projected = Vec::new();
63 for line in lines {
64 projected.extend(self.observe_line(stream, &line));
65 }
66 projected
67 }
68
69 pub fn flush(&mut self) -> Vec<ProjectedRuntimeObservation> {
70 let mut projected = Vec::new();
71
72 if self.stdout_buffer.trim().is_empty() {
73 self.stdout_buffer.clear();
74 } else {
75 let remaining = std::mem::take(&mut self.stdout_buffer);
76 projected.extend(self.observe_line(
77 RuntimeOutputStream::Stdout,
78 remaining.trim_end_matches('\r'),
79 ));
80 }
81
82 if self.stderr_buffer.trim().is_empty() {
83 self.stderr_buffer.clear();
84 } else {
85 let remaining = std::mem::take(&mut self.stderr_buffer);
86 projected.extend(self.observe_line(
87 RuntimeOutputStream::Stderr,
88 remaining.trim_end_matches('\r'),
89 ));
90 }
91
92 projected
93 }
94
95 fn buffer_for_stream(&mut self, stream: RuntimeOutputStream) -> &mut String {
96 match stream {
97 RuntimeOutputStream::Stdout => &mut self.stdout_buffer,
98 RuntimeOutputStream::Stderr => &mut self.stderr_buffer,
99 }
100 }
101
102 fn drain_next_line(buffer: &mut String) -> Option<String> {
103 let bytes = buffer.as_bytes();
104 let mut newline_idx = None;
105 let mut remove_len = 1usize;
106
107 for (idx, b) in bytes.iter().enumerate() {
108 if *b == b'\n' {
109 newline_idx = Some(idx);
110 remove_len = 1;
111 break;
112 }
113 if *b == b'\r' {
114 newline_idx = Some(idx);
115 remove_len = if bytes.get(idx + 1) == Some(&b'\n') {
116 2
117 } else {
118 1
119 };
120 break;
121 }
122 }
123
124 let idx = newline_idx?;
125 let line = buffer[..idx].to_string();
126 buffer.drain(..idx + remove_len);
127 Some(line)
128 }
129
130 fn observe_line(
131 &mut self,
132 stream: RuntimeOutputStream,
133 raw_line: &str,
134 ) -> Vec<ProjectedRuntimeObservation> {
135 let normalized = normalize_projection_line(raw_line);
136 let line = normalized.trim();
137 if line.is_empty() {
138 return Vec::new();
139 }
140
141 let mut projected = Vec::new();
142
143 if let Some(command) = parse_command(line) {
144 projected.push(ProjectedRuntimeObservation::CommandObserved { stream, command });
145 }
146
147 if let Some(tool_name) = parse_tool_name(line) {
148 projected.push(ProjectedRuntimeObservation::ToolCallObserved {
149 stream,
150 tool_name,
151 details: line.to_string(),
152 });
153 }
154
155 if let Some((item, completed)) = parse_todo_item(line) {
156 let changed = self
157 .todo_items
158 .insert(item, completed)
159 .is_none_or(|prev| prev != completed);
160 if changed {
161 let items = self
162 .todo_items
163 .iter()
164 .map(|(todo, done)| {
165 if *done {
166 format!("[x] {todo}")
167 } else {
168 format!("[ ] {todo}")
169 }
170 })
171 .collect();
172 projected.push(ProjectedRuntimeObservation::TodoSnapshotUpdated { stream, items });
173 }
174 }
175
176 if is_narrative_line(line)
177 && projected.is_empty()
178 && matches!(stream, RuntimeOutputStream::Stdout)
179 {
180 projected.push(ProjectedRuntimeObservation::NarrativeOutputObserved {
181 stream,
182 content: line.to_string(),
183 });
184 }
185
186 projected
187 }
188}
189
190fn parse_command(line: &str) -> Option<String> {
191 if let Some(rest) = line.find("Command: ").map(|idx| &line[idx..]) {
192 let cmd = rest.trim_start_matches("Command: ").trim();
193 if !cmd.is_empty() {
194 return Some(cmd.to_string());
195 }
196 }
197 if let Some(rest) = line.find("Running command: ").map(|idx| &line[idx..]) {
198 let cmd = rest.trim_start_matches("Running command: ").trim();
199 if !cmd.is_empty() {
200 return Some(cmd.to_string());
201 }
202 }
203
204 for prefix in [
205 "$ ",
206 "> ",
207 "Command: ",
208 "Running: ",
209 "Running command: ",
210 "Executing: ",
211 "Execute: ",
212 ] {
213 if let Some(rest) = line.strip_prefix(prefix) {
214 let cmd = rest.trim();
215 if !cmd.is_empty() {
216 return Some(cmd.to_string());
217 }
218 }
219 }
220 None
221}
222
223fn parse_tool_name(line: &str) -> Option<String> {
224 if let Some(rest) = line.find("Tool: ").map(|idx| &line[idx..]) {
225 let name = rest
226 .trim_start_matches("Tool: ")
227 .split_whitespace()
228 .next()?;
229 return Some(name.to_string());
230 }
231 if let Some(rest) = line.strip_prefix("Tool: ") {
232 let name = rest.split_whitespace().next()?;
233 return Some(name.to_string());
234 }
235 if let Some(rest) = line.strip_prefix("Using tool ") {
236 let name = rest
237 .split(|c: char| c.is_whitespace() || c == ':' || c == '(')
238 .next()?;
239 if !name.is_empty() {
240 return Some(name.to_string());
241 }
242 }
243 if let Some(rest) = line.strip_prefix("tool=") {
244 let name = rest
245 .split(|c: char| c.is_whitespace() || c == ',' || c == ')')
246 .next()?;
247 if !name.is_empty() {
248 return Some(name.to_string());
249 }
250 }
251 None
252}
253
254fn parse_todo_item(line: &str) -> Option<(String, bool)> {
255 for (prefix, completed) in [
256 ("- [ ] ", false),
257 ("* [ ] ", false),
258 ("- [x] ", true),
259 ("* [x] ", true),
260 ("- [X] ", true),
261 ("* [X] ", true),
262 ] {
263 if let Some(rest) = line.strip_prefix(prefix) {
264 let item = rest.trim();
265 if !item.is_empty() {
266 return Some((item.to_string(), completed));
267 }
268 }
269 }
270 for (prefix, completed) in [
271 ("TODO: ", false),
272 ("TODO ", false),
273 ("DONE: ", true),
274 ("DONE ", true),
275 ] {
276 if let Some(rest) = line.strip_prefix(prefix) {
277 let item = rest.trim();
278 if !item.is_empty() {
279 return Some((item.to_string(), completed));
280 }
281 }
282 }
283 None
284}
285
286fn is_narrative_line(line: &str) -> bool {
287 let lower = line.to_lowercase();
288 lower.starts_with("i ")
289 || lower.starts_with("i'")
290 || lower.starts_with("i\"")
291 || lower.starts_with("next ")
292 || lower.starts_with("plan:")
293 || lower.starts_with("because")
294 || lower.starts_with("thinking:")
295 || lower.starts_with("hello")
296 || lower.starts_with("starting")
297 || lower.starts_with("working")
298 || lower.starts_with("updating")
299 || lower.starts_with("checking")
300 || lower.starts_with("analyzing")
301 || lower.starts_with("investigating")
302}
303
304fn normalize_projection_line(raw: &str) -> String {
305 let no_ansi = strip_ansi_sequences(raw);
306 no_ansi
307 .chars()
308 .filter(|c| !c.is_control() || *c == '\n' || *c == '\r' || *c == '\t')
309 .collect::<String>()
310}
311
312fn strip_ansi_sequences(input: &str) -> String {
313 let mut out = String::with_capacity(input.len());
314 let mut chars = input.chars().peekable();
315
316 while let Some(ch) = chars.next() {
317 if ch == '\u{1b}' {
318 if chars.peek().is_some_and(|c| *c == '[') {
319 let _ = chars.next();
320 for next in chars.by_ref() {
321 if ('@'..='~').contains(&next) {
322 break;
323 }
324 }
325 continue;
326 }
327 continue;
328 }
329 out.push(ch);
330 }
331
332 out
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338
339 #[test]
340 fn projects_command_lines_from_stdout() {
341 let mut projector = RuntimeEventProjector::new();
342 let observed = projector.observe_chunk(RuntimeOutputStream::Stdout, "$ cargo test\n");
343
344 assert_eq!(
345 observed,
346 vec![ProjectedRuntimeObservation::CommandObserved {
347 stream: RuntimeOutputStream::Stdout,
348 command: "cargo test".to_string(),
349 }]
350 );
351 }
352
353 #[test]
354 fn projects_tool_and_todo_updates() {
355 let mut projector = RuntimeEventProjector::new();
356 let observed = projector.observe_chunk(
357 RuntimeOutputStream::Stdout,
358 "Tool: grep\n- [ ] collect logs\n- [x] collect logs\n",
359 );
360
361 assert!(observed.iter().any(|obs| {
362 matches!(
363 obs,
364 ProjectedRuntimeObservation::ToolCallObserved { tool_name, .. } if tool_name == "grep"
365 )
366 }));
367
368 assert!(observed.iter().any(|obs| {
369 matches!(
370 obs,
371 ProjectedRuntimeObservation::TodoSnapshotUpdated { items, .. }
372 if items == &vec!["[x] collect logs".to_string()]
373 )
374 }));
375 }
376
377 #[test]
378 fn handles_split_lines_across_chunks() {
379 let mut projector = RuntimeEventProjector::new();
380 let first = projector.observe_chunk(RuntimeOutputStream::Stdout, "Tool: git");
381 assert!(first.is_empty());
382
383 let second = projector.observe_chunk(RuntimeOutputStream::Stdout, " status\n");
384 assert!(second.iter().any(|obs| {
385 matches!(
386 obs,
387 ProjectedRuntimeObservation::ToolCallObserved { tool_name, .. } if tool_name == "git"
388 )
389 }));
390 }
391
392 #[test]
393 fn flushes_partial_lines_as_observations() {
394 let mut projector = RuntimeEventProjector::new();
395 let _ = projector.observe_chunk(
396 RuntimeOutputStream::Stdout,
397 "I will run verification checks next",
398 );
399
400 let flushed = projector.flush();
401 assert_eq!(
402 flushed,
403 vec![ProjectedRuntimeObservation::NarrativeOutputObserved {
404 stream: RuntimeOutputStream::Stdout,
405 content: "I will run verification checks next".to_string(),
406 }]
407 );
408 }
409
410 #[test]
411 fn projects_deterministic_markers_from_stderr() {
412 let mut projector = RuntimeEventProjector::new();
413 let observed = projector.observe_chunk(
414 RuntimeOutputStream::Stderr,
415 "Command: cargo clippy\nTool: rustc\n",
416 );
417
418 assert!(observed.iter().any(|obs| {
419 matches!(
420 obs,
421 ProjectedRuntimeObservation::CommandObserved { stream, command }
422 if *stream == RuntimeOutputStream::Stderr && command == "cargo clippy"
423 )
424 }));
425
426 assert!(observed.iter().any(|obs| {
427 matches!(
428 obs,
429 ProjectedRuntimeObservation::ToolCallObserved {
430 stream,
431 tool_name,
432 ..
433 } if *stream == RuntimeOutputStream::Stderr && tool_name == "rustc"
434 )
435 }));
436 }
437
438 #[test]
439 fn ignores_noisy_lines_without_markers() {
440 let mut projector = RuntimeEventProjector::new();
441 let observed = projector.observe_chunk(
442 RuntimeOutputStream::Stdout,
443 "[12:30:44] ::::: non-structured runtime noise :::::\n",
444 );
445
446 assert!(observed.is_empty());
447 }
448
449 #[test]
450 fn projects_todo_from_todo_prefixes() {
451 let mut projector = RuntimeEventProjector::new();
452 let observed = projector.observe_chunk(
453 RuntimeOutputStream::Stdout,
454 "TODO: collect logs\nDONE: collect logs\n",
455 );
456
457 assert!(observed.iter().any(|obs| {
458 matches!(
459 obs,
460 ProjectedRuntimeObservation::TodoSnapshotUpdated { items, .. }
461 if items == &vec!["[x] collect logs".to_string()]
462 )
463 }));
464 }
465
466 #[test]
467 fn projects_narrative_from_runtime_status_lines() {
468 let mut projector = RuntimeEventProjector::new();
469 let observed = projector.observe_chunk(RuntimeOutputStream::Stdout, "Hello from runtime\n");
470
471 assert!(observed.iter().any(|obs| {
472 matches!(
473 obs,
474 ProjectedRuntimeObservation::NarrativeOutputObserved { content, .. }
475 if content == "Hello from runtime"
476 )
477 }));
478 }
479
480 #[test]
481 fn projects_markers_with_ansi_prefixes() {
482 let mut projector = RuntimeEventProjector::new();
483 let observed = projector.observe_chunk(
484 RuntimeOutputStream::Stderr,
485 "\u{1b}[0m→ Tool: git status\n\u{1b}[91mCommand: cargo test\u{1b}[0m\n",
486 );
487
488 assert!(observed.iter().any(|obs| {
489 matches!(
490 obs,
491 ProjectedRuntimeObservation::ToolCallObserved { tool_name, .. } if tool_name == "git"
492 )
493 }));
494 assert!(observed.iter().any(|obs| {
495 matches!(
496 obs,
497 ProjectedRuntimeObservation::CommandObserved { command, .. } if command == "cargo test"
498 )
499 }));
500 }
501}