use crate::{StreamEvent, LlmEvent, SessionEvent, AgentEvent};
use serde_json::Value;
#[derive(Debug)]
pub enum EngineStreamEvent {
Thinking(String),
Text(String),
ToolStart { tool_id: String, tool_name: String },
ToolDelta { tool_id: String, delta: String },
ToolFinalized { tool_id: String, tool_name: String, input: serde_json::Value },
ToolResultDelta { tool_id: String, delta: String },
ToolResult { tool_id: String, result: String },
SubagentStart { id: u64, name: String, task: String },
SubagentUpdate { id: u64, status: String },
SubagentDone { id: u64, status: String, duration_secs: f64 },
SteeringDelivered { message: String },
Usage {
input_tokens: u64,
output_tokens: u64,
cache_read: u64,
cache_creation: u64,
cache_creation_5m: Option<u64>,
cache_creation_1h: Option<u64>,
model: Option<String>,
},
Noop,
Notice(String),
Done,
Error(String),
}
#[derive(Debug, Clone)]
pub struct SubagentTracker {
pub id: u64,
pub name: String,
pub status: String,
pub start_time: std::time::Instant,
pub done: bool,
pub duration_secs: Option<f64>,
}
#[derive(Debug)]
pub enum StreamCompletion {
Continue,
AutoSendQueued(String),
AutoTriggerEvents,
Done,
Error(String),
}
pub fn process_stream_event(
event: StreamEvent,
messages: &mut Vec<Value>,
subagents: &mut Vec<SubagentTracker>,
queued_message: &mut Option<String>,
pending_events: &mut Vec<String>,
) -> (EngineStreamEvent, StreamCompletion) {
match event {
StreamEvent::Llm(LlmEvent::Thinking(text)) => {
(EngineStreamEvent::Thinking(text), StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::Text(text)) => {
(EngineStreamEvent::Text(text), StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
(EngineStreamEvent::ToolStart { tool_id, tool_name }, StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
(EngineStreamEvent::ToolDelta { tool_id, delta }, StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::ToolUse { tool_name, tool_id, input }) => {
(EngineStreamEvent::ToolFinalized { tool_id, tool_name, input }, StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::ToolResultDelta { delta, tool_id }) => {
(EngineStreamEvent::ToolResultDelta { tool_id, delta }, StreamCompletion::Continue)
}
StreamEvent::Llm(LlmEvent::ToolResult { result, tool_id }) => {
(EngineStreamEvent::ToolResult { tool_id, result }, StreamCompletion::Continue)
}
StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
*messages = history;
(EngineStreamEvent::Noop, StreamCompletion::Continue)
}
StreamEvent::Agent(AgentEvent::SubagentStart { subagent_id, agent_name, task_preview }) => {
subagents.push(SubagentTracker {
id: subagent_id,
name: agent_name.clone(),
status: format!("starting: {}", task_preview),
start_time: std::time::Instant::now(),
done: false,
duration_secs: None,
});
(EngineStreamEvent::SubagentStart { id: subagent_id, name: agent_name, task: task_preview }, StreamCompletion::Continue)
}
StreamEvent::Agent(AgentEvent::SubagentUpdate { subagent_id, status, .. }) => {
if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
sa.status = status.clone();
}
(EngineStreamEvent::SubagentUpdate { id: subagent_id, status }, StreamCompletion::Continue)
}
StreamEvent::Agent(AgentEvent::SubagentDone { subagent_id, result_preview, duration_secs, .. }) => {
let status = if result_preview.starts_with("[TIMED OUT") {
"\u{26a0} timed out".to_string()
} else if result_preview.starts_with("ERROR") {
let preview: String = result_preview.chars().take(40).collect();
format!("\u{2718} {}", preview)
} else {
let preview: String = result_preview.chars().take(40).collect();
format!("\u{2714} {}", preview)
};
if let Some(sa) = subagents.iter_mut().find(|s| s.id == subagent_id) {
sa.done = true;
sa.duration_secs = Some(duration_secs);
sa.status = status.clone();
}
(EngineStreamEvent::SubagentDone { id: subagent_id, status, duration_secs }, StreamCompletion::Continue)
}
StreamEvent::Agent(AgentEvent::SteeringDelivered { message }) => {
if queued_message.as_ref() == Some(&message) {
*queued_message = None;
}
(EngineStreamEvent::SteeringDelivered { message }, StreamCompletion::Continue)
}
StreamEvent::Session(SessionEvent::Usage {
input_tokens,
output_tokens,
cache_read_input_tokens,
cache_creation_input_tokens,
cache_creation_5m,
cache_creation_1h,
model,
}) => {
(EngineStreamEvent::Usage {
input_tokens,
output_tokens,
cache_read: cache_read_input_tokens,
cache_creation: cache_creation_input_tokens,
cache_creation_5m,
cache_creation_1h,
model,
}, StreamCompletion::Continue)
}
StreamEvent::Session(SessionEvent::Done) => {
subagents.clear();
let had_pending = !pending_events.is_empty();
for formatted in pending_events.drain(..) {
messages.push(serde_json::json!({
"role": "user",
"content": formatted
}));
}
if let Some(queued) = queued_message.take() {
(EngineStreamEvent::Done, StreamCompletion::AutoSendQueued(queued))
} else if had_pending {
(EngineStreamEvent::Done, StreamCompletion::AutoTriggerEvents)
} else {
(EngineStreamEvent::Done, StreamCompletion::Done)
}
}
StreamEvent::Session(SessionEvent::Notice(text)) => {
(EngineStreamEvent::Notice(text), StreamCompletion::Continue)
}
StreamEvent::Session(SessionEvent::Error(err)) => {
subagents.clear();
if let Some(last) = messages.last() {
let role = last["role"].as_str().unwrap_or("");
let is_text_user = role == "user" && last["content"].is_string();
let is_assistant = role == "assistant";
if is_text_user || is_assistant {
messages.pop();
}
}
(EngineStreamEvent::Error(err.clone()), StreamCompletion::Error(err))
}
}
}