use stynx_code_types::{ContentBlock, StopReason, StreamEvent};
use futures::StreamExt;
use crate::domain::EngineEvent;
struct PendingTool {
id: String,
name: String,
json: String,
}
pub async fn read_stream(
stream: &mut (impl StreamExt<Item = StreamEvent> + Unpin),
on_event: &mut (impl FnMut(EngineEvent) + Send),
) -> (Vec<ContentBlock>, StopReason, u64, Option<String>) {
let mut blocks: Vec<ContentBlock> = Vec::new();
let mut pending_tool: Option<PendingTool> = None;
let mut text_buf = String::new();
let mut thinking_buf = String::new();
let mut stop_reason = StopReason::EndTurn;
let mut stream_error: Option<String> = None;
let mut last_input_tokens: u64 = 0;
let idle_timeout = std::time::Duration::from_secs(180);
loop {
let next = tokio::time::timeout(idle_timeout, stream.next()).await;
let event = match next {
Ok(Some(e)) => e,
Ok(None) => break,
Err(_) => {
stream_error = Some("provider stream idle for 180s — aborting".to_string());
break;
}
};
match event {
StreamEvent::ContentDelta { text } => {
if !thinking_buf.is_empty() {
blocks.push(ContentBlock::Thinking {
thinking: std::mem::take(&mut thinking_buf),
});
}
on_event(EngineEvent::TextDelta(text.clone()));
text_buf.push_str(&text);
}
StreamEvent::ThinkingDelta { text } => {
on_event(EngineEvent::ThinkingDelta(text.clone()));
thinking_buf.push_str(&text);
}
StreamEvent::ToolUseStart { id, name } => {
if !text_buf.is_empty() {
blocks.push(ContentBlock::Text {
text: std::mem::take(&mut text_buf),
});
}
if let Some(pt) = pending_tool.take() {
let input: serde_json::Value = serde_json::from_str(&pt.json)
.unwrap_or(serde_json::Value::Object(Default::default()));
blocks.push(ContentBlock::ToolUse {
id: pt.id,
name: pt.name,
input,
});
}
on_event(EngineEvent::ToolStart {
name: name.clone(),
id: id.clone(),
});
pending_tool = Some(PendingTool {
id,
name,
json: String::new(),
});
}
StreamEvent::ToolUseDelta { json_chunk } => {
on_event(EngineEvent::ToolInput {
json_chunk: json_chunk.clone(),
});
if let Some(ref mut pt) = pending_tool {
pt.json.push_str(&json_chunk);
}
}
StreamEvent::Stop { reason } => {
stop_reason = reason;
}
StreamEvent::Usage { stats } => {
if stats.input_tokens > 0 {
last_input_tokens = stats.input_tokens;
}
on_event(EngineEvent::Usage {
input_tokens: stats.input_tokens,
output_tokens: stats.output_tokens,
});
}
StreamEvent::Error { message } => {
stream_error = Some(message);
break;
}
}
}
if !thinking_buf.is_empty() {
blocks.push(ContentBlock::Thinking {
thinking: thinking_buf,
});
}
if !text_buf.is_empty() {
blocks.push(ContentBlock::Text { text: text_buf });
}
if let Some(pt) = pending_tool.take() {
let input: serde_json::Value = serde_json::from_str(&pt.json)
.unwrap_or(serde_json::Value::Object(Default::default()));
blocks.push(ContentBlock::ToolUse {
id: pt.id,
name: pt.name,
input,
});
}
(blocks, stop_reason, last_input_tokens, stream_error)
}