Skip to main content

agent_engine/engine/
stream.rs

1//! Engine-level stream processing — TUI-agnostic event handling.
2//!
3//! Processes StreamEvent variants, tracks subagent state and usage,
4//! and returns renderer-agnostic actions.
5
6use crate::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
7use serde_json::Value;
8
9/// What happened during a stream event — renderer decides how to display.
10#[derive(Debug)]
11pub enum EngineStreamEvent {
12    /// Thinking text arrived.
13    Thinking(String),
14    /// Response text arrived.
15    Text(String),
16    /// Tool use started.
17    ToolStart { tool_id: String, tool_name: String },
18    /// Tool use input delta.
19    ToolDelta { tool_id: String, delta: String },
20    /// Tool use finalized.
21    ///
22    /// `input` is the parsed JSON value, not a stringified version. Renderers
23    /// that need a string preview (chat.rs, server's HistoryEntry::ToolUse)
24    /// can call `serde_json::to_string` themselves; the wire-format ToolUse
25    /// in server mode passes the Value through directly.
26    ToolFinalized { tool_id: String, tool_name: String, input: serde_json::Value },
27    /// Tool result delta.
28    ToolResultDelta { tool_id: String, delta: String },
29    /// Tool result complete.
30    ToolResult { tool_id: String, result: String },
31    /// Subagent dispatched.
32    SubagentStart { id: u64, name: String, task: String },
33    /// Subagent status update.
34    SubagentUpdate { id: u64, status: String },
35    /// Subagent finished.
36    SubagentDone { id: u64, status: String, duration_secs: f64 },
37    /// Steering message was delivered.
38    SteeringDelivered { message: String },
39    /// Usage stats for this turn.
40    Usage {
41        input_tokens: u64,
42        output_tokens: u64,
43        cache_read: u64,
44        cache_creation: u64,
45        /// Cache-write TTL split — `None` when the API omitted the breakdown.
46        cache_creation_5m: Option<u64>,
47        cache_creation_1h: Option<u64>,
48        model: Option<String>,
49    },
50    /// Internal bookkeeping — no visual output.
51    Noop,
52    /// Display-only status notice (e.g. API retry). Not part of the transcript.
53    Notice(String),
54    /// Stream completed.
55    Done,
56    /// Stream errored.
57    Error(String),
58}
59
60/// Subagent tracking state.
61#[derive(Debug, Clone)]
62pub struct SubagentTracker {
63    pub id: u64,
64    pub name: String,
65    pub status: String,
66    pub start_time: std::time::Instant,
67    pub done: bool,
68    pub duration_secs: Option<f64>,
69}
70
71/// What the caller should do after stream completion.
72#[derive(Debug)]
73pub enum StreamCompletion {
74    /// Stream is still going.
75    Continue,
76    /// Stream done — auto-send this queued message.
77    AutoSendQueued(String),
78    /// Stream done — pending events need a new model turn.
79    AutoTriggerEvents,
80    /// Stream done — nothing special.
81    Done,
82    /// Stream errored.
83    Error(String),
84}
85
86/// Convert a raw StreamEvent into an EngineStreamEvent.
87/// Also handles message history capture and returns completion signals.
88///
89/// `messages` — the conversation history (updated in place on MessageHistory)
90/// `subagents` — tracked subagent states (updated in place)
91/// `queued_message` — message queued during streaming (taken if stream completes)
92/// `pending_events` — events buffered during streaming (drained on completion)
93pub fn process_stream_event(
94    event: StreamEvent,
95    messages: &mut Vec<Value>,
96    subagents: &mut Vec<SubagentTracker>,
97    queued_message: &mut Option<String>,
98    pending_events: &mut Vec<String>,
99) -> (EngineStreamEvent, StreamCompletion) {
100    match event {
101        StreamEvent::Llm(LlmEvent::Thinking(text)) => {
102            (EngineStreamEvent::Thinking(text), StreamCompletion::Continue)
103        }
104        StreamEvent::Llm(LlmEvent::Text(text)) => {
105            (EngineStreamEvent::Text(text), StreamCompletion::Continue)
106        }
107        StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
108            (EngineStreamEvent::ToolStart { tool_id, tool_name }, StreamCompletion::Continue)
109        }
110        StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
111            (EngineStreamEvent::ToolDelta { tool_id, delta }, StreamCompletion::Continue)
112        }
113        StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
114            (EngineStreamEvent::ToolFinalized { tool_id, tool_name, input }, StreamCompletion::Continue)
115        }
116        StreamEvent::Llm(LlmEvent::ToolResultDelta { delta, tool_id }) => {
117            (EngineStreamEvent::ToolResultDelta { tool_id, delta }, StreamCompletion::Continue)
118        }
119        StreamEvent::Llm(LlmEvent::ToolResult { result, tool_id }) => {
120            (EngineStreamEvent::ToolResult { tool_id, result }, StreamCompletion::Continue)
121        }
122        StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
123            *messages = history;
124            (EngineStreamEvent::Noop, StreamCompletion::Continue)
125        }
126        StreamEvent::Agent(AgentEvent::SubagentStart { subagent_id, agent_name, task_preview }) => {
127            subagents.push(SubagentTracker {
128                id: subagent_id,
129                name: agent_name.clone(),
130                status: format!("starting: {}", task_preview),
131                start_time: std::time::Instant::now(),
132                done: false,
133                duration_secs: None,
134            });
135            (EngineStreamEvent::SubagentStart { id: subagent_id, name: agent_name, task: task_preview }, StreamCompletion::Continue)
136        }
137        StreamEvent::Agent(AgentEvent::SubagentUpdate { subagent_id, status, .. }) => {
138            if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
139                sa.status = status.clone();
140            }
141            (EngineStreamEvent::SubagentUpdate { id: subagent_id, status }, StreamCompletion::Continue)
142        }
143        StreamEvent::Agent(AgentEvent::SubagentDone { subagent_id, result_preview, duration_secs, .. }) => {
144            let status = if result_preview.starts_with("[TIMED OUT") {
145                "\u{26a0} timed out".to_string()
146            } else if result_preview.starts_with("ERROR") {
147                let preview: String = result_preview.chars().take(40).collect();
148                format!("\u{2718} {}", preview)
149            } else {
150                let preview: String = result_preview.chars().take(40).collect();
151                format!("\u{2714} {}", preview)
152            };
153            if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
154                sa.done = true;
155                sa.duration_secs = Some(duration_secs);
156                sa.status = status.clone();
157            }
158            (EngineStreamEvent::SubagentDone { id: subagent_id, status, duration_secs }, StreamCompletion::Continue)
159        }
160        StreamEvent::Agent(AgentEvent::SteeringDelivered { message }) => {
161            if queued_message.as_ref() == Some(&message) {
162                *queued_message = None;
163            }
164            (EngineStreamEvent::SteeringDelivered { message }, StreamCompletion::Continue)
165        }
166        StreamEvent::Session(SessionEvent::Usage {
167            input_tokens,
168            output_tokens,
169            cache_read_input_tokens,
170            cache_creation_input_tokens,
171            cache_creation_5m,
172            cache_creation_1h,
173            model,
174        }) => {
175            (EngineStreamEvent::Usage {
176                input_tokens,
177                output_tokens,
178                cache_read: cache_read_input_tokens,
179                cache_creation: cache_creation_input_tokens,
180                cache_creation_5m,
181                cache_creation_1h,
182                model,
183            }, StreamCompletion::Continue)
184        }
185        StreamEvent::Session(SessionEvent::Done) => {
186            subagents.clear();
187
188            // Drain pending events into messages
189            let had_pending = !pending_events.is_empty();
190            for formatted in pending_events.drain(..) {
191                messages.push(serde_json::json!({
192                    "role": "user",
193                    "content": formatted
194                }));
195            }
196
197            // Check for queued message
198            if let Some(queued) = queued_message.take() {
199                (EngineStreamEvent::Done, StreamCompletion::AutoSendQueued(queued))
200            } else if had_pending {
201                (EngineStreamEvent::Done, StreamCompletion::AutoTriggerEvents)
202            } else {
203                (EngineStreamEvent::Done, StreamCompletion::Done)
204            }
205        }
206        StreamEvent::Session(SessionEvent::Notice(text)) => {
207            // Display-only status (e.g. retry notice) — surface as a system
208            // line, never recorded into message history.
209            (EngineStreamEvent::Notice(text), StreamCompletion::Continue)
210        }
211        StreamEvent::Session(SessionEvent::Error(err)) => {
212            subagents.clear();
213            // Drop trailing unmatched messages
214            if let Some(last) = messages.last() {
215                let role = last["role"].as_str().unwrap_or("");
216                let is_text_user = role == "user" && last["content"].is_string();
217                let is_assistant = role == "assistant";
218                if is_text_user || is_assistant {
219                    messages.pop();
220                }
221            }
222            (EngineStreamEvent::Error(err.clone()), StreamCompletion::Error(err))
223        }
224    }
225}