1use crate::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
7use serde_json::Value;
8
9#[derive(Debug)]
11pub enum EngineStreamEvent {
12 Thinking(String),
14 Text(String),
16 ToolStart { tool_id: String, tool_name: String },
18 ToolDelta { tool_id: String, delta: String },
20 ToolFinalized { tool_id: String, tool_name: String, input: serde_json::Value },
27 ToolResultDelta { tool_id: String, delta: String },
29 ToolResult { tool_id: String, result: String },
31 SubagentStart { id: u64, name: String, task: String },
33 SubagentUpdate { id: u64, status: String },
35 SubagentDone { id: u64, status: String, duration_secs: f64 },
37 SteeringDelivered { message: String },
39 Usage {
41 input_tokens: u64,
42 output_tokens: u64,
43 cache_read: u64,
44 cache_creation: u64,
45 model: Option<String>,
46 },
47 Noop,
49 Done,
51 Error(String),
53}
54
/// Bookkeeping record for one subagent seen on the stream.
///
/// `process_stream_event` pushes a record when a subagent starts and
/// mutates it in place on later update/done events for the same `id`.
#[derive(Debug, Clone)]
pub struct SubagentTracker {
    /// Identifier used to match later events to this record.
    pub id: u64,
    /// Agent name taken from the start event.
    pub name: String,
    /// Latest human-readable status line.
    pub status: String,
    /// Moment the record was created.
    pub start_time: std::time::Instant,
    /// Set to `true` once a done event for this subagent has been processed.
    pub done: bool,
    /// Duration reported by the done event; `None` until then.
    pub duration_secs: Option<f64>,
}
65
/// Follow-up action reported by `process_stream_event` alongside the
/// surfaced event.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// and tests can store and compare completions directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamCompletion {
    /// The stream is still in progress; returned for every event other than
    /// the session's done/error events.
    Continue,
    /// The session finished while a queued message was waiting; carries that
    /// message (presumably for the caller to send next — confirm with caller).
    AutoSendQueued(String),
    /// The session finished with no queued message, but pending events were
    /// appended to the message history.
    AutoTriggerEvents,
    /// The session finished with nothing queued and nothing pending.
    Done,
    /// The session reported an error; carries the error text.
    Error(String),
}
80
81pub fn process_stream_event(
89 event: StreamEvent,
90 messages: &mut Vec<Value>,
91 subagents: &mut Vec<SubagentTracker>,
92 queued_message: &mut Option<String>,
93 pending_events: &mut Vec<String>,
94) -> (EngineStreamEvent, StreamCompletion) {
95 match event {
96 StreamEvent::Llm(LlmEvent::Thinking(text)) => {
97 (EngineStreamEvent::Thinking(text), StreamCompletion::Continue)
98 }
99 StreamEvent::Llm(LlmEvent::Text(text)) => {
100 (EngineStreamEvent::Text(text), StreamCompletion::Continue)
101 }
102 StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
103 (EngineStreamEvent::ToolStart { tool_id, tool_name }, StreamCompletion::Continue)
104 }
105 StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
106 (EngineStreamEvent::ToolDelta { tool_id, delta }, StreamCompletion::Continue)
107 }
108 StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
109 (EngineStreamEvent::ToolFinalized { tool_id, tool_name, input }, StreamCompletion::Continue)
110 }
111 StreamEvent::Llm(LlmEvent::ToolResultDelta { delta, tool_id }) => {
112 (EngineStreamEvent::ToolResultDelta { tool_id, delta }, StreamCompletion::Continue)
113 }
114 StreamEvent::Llm(LlmEvent::ToolResult { result, tool_id }) => {
115 (EngineStreamEvent::ToolResult { tool_id, result }, StreamCompletion::Continue)
116 }
117 StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
118 *messages = history;
119 (EngineStreamEvent::Noop, StreamCompletion::Continue)
120 }
121 StreamEvent::Agent(AgentEvent::SubagentStart { subagent_id, agent_name, task_preview }) => {
122 subagents.push(SubagentTracker {
123 id: subagent_id.clone(),
124 name: agent_name.clone(),
125 status: format!("starting: {}", task_preview),
126 start_time: std::time::Instant::now(),
127 done: false,
128 duration_secs: None,
129 });
130 (EngineStreamEvent::SubagentStart { id: subagent_id, name: agent_name, task: task_preview }, StreamCompletion::Continue)
131 }
132 StreamEvent::Agent(AgentEvent::SubagentUpdate { subagent_id, status, .. }) => {
133 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
134 sa.status = status.clone();
135 }
136 (EngineStreamEvent::SubagentUpdate { id: subagent_id, status }, StreamCompletion::Continue)
137 }
138 StreamEvent::Agent(AgentEvent::SubagentDone { subagent_id, result_preview, duration_secs, .. }) => {
139 let status = if result_preview.starts_with("[TIMED OUT") {
140 "\u{26a0} timed out".to_string()
141 } else if result_preview.starts_with("ERROR") {
142 let preview: String = result_preview.chars().take(40).collect();
143 format!("\u{2718} {}", preview)
144 } else {
145 let preview: String = result_preview.chars().take(40).collect();
146 format!("\u{2714} {}", preview)
147 };
148 if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
149 sa.done = true;
150 sa.duration_secs = Some(duration_secs);
151 sa.status = status.clone();
152 }
153 (EngineStreamEvent::SubagentDone { id: subagent_id, status, duration_secs }, StreamCompletion::Continue)
154 }
155 StreamEvent::Agent(AgentEvent::SteeringDelivered { message }) => {
156 if queued_message.as_ref() == Some(&message) {
157 *queued_message = None;
158 }
159 (EngineStreamEvent::SteeringDelivered { message }, StreamCompletion::Continue)
160 }
161 StreamEvent::Session(SessionEvent::Usage {
162 input_tokens,
163 output_tokens,
164 cache_read_input_tokens,
165 cache_creation_input_tokens,
166 model,
167 }) => {
168 (EngineStreamEvent::Usage {
169 input_tokens,
170 output_tokens,
171 cache_read: cache_read_input_tokens,
172 cache_creation: cache_creation_input_tokens,
173 model,
174 }, StreamCompletion::Continue)
175 }
176 StreamEvent::Session(SessionEvent::Done) => {
177 subagents.clear();
178
179 let had_pending = !pending_events.is_empty();
181 for formatted in pending_events.drain(..) {
182 messages.push(serde_json::json!({
183 "role": "user",
184 "content": formatted
185 }));
186 }
187
188 if let Some(queued) = queued_message.take() {
190 (EngineStreamEvent::Done, StreamCompletion::AutoSendQueued(queued))
191 } else if had_pending {
192 (EngineStreamEvent::Done, StreamCompletion::AutoTriggerEvents)
193 } else {
194 (EngineStreamEvent::Done, StreamCompletion::Done)
195 }
196 }
197 StreamEvent::Session(SessionEvent::Error(err)) => {
198 subagents.clear();
199 if let Some(last) = messages.last() {
201 let role = last["role"].as_str().unwrap_or("");
202 let is_text_user = role == "user" && last["content"].is_string();
203 let is_assistant = role == "assistant";
204 if is_text_user || is_assistant {
205 messages.pop();
206 }
207 }
208 (EngineStreamEvent::Error(err.clone()), StreamCompletion::Error(err))
209 }
210 }
211}