1use crate::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
7use serde_json::Value;
8
9#[derive(Debug)]
11pub enum EngineStreamEvent {
12 Thinking(String),
14 Text(String),
16 ToolStart { tool_id: String, tool_name: String },
18 ToolDelta { tool_id: String, delta: String },
20 ToolFinalized { tool_id: String, tool_name: String, input: serde_json::Value },
27 ToolResultDelta { tool_id: String, delta: String },
29 ToolResult { tool_id: String, result: String },
31 SubagentStart { id: u64, name: String, task: String },
33 SubagentUpdate { id: u64, status: String },
35 SubagentDone { id: u64, status: String, duration_secs: f64 },
37 SteeringDelivered { message: String },
39 Usage {
41 input_tokens: u64,
42 output_tokens: u64,
43 cache_read: u64,
44 cache_creation: u64,
45 cache_creation_5m: Option<u64>,
47 cache_creation_1h: Option<u64>,
48 model: Option<String>,
49 },
50 Noop,
52 Notice(String),
54 Done,
56 Error(String),
58}
59
60#[derive(Debug, Clone)]
62pub struct SubagentTracker {
63 pub id: u64,
64 pub name: String,
65 pub status: String,
66 pub start_time: std::time::Instant,
67 pub done: bool,
68 pub duration_secs: Option<f64>,
69}
70
71#[derive(Debug)]
73pub enum StreamCompletion {
74 Continue,
76 AutoSendQueued(String),
78 AutoTriggerEvents,
80 Done,
82 Error(String),
84}
85
86pub fn process_stream_event(
94 event: StreamEvent,
95 messages: &mut Vec<Value>,
96 subagents: &mut Vec<SubagentTracker>,
97 queued_message: &mut Option<String>,
98 pending_events: &mut Vec<String>,
99) -> (EngineStreamEvent, StreamCompletion) {
100 match event {
101 StreamEvent::Llm(LlmEvent::Thinking(text)) => {
102 (EngineStreamEvent::Thinking(text), StreamCompletion::Continue)
103 }
104 StreamEvent::Llm(LlmEvent::Text(text)) => {
105 (EngineStreamEvent::Text(text), StreamCompletion::Continue)
106 }
107 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
108 (EngineStreamEvent::ToolStart { tool_id, tool_name }, StreamCompletion::Continue)
109 }
110 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
111 (EngineStreamEvent::ToolDelta { tool_id, delta }, StreamCompletion::Continue)
112 }
113 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
114 (EngineStreamEvent::ToolFinalized { tool_id, tool_name, input }, StreamCompletion::Continue)
115 }
116 StreamEvent::Llm(LlmEvent::ToolResultDelta { delta, tool_id }) => {
117 (EngineStreamEvent::ToolResultDelta { tool_id, delta }, StreamCompletion::Continue)
118 }
119 StreamEvent::Llm(LlmEvent::ToolResult { result, tool_id }) => {
120 (EngineStreamEvent::ToolResult { tool_id, result }, StreamCompletion::Continue)
121 }
122 StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
123 *messages = history;
124 (EngineStreamEvent::Noop, StreamCompletion::Continue)
125 }
126 StreamEvent::Agent(AgentEvent::SubagentStart { subagent_id, agent_name, task_preview }) => {
127 subagents.push(SubagentTracker {
128 id: subagent_id,
129 name: agent_name.clone(),
130 status: format!("starting: {}", task_preview),
131 start_time: std::time::Instant::now(),
132 done: false,
133 duration_secs: None,
134 });
135 (EngineStreamEvent::SubagentStart { id: subagent_id, name: agent_name, task: task_preview }, StreamCompletion::Continue)
136 }
137 StreamEvent::Agent(AgentEvent::SubagentUpdate { subagent_id, status, .. }) => {
138 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
139 sa.status = status.clone();
140 }
141 (EngineStreamEvent::SubagentUpdate { id: subagent_id, status }, StreamCompletion::Continue)
142 }
143 StreamEvent::Agent(AgentEvent::SubagentDone { subagent_id, result_preview, duration_secs, .. }) => {
144 let status = if result_preview.starts_with("[TIMED OUT") {
145 "\u{26a0} timed out".to_string()
146 } else if result_preview.starts_with("ERROR") {
147 let preview: String = result_preview.chars().take(40).collect();
148 format!("\u{2718} {}", preview)
149 } else {
150 let preview: String = result_preview.chars().take(40).collect();
151 format!("\u{2714} {}", preview)
152 };
153 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
154 sa.done = true;
155 sa.duration_secs = Some(duration_secs);
156 sa.status = status.clone();
157 }
158 (EngineStreamEvent::SubagentDone { id: subagent_id, status, duration_secs }, StreamCompletion::Continue)
159 }
160 StreamEvent::Agent(AgentEvent::SteeringDelivered { message }) => {
161 if queued_message.as_ref() == Some(&message) {
162 *queued_message = None;
163 }
164 (EngineStreamEvent::SteeringDelivered { message }, StreamCompletion::Continue)
165 }
166 StreamEvent::Session(SessionEvent::Usage {
167 input_tokens,
168 output_tokens,
169 cache_read_input_tokens,
170 cache_creation_input_tokens,
171 cache_creation_5m,
172 cache_creation_1h,
173 model,
174 }) => {
175 (EngineStreamEvent::Usage {
176 input_tokens,
177 output_tokens,
178 cache_read: cache_read_input_tokens,
179 cache_creation: cache_creation_input_tokens,
180 cache_creation_5m,
181 cache_creation_1h,
182 model,
183 }, StreamCompletion::Continue)
184 }
185 StreamEvent::Session(SessionEvent::Done) => {
186 subagents.clear();
187
188 let had_pending = !pending_events.is_empty();
190 for formatted in pending_events.drain(..) {
191 messages.push(serde_json::json!({
192 "role": "user",
193 "content": formatted
194 }));
195 }
196
197 if let Some(queued) = queued_message.take() {
199 (EngineStreamEvent::Done, StreamCompletion::AutoSendQueued(queued))
200 } else if had_pending {
201 (EngineStreamEvent::Done, StreamCompletion::AutoTriggerEvents)
202 } else {
203 (EngineStreamEvent::Done, StreamCompletion::Done)
204 }
205 }
206 StreamEvent::Session(SessionEvent::Notice(text)) => {
207 (EngineStreamEvent::Notice(text), StreamCompletion::Continue)
210 }
211 StreamEvent::Session(SessionEvent::Error(err)) => {
212 subagents.clear();
213 if let Some(last) = messages.last() {
215 let role = last["role"].as_str().unwrap_or("");
216 let is_text_user = role == "user" && last["content"].is_string();
217 let is_assistant = role == "assistant";
218 if is_text_user || is_assistant {
219 messages.pop();
220 }
221 }
222 (EngineStreamEvent::Error(err.clone()), StreamCompletion::Error(err))
223 }
224 }
225}