Skip to main content

hivemind/core/
runtime_event_projection.rs

1//! Runtime output to observational event projection.
2//!
3//! This module extracts best-effort, non-authoritative runtime observations
4//! from stdout/stderr chunks. These projections are telemetry only and must
5//! never drive `TaskFlow` control flow.
6
7use crate::core::events::RuntimeOutputStream;
8use std::collections::BTreeMap;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub enum ProjectedRuntimeObservation {
12    CommandObserved {
13        stream: RuntimeOutputStream,
14        command: String,
15    },
16    ToolCallObserved {
17        stream: RuntimeOutputStream,
18        tool_name: String,
19        details: String,
20    },
21    TodoSnapshotUpdated {
22        stream: RuntimeOutputStream,
23        items: Vec<String>,
24    },
25    NarrativeOutputObserved {
26        stream: RuntimeOutputStream,
27        content: String,
28    },
29}
30
/// Stateful projector that turns raw stdout/stderr chunks into observations.
///
/// Text is buffered per stream until a line terminator arrives, so a line
/// split across several chunks is still interpreted as one line.
#[derive(Debug, Default)]
pub struct RuntimeEventProjector {
    /// Stdout text received so far that has not been terminated yet.
    stdout_buffer: String,
    /// Stderr text received so far that has not been terminated yet.
    stderr_buffer: String,
    /// Latest completion state (`true` = done) of every todo item seen,
    /// keyed by the item text.
    todo_items: BTreeMap<String, bool>,
}
37
38impl RuntimeEventProjector {
39    #[must_use]
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    pub fn observe_chunk(
45        &mut self,
46        stream: RuntimeOutputStream,
47        chunk: &str,
48    ) -> Vec<ProjectedRuntimeObservation> {
49        {
50            let buffer = self.buffer_for_stream(stream);
51            buffer.push_str(chunk);
52        }
53
54        let mut lines = Vec::new();
55        {
56            let buffer = self.buffer_for_stream(stream);
57            while let Some(line) = Self::drain_next_line(buffer) {
58                lines.push(line);
59            }
60        }
61
62        let mut projected = Vec::new();
63        for line in lines {
64            projected.extend(self.observe_line(stream, &line));
65        }
66        projected
67    }
68
69    pub fn flush(&mut self) -> Vec<ProjectedRuntimeObservation> {
70        let mut projected = Vec::new();
71
72        if self.stdout_buffer.trim().is_empty() {
73            self.stdout_buffer.clear();
74        } else {
75            let remaining = std::mem::take(&mut self.stdout_buffer);
76            projected.extend(self.observe_line(
77                RuntimeOutputStream::Stdout,
78                remaining.trim_end_matches('\r'),
79            ));
80        }
81
82        if self.stderr_buffer.trim().is_empty() {
83            self.stderr_buffer.clear();
84        } else {
85            let remaining = std::mem::take(&mut self.stderr_buffer);
86            projected.extend(self.observe_line(
87                RuntimeOutputStream::Stderr,
88                remaining.trim_end_matches('\r'),
89            ));
90        }
91
92        projected
93    }
94
95    fn buffer_for_stream(&mut self, stream: RuntimeOutputStream) -> &mut String {
96        match stream {
97            RuntimeOutputStream::Stdout => &mut self.stdout_buffer,
98            RuntimeOutputStream::Stderr => &mut self.stderr_buffer,
99        }
100    }
101
102    fn drain_next_line(buffer: &mut String) -> Option<String> {
103        let bytes = buffer.as_bytes();
104        let mut newline_idx = None;
105        let mut remove_len = 1usize;
106
107        for (idx, b) in bytes.iter().enumerate() {
108            if *b == b'\n' {
109                newline_idx = Some(idx);
110                remove_len = 1;
111                break;
112            }
113            if *b == b'\r' {
114                newline_idx = Some(idx);
115                remove_len = if bytes.get(idx + 1) == Some(&b'\n') {
116                    2
117                } else {
118                    1
119                };
120                break;
121            }
122        }
123
124        let idx = newline_idx?;
125        let line = buffer[..idx].to_string();
126        buffer.drain(..idx + remove_len);
127        Some(line)
128    }
129
130    fn observe_line(
131        &mut self,
132        stream: RuntimeOutputStream,
133        raw_line: &str,
134    ) -> Vec<ProjectedRuntimeObservation> {
135        let normalized = normalize_projection_line(raw_line);
136        let line = normalized.trim();
137        if line.is_empty() {
138            return Vec::new();
139        }
140
141        let mut projected = Vec::new();
142
143        if let Some(command) = parse_command(line) {
144            projected.push(ProjectedRuntimeObservation::CommandObserved { stream, command });
145        }
146
147        if let Some(tool_name) = parse_tool_name(line) {
148            projected.push(ProjectedRuntimeObservation::ToolCallObserved {
149                stream,
150                tool_name,
151                details: line.to_string(),
152            });
153        }
154
155        if let Some((item, completed)) = parse_todo_item(line) {
156            let changed = self
157                .todo_items
158                .insert(item, completed)
159                .is_none_or(|prev| prev != completed);
160            if changed {
161                let items = self
162                    .todo_items
163                    .iter()
164                    .map(|(todo, done)| {
165                        if *done {
166                            format!("[x] {todo}")
167                        } else {
168                            format!("[ ] {todo}")
169                        }
170                    })
171                    .collect();
172                projected.push(ProjectedRuntimeObservation::TodoSnapshotUpdated { stream, items });
173            }
174        }
175
176        if is_narrative_line(line)
177            && projected.is_empty()
178            && matches!(stream, RuntimeOutputStream::Stdout)
179        {
180            projected.push(ProjectedRuntimeObservation::NarrativeOutputObserved {
181                stream,
182                content: line.to_string(),
183            });
184        }
185
186        projected
187    }
188}
189
/// Extracts the command text from a line that announces a command.
///
/// Two kinds of markers are recognized:
/// - `Command: ` and `Running command: ` anywhere in the line, so that
///   decorations in front of them (arrows, timestamps, log levels) do not hide
///   the command;
/// - `$ `, `> `, `Running: `, `Executing: ` and `Execute: ` only at the very
///   start of the line.
///
/// Returns the trimmed text following the first marker that yields a non-empty
/// command, or `None` when the line announces no command.
fn parse_command(line: &str) -> Option<String> {
    // Markers that may appear anywhere in the line.
    const EMBEDDED_MARKERS: [&str; 2] = ["Command: ", "Running command: "];
    // Markers that only count when the line starts with them. The embedded
    // markers are deliberately not repeated here: the search above already
    // covers the case where they sit at the start of the line.
    const LEADING_MARKERS: [&str; 5] = ["$ ", "> ", "Running: ", "Executing: ", "Execute: "];

    for marker in EMBEDDED_MARKERS {
        if let Some(start) = line.find(marker) {
            // Directly repeated markers are skipped as a whole.
            let command = line[start..].trim_start_matches(marker).trim();
            if !command.is_empty() {
                return Some(command.to_owned());
            }
        }
    }

    for marker in LEADING_MARKERS {
        if let Some(rest) = line.strip_prefix(marker) {
            let command = rest.trim();
            if !command.is_empty() {
                return Some(command.to_owned());
            }
        }
    }

    None
}
222
/// Extracts a tool name from a line that announces a tool invocation.
///
/// Recognized forms, tried in this order:
/// - `Tool: <name>` anywhere in the line; the name is the first
///   whitespace-delimited token after the marker;
/// - `Using tool <name>` at the start of the line; the name ends at
///   whitespace, `:` or `(`;
/// - `tool=<name>` at the start of the line; the name ends at whitespace,
///   `,` or `)`.
///
/// A form that matches but yields an empty name is skipped and the remaining
/// forms are still tried. Returns `None` when no form yields a name.
fn parse_tool_name(line: &str) -> Option<String> {
    const TOOL_MARKER: &str = "Tool: ";

    /// Returns the non-empty prefix of `rest` that precedes the first
    /// whitespace character or any of `delimiters`.
    fn leading_token(rest: &str, delimiters: [char; 2]) -> Option<&str> {
        let end = rest
            .find(|c: char| c.is_whitespace() || delimiters.contains(&c))
            .unwrap_or(rest.len());
        let token = &rest[..end];
        (!token.is_empty()).then_some(token)
    }

    // Searching the whole line also covers a line that starts with the marker,
    // so no separate prefix check is needed for it.
    if let Some(start) = line.find(TOOL_MARKER) {
        // Directly repeated markers are skipped as a whole.
        let after = line[start..].trim_start_matches(TOOL_MARKER);
        if let Some(name) = after.split_whitespace().next() {
            return Some(name.to_owned());
        }
    }

    if let Some(rest) = line.strip_prefix("Using tool ") {
        if let Some(name) = leading_token(rest, [':', '(']) {
            return Some(name.to_owned());
        }
    }

    if let Some(rest) = line.strip_prefix("tool=") {
        if let Some(name) = leading_token(rest, [',', ')']) {
            return Some(name.to_owned());
        }
    }

    None
}
253
/// Recognizes a todo line and returns its item text with its completion flag.
///
/// Markdown-style checkboxes (`- [ ] `, `* [x] `, ...) and `TODO`/`DONE`
/// prefixes are supported, all only at the start of the line. The item text is
/// trimmed; a marker followed by nothing but whitespace does not count.
fn parse_todo_item(line: &str) -> Option<(String, bool)> {
    // Each marker paired with the completion state it denotes.
    const MARKERS: [(&str, bool); 10] = [
        ("- [ ] ", false),
        ("* [ ] ", false),
        ("- [x] ", true),
        ("* [x] ", true),
        ("- [X] ", true),
        ("* [X] ", true),
        ("TODO: ", false),
        ("TODO ", false),
        ("DONE: ", true),
        ("DONE ", true),
    ];

    MARKERS.iter().find_map(|&(marker, completed)| {
        let item = line.strip_prefix(marker)?.trim();
        (!item.is_empty()).then(|| (item.to_owned(), completed))
    })
}
285
/// Reports whether `line` starts like free-form narration.
///
/// The check is a case-insensitive comparison of the start of the line against
/// a fixed list of openers such as `I `, `Next `, `Plan:` or `Checking`.
fn is_narrative_line(line: &str) -> bool {
    // Lower-case openers; the line is lower-cased before comparison.
    const OPENERS: [&str; 14] = [
        "i ",
        "i'",
        "i\"",
        "next ",
        "plan:",
        "because",
        "thinking:",
        "hello",
        "starting",
        "working",
        "updating",
        "checking",
        "analyzing",
        "investigating",
    ];

    let lowered = line.to_lowercase();
    OPENERS.iter().any(|opener| lowered.starts_with(*opener))
}
303
304fn normalize_projection_line(raw: &str) -> String {
305    let no_ansi = strip_ansi_sequences(raw);
306    no_ansi
307        .chars()
308        .filter(|c| !c.is_control() || *c == '\n' || *c == '\r' || *c == '\t')
309        .collect::<String>()
310}
311
/// Removes terminal escape sequences from `input`.
///
/// Handled forms:
/// - CSI sequences (`ESC [` ... final byte in `@`..=`~`), e.g. colour codes;
/// - OSC sequences (`ESC ]` ... terminated by BEL or by `ESC \`), e.g. window
///   titles and hyperlinks; the whole payload is removed, not just the
///   introducer;
/// - any other `ESC` is dropped on its own and the following character kept.
///
/// An unterminated sequence swallows the rest of the input, except that an
/// OSC sequence stops in front of an `ESC` that does not start its
/// terminator, so a following sequence is still recognized.
fn strip_ansi_sequences(input: &str) -> String {
    const ESC: char = '\u{1b}';
    const BEL: char = '\u{7}';

    let mut out = String::with_capacity(input.len());
    let mut chars = input.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch != ESC {
            out.push(ch);
            continue;
        }

        match chars.peek().copied() {
            Some('[') => {
                chars.next();
                // Skip parameter and intermediate bytes up to and including
                // the final byte of the CSI sequence.
                for next in chars.by_ref() {
                    if ('@'..='~').contains(&next) {
                        break;
                    }
                }
            }
            Some(']') => {
                chars.next();
                while let Some(&next) = chars.peek() {
                    match next {
                        BEL => {
                            chars.next();
                            break;
                        }
                        ESC => {
                            // `ESC \` terminates the OSC string. Any other
                            // ESC starts a new sequence and is left for the
                            // outer loop to interpret.
                            let mut lookahead = chars.clone();
                            lookahead.next();
                            if lookahead.next() == Some('\\') {
                                chars.next();
                                chars.next();
                            }
                            break;
                        }
                        _ => {
                            chars.next();
                        }
                    }
                }
            }
            // A lone ESC: drop it and keep whatever follows.
            _ => {}
        }
    }

    out
}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` as a single chunk on `stream` to a fresh projector.
    fn project(stream: RuntimeOutputStream, input: &str) -> Vec<ProjectedRuntimeObservation> {
        RuntimeEventProjector::new().observe_chunk(stream, input)
    }

    /// Stream of the first command observation whose command equals `expected`.
    fn command_stream(
        observed: &[ProjectedRuntimeObservation],
        expected: &str,
    ) -> Option<RuntimeOutputStream> {
        observed.iter().find_map(|obs| match obs {
            ProjectedRuntimeObservation::CommandObserved { stream, command }
                if command == expected =>
            {
                Some(*stream)
            }
            _ => None,
        })
    }

    /// Stream of the first tool observation whose tool name equals `expected`.
    fn tool_stream(
        observed: &[ProjectedRuntimeObservation],
        expected: &str,
    ) -> Option<RuntimeOutputStream> {
        observed.iter().find_map(|obs| match obs {
            ProjectedRuntimeObservation::ToolCallObserved {
                stream, tool_name, ..
            } if tool_name == expected => Some(*stream),
            _ => None,
        })
    }

    /// Whether any todo snapshot lists exactly the `expected` items.
    fn has_todo_snapshot(observed: &[ProjectedRuntimeObservation], expected: &[&str]) -> bool {
        observed.iter().any(|obs| match obs {
            ProjectedRuntimeObservation::TodoSnapshotUpdated { items, .. } => items
                .iter()
                .map(String::as_str)
                .eq(expected.iter().copied()),
            _ => false,
        })
    }

    /// Whether any narrative observation carries exactly `expected`.
    fn has_narrative(observed: &[ProjectedRuntimeObservation], expected: &str) -> bool {
        observed.iter().any(|obs| {
            matches!(
                obs,
                ProjectedRuntimeObservation::NarrativeOutputObserved { content, .. }
                    if content == expected
            )
        })
    }

    #[test]
    fn projects_command_lines_from_stdout() {
        let observed = project(RuntimeOutputStream::Stdout, "$ cargo test\n");

        let expected = vec![ProjectedRuntimeObservation::CommandObserved {
            stream: RuntimeOutputStream::Stdout,
            command: "cargo test".to_string(),
        }];
        assert_eq!(observed, expected);
    }

    #[test]
    fn projects_tool_and_todo_updates() {
        let observed = project(
            RuntimeOutputStream::Stdout,
            "Tool: grep\n- [ ] collect logs\n- [x] collect logs\n",
        );

        assert!(tool_stream(&observed, "grep").is_some());
        assert!(has_todo_snapshot(&observed, &["[x] collect logs"]));
    }

    #[test]
    fn handles_split_lines_across_chunks() {
        let mut projector = RuntimeEventProjector::new();

        let first = projector.observe_chunk(RuntimeOutputStream::Stdout, "Tool: git");
        assert!(first.is_empty());

        let second = projector.observe_chunk(RuntimeOutputStream::Stdout, " status\n");
        assert!(tool_stream(&second, "git").is_some());
    }

    #[test]
    fn flushes_partial_lines_as_observations() {
        let mut projector = RuntimeEventProjector::new();
        let _ = projector.observe_chunk(
            RuntimeOutputStream::Stdout,
            "I will run verification checks next",
        );

        let expected = vec![ProjectedRuntimeObservation::NarrativeOutputObserved {
            stream: RuntimeOutputStream::Stdout,
            content: "I will run verification checks next".to_string(),
        }];
        assert_eq!(projector.flush(), expected);
    }

    #[test]
    fn projects_deterministic_markers_from_stderr() {
        let observed = project(
            RuntimeOutputStream::Stderr,
            "Command: cargo clippy\nTool: rustc\n",
        );

        assert_eq!(
            command_stream(&observed, "cargo clippy"),
            Some(RuntimeOutputStream::Stderr)
        );
        assert_eq!(
            tool_stream(&observed, "rustc"),
            Some(RuntimeOutputStream::Stderr)
        );
    }

    #[test]
    fn ignores_noisy_lines_without_markers() {
        let observed = project(
            RuntimeOutputStream::Stdout,
            "[12:30:44] ::::: non-structured runtime noise :::::\n",
        );

        assert!(observed.is_empty());
    }

    #[test]
    fn projects_todo_from_todo_prefixes() {
        let observed = project(
            RuntimeOutputStream::Stdout,
            "TODO: collect logs\nDONE: collect logs\n",
        );

        assert!(has_todo_snapshot(&observed, &["[x] collect logs"]));
    }

    #[test]
    fn projects_narrative_from_runtime_status_lines() {
        let observed = project(RuntimeOutputStream::Stdout, "Hello from runtime\n");

        assert!(has_narrative(&observed, "Hello from runtime"));
    }

    #[test]
    fn projects_markers_with_ansi_prefixes() {
        let observed = project(
            RuntimeOutputStream::Stderr,
            "\u{1b}[0m→ Tool: git status\n\u{1b}[91mCommand: cargo test\u{1b}[0m\n",
        );

        assert!(tool_stream(&observed, "git").is_some());
        assert!(command_stream(&observed, "cargo test").is_some());
    }
}
501}