1use crate::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
7use serde_json::Value;
8
9#[derive(Debug)]
11pub enum EngineStreamEvent {
12 Thinking(String),
14 Text(String),
16 ToolStart { tool_id: String, tool_name: String },
18 ToolDelta { tool_id: String, delta: String },
20 ToolFinalized { tool_id: String, tool_name: String, input: serde_json::Value },
27 ToolResultDelta { tool_id: String, delta: String },
29 ToolResult { tool_id: String, result: String },
31 SubagentStart { id: u64, name: String, task: String },
33 SubagentUpdate { id: u64, status: String },
35 SubagentDone { id: u64, status: String, duration_secs: f64 },
37 SteeringDelivered { message: String },
39 Usage {
41 input_tokens: u64,
42 output_tokens: u64,
43 cache_read: u64,
44 cache_creation: u64,
45 model: Option<String>,
46 },
47 Noop,
49 Notice(String),
51 Done,
53 Error(String),
55}
56
/// Bookkeeping record for one subagent, kept in the list that
/// `process_stream_event` maintains.
#[derive(Debug, Clone)]
pub struct SubagentTracker {
    /// Identifier used to match later update/done events to this record.
    pub id: u64,
    /// Agent name supplied when the subagent started.
    pub name: String,
    /// Most recent status text (initial text, an update, or the final summary).
    pub status: String,
    /// Instant captured when this record was created.
    pub start_time: std::time::Instant,
    /// Set to `true` once a done event for this subagent has been processed.
    pub done: bool,
    /// Duration reported by the done event; `None` until then.
    pub duration_secs: Option<f64>,
}
67
/// Second half of the pair returned by `process_stream_event`: tells the
/// caller what state the stream is in after the event was handled.
#[derive(Debug)]
pub enum StreamCompletion {
    /// Returned for every event other than a session `Done` or `Error`.
    Continue,
    /// The session finished and a queued message was present; carries that
    /// message (it has been removed from the queue slot).
    AutoSendQueued(String),
    /// The session finished with no queued message, but pending events were
    /// appended to the message list.
    AutoTriggerEvents,
    /// The session finished with neither a queued message nor pending events.
    Done,
    /// The session reported an error; carries the error text.
    Error(String),
}
82
83pub fn process_stream_event(
91 event: StreamEvent,
92 messages: &mut Vec<Value>,
93 subagents: &mut Vec<SubagentTracker>,
94 queued_message: &mut Option<String>,
95 pending_events: &mut Vec<String>,
96) -> (EngineStreamEvent, StreamCompletion) {
97 match event {
98 StreamEvent::Llm(LlmEvent::Thinking(text)) => {
99 (EngineStreamEvent::Thinking(text), StreamCompletion::Continue)
100 }
101 StreamEvent::Llm(LlmEvent::Text(text)) => {
102 (EngineStreamEvent::Text(text), StreamCompletion::Continue)
103 }
104 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
105 (EngineStreamEvent::ToolStart { tool_id, tool_name }, StreamCompletion::Continue)
106 }
107 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
108 (EngineStreamEvent::ToolDelta { tool_id, delta }, StreamCompletion::Continue)
109 }
110 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
111 (EngineStreamEvent::ToolFinalized { tool_id, tool_name, input }, StreamCompletion::Continue)
112 }
113 StreamEvent::Llm(LlmEvent::ToolResultDelta { delta, tool_id }) => {
114 (EngineStreamEvent::ToolResultDelta { tool_id, delta }, StreamCompletion::Continue)
115 }
116 StreamEvent::Llm(LlmEvent::ToolResult { result, tool_id }) => {
117 (EngineStreamEvent::ToolResult { tool_id, result }, StreamCompletion::Continue)
118 }
119 StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
120 *messages = history;
121 (EngineStreamEvent::Noop, StreamCompletion::Continue)
122 }
123 StreamEvent::Agent(AgentEvent::SubagentStart { subagent_id, agent_name, task_preview }) => {
124 subagents.push(SubagentTracker {
125 id: subagent_id,
126 name: agent_name.clone(),
127 status: format!("starting: {}", task_preview),
128 start_time: std::time::Instant::now(),
129 done: false,
130 duration_secs: None,
131 });
132 (EngineStreamEvent::SubagentStart { id: subagent_id, name: agent_name, task: task_preview }, StreamCompletion::Continue)
133 }
134 StreamEvent::Agent(AgentEvent::SubagentUpdate { subagent_id, status, .. }) => {
135 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
136 sa.status = status.clone();
137 }
138 (EngineStreamEvent::SubagentUpdate { id: subagent_id, status }, StreamCompletion::Continue)
139 }
140 StreamEvent::Agent(AgentEvent::SubagentDone { subagent_id, result_preview, duration_secs, .. }) => {
141 let status = if result_preview.starts_with("[TIMED OUT") {
142 "\u{26a0} timed out".to_string()
143 } else if result_preview.starts_with("ERROR") {
144 let preview: String = result_preview.chars().take(40).collect();
145 format!("\u{2718} {}", preview)
146 } else {
147 let preview: String = result_preview.chars().take(40).collect();
148 format!("\u{2714} {}", preview)
149 };
150 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
151 sa.done = true;
152 sa.duration_secs = Some(duration_secs);
153 sa.status = status.clone();
154 }
155 (EngineStreamEvent::SubagentDone { id: subagent_id, status, duration_secs }, StreamCompletion::Continue)
156 }
157 StreamEvent::Agent(AgentEvent::SteeringDelivered { message }) => {
158 if queued_message.as_ref() == Some(&message) {
159 *queued_message = None;
160 }
161 (EngineStreamEvent::SteeringDelivered { message }, StreamCompletion::Continue)
162 }
163 StreamEvent::Session(SessionEvent::Usage {
164 input_tokens,
165 output_tokens,
166 cache_read_input_tokens,
167 cache_creation_input_tokens,
168 model,
169 }) => {
170 (EngineStreamEvent::Usage {
171 input_tokens,
172 output_tokens,
173 cache_read: cache_read_input_tokens,
174 cache_creation: cache_creation_input_tokens,
175 model,
176 }, StreamCompletion::Continue)
177 }
178 StreamEvent::Session(SessionEvent::Done) => {
179 subagents.clear();
180
181 let had_pending = !pending_events.is_empty();
183 for formatted in pending_events.drain(..) {
184 messages.push(serde_json::json!({
185 "role": "user",
186 "content": formatted
187 }));
188 }
189
190 if let Some(queued) = queued_message.take() {
192 (EngineStreamEvent::Done, StreamCompletion::AutoSendQueued(queued))
193 } else if had_pending {
194 (EngineStreamEvent::Done, StreamCompletion::AutoTriggerEvents)
195 } else {
196 (EngineStreamEvent::Done, StreamCompletion::Done)
197 }
198 }
199 StreamEvent::Session(SessionEvent::Notice(text)) => {
200 (EngineStreamEvent::Notice(text), StreamCompletion::Continue)
203 }
204 StreamEvent::Session(SessionEvent::Error(err)) => {
205 subagents.clear();
206 if let Some(last) = messages.last() {
208 let role = last["role"].as_str().unwrap_or("");
209 let is_text_user = role == "user" && last["content"].is_string();
210 let is_assistant = role == "assistant";
211 if is_text_user || is_assistant {
212 messages.pop();
213 }
214 }
215 (EngineStreamEvent::Error(err.clone()), StreamCompletion::Error(err))
216 }
217 }
218}