use crate::llm::{ContentBlock, StopReason, Usage};
use futures::Stream;
use std::collections::HashMap;
use std::pin::Pin;
/// One incremental event of a streamed model response.
///
/// Most variants carry a `block_index` identifying which content block of the
/// response the event belongs to; `StreamAccumulator` uses it to order the
/// finished blocks.
#[derive(Debug, Clone)]
pub enum StreamDelta {
/// A fragment of text to append to the text block at `block_index`.
TextDelta {
/// The text fragment.
delta: String,
/// Index of the content block this fragment belongs to.
block_index: usize,
},
/// A fragment of thinking text to append to the thinking block at `block_index`.
ThinkingDelta {
/// The thinking-text fragment.
delta: String,
/// Index of the content block this fragment belongs to.
block_index: usize,
},
/// Announces a new tool call; its input arrives afterwards as `ToolInputDelta`s.
ToolUseStart {
/// Identifier of the tool call; later `ToolInputDelta`s are matched on it.
id: String,
/// Name of the tool being called.
name: String,
/// Index of the content block this tool call occupies.
block_index: usize,
/// Optional signature carried through unchanged onto the resulting
/// `ContentBlock::ToolUse`.
thought_signature: Option<String>,
},
/// A fragment of the JSON input for a previously started tool call.
ToolInputDelta {
/// Identifier of the tool call this fragment belongs to.
id: String,
/// The raw JSON fragment; fragments are concatenated and parsed as a whole.
delta: String,
/// Index of the content block; the accumulator matches on `id`, not on this.
block_index: usize,
},
/// Token usage reported for the response.
Usage(Usage),
/// Marks the end of the stream.
Done {
/// Why generation stopped, if reported.
stop_reason: Option<StopReason>,
},
/// A fragment of the signature attached to the thinking block at `block_index`.
SignatureDelta {
/// The signature fragment.
delta: String,
/// Index of the thinking block the signature belongs to.
block_index: usize,
},
/// A redacted thinking block, delivered as a single opaque payload.
RedactedThinking {
/// The payload, passed through unchanged to `ContentBlock::RedactedThinking`.
data: String,
/// Index of the content block this payload occupies.
block_index: usize,
},
/// An error reported in-band on the stream; `StreamAccumulator::apply` ignores it.
Error {
/// Description of the error.
message: String,
/// NOTE(review): presumably tells the consumer whether the stream may be
/// retried or continued — not inspected anywhere in this file; confirm
/// against the producers.
recoverable: bool,
},
}
/// A pinned, boxed, `Send` stream of fallible [`StreamDelta`] items that may
/// borrow data for the lifetime `'a`.
pub type StreamBox<'a> = Pin<Box<dyn Stream<Item = anyhow::Result<StreamDelta>> + Send + 'a>>;
/// Collects [`StreamDelta`] events into complete content blocks.
///
/// Deltas are folded in with `apply`; `into_content_blocks` then consumes the
/// accumulator and returns the finished blocks ordered by block index.
#[derive(Debug, Default)]
pub struct StreamAccumulator {
/// Accumulated text, stored at the position equal to its block index; positions
/// that never received text hold empty strings.
text_blocks: Vec<String>,
/// Accumulated thinking text, laid out the same way as `text_blocks`.
thinking_blocks: Vec<String>,
/// Accumulated signature per thinking block, keyed by block index.
thinking_signatures: HashMap<usize, String>,
/// Redacted thinking payloads as `(block_index, data)` pairs in arrival order.
redacted_thinking_blocks: Vec<(usize, String)>,
/// Tool calls in the order their `ToolUseStart` events arrived.
tool_uses: Vec<ToolUseAccumulator>,
/// Most recently reported usage, if any.
usage: Option<Usage>,
/// Stop reason from the most recent `Done` event, if any.
stop_reason: Option<StopReason>,
}
/// In-progress state of a single streamed tool call.
#[derive(Debug, Default)]
pub struct ToolUseAccumulator {
/// Identifier of the tool call, used to route input fragments to it.
pub id: String,
/// Name of the tool being called.
pub name: String,
/// Raw JSON input concatenated from the streamed fragments; parsed only when
/// the accumulator is converted into content blocks.
pub input_json: String,
/// Index of the content block this tool call occupies.
pub block_index: usize,
/// Optional signature copied from the `ToolUseStart` event.
pub thought_signature: Option<String>,
}
impl StreamAccumulator {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn apply(&mut self, delta: &StreamDelta) {
match delta {
StreamDelta::TextDelta { delta, block_index } => {
while self.text_blocks.len() <= *block_index {
self.text_blocks.push(String::new());
}
self.text_blocks[*block_index].push_str(delta);
}
StreamDelta::ThinkingDelta { delta, block_index } => {
while self.thinking_blocks.len() <= *block_index {
self.thinking_blocks.push(String::new());
}
self.thinking_blocks[*block_index].push_str(delta);
}
StreamDelta::ToolUseStart {
id,
name,
block_index,
thought_signature,
} => {
self.tool_uses.push(ToolUseAccumulator {
id: id.clone(),
name: name.clone(),
input_json: String::new(),
block_index: *block_index,
thought_signature: thought_signature.clone(),
});
}
StreamDelta::ToolInputDelta { id, delta, .. } => {
if let Some(tool) = self.tool_uses.iter_mut().find(|t| t.id == *id) {
tool.input_json.push_str(delta);
}
}
StreamDelta::SignatureDelta { delta, block_index } => {
self.thinking_signatures
.entry(*block_index)
.or_default()
.push_str(delta);
}
StreamDelta::RedactedThinking { data, block_index } => {
self.redacted_thinking_blocks
.push((*block_index, data.clone()));
}
StreamDelta::Usage(u) => {
self.usage = Some(u.clone());
}
StreamDelta::Done { stop_reason } => {
self.stop_reason = *stop_reason;
}
StreamDelta::Error { .. } => {}
}
}
#[must_use]
pub const fn usage(&self) -> Option<&Usage> {
self.usage.as_ref()
}
#[must_use]
pub const fn stop_reason(&self) -> Option<&StopReason> {
self.stop_reason.as_ref()
}
#[must_use]
pub fn into_content_blocks(self) -> Vec<ContentBlock> {
let mut blocks: Vec<(usize, ContentBlock)> = Vec::new();
let mut signatures = self.thinking_signatures;
for (idx, thinking) in self.thinking_blocks.into_iter().enumerate() {
if !thinking.is_empty() {
let signature = signatures.remove(&idx).filter(|s| !s.is_empty());
blocks.push((
idx,
ContentBlock::Thinking {
thinking,
signature,
},
));
}
}
for (idx, data) in self.redacted_thinking_blocks {
blocks.push((idx, ContentBlock::RedactedThinking { data }));
}
for (idx, text) in self.text_blocks.into_iter().enumerate() {
if !text.is_empty() {
blocks.push((idx, ContentBlock::Text { text }));
}
}
for tool in self.tool_uses {
let input: serde_json::Value =
serde_json::from_str(&tool.input_json).unwrap_or_else(|e| {
log::warn!(
"Failed to parse streamed tool input JSON for tool '{}' (id={}): {} — \
input_json ({} bytes): '{}'",
tool.name,
tool.id,
e,
tool.input_json.len(),
tool.input_json.chars().take(500).collect::<String>(),
);
serde_json::json!({})
});
blocks.push((
tool.block_index,
ContentBlock::ToolUse {
id: tool.id,
name: tool.name,
input,
thought_signature: tool.thought_signature,
},
));
}
blocks.sort_by_key(|(idx, _)| *idx);
blocks.into_iter().map(|(_, block)| block).collect()
}
pub const fn take_usage(&mut self) -> Option<Usage> {
self.usage.take()
}
pub const fn take_stop_reason(&mut self) -> Option<StopReason> {
self.stop_reason.take()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TextDelta` event.
    fn text_delta(fragment: &str, block_index: usize) -> StreamDelta {
        StreamDelta::TextDelta {
            delta: fragment.to_string(),
            block_index,
        }
    }

    /// Builds a `ToolUseStart` event without a thought signature.
    fn tool_start(id: &str, name: &str, block_index: usize) -> StreamDelta {
        StreamDelta::ToolUseStart {
            id: id.to_string(),
            name: name.to_string(),
            block_index,
            thought_signature: None,
        }
    }

    /// Builds a `ToolInputDelta` event.
    fn tool_input_delta(id: &str, fragment: &str, block_index: usize) -> StreamDelta {
        StreamDelta::ToolInputDelta {
            id: id.to_string(),
            delta: fragment.to_string(),
            block_index,
        }
    }

    /// Applies `events` in order to a fresh accumulator and returns its blocks.
    fn accumulate(events: &[StreamDelta]) -> Vec<ContentBlock> {
        let mut accumulator = StreamAccumulator::new();
        for event in events {
            accumulator.apply(event);
        }
        accumulator.into_content_blocks()
    }

    /// Returns the parsed input of a `ToolUse` block, panicking on any other variant.
    fn tool_use_input(block: &ContentBlock) -> &serde_json::Value {
        let ContentBlock::ToolUse { input, .. } = block else {
            panic!("Expected ToolUse block");
        };
        input
    }

    #[test]
    fn test_accumulator_text_deltas() {
        let result = accumulate(&[text_delta("Hello", 0), text_delta(" world", 0)]);

        assert_eq!(result.len(), 1);
        assert!(matches!(&result[0], ContentBlock::Text { text } if text == "Hello world"));
    }

    #[test]
    fn test_accumulator_multiple_text_blocks() {
        let result = accumulate(&[text_delta("First", 0), text_delta("Second", 1)]);

        assert_eq!(result.len(), 2);
        assert!(matches!(&result[0], ContentBlock::Text { text } if text == "First"));
        assert!(matches!(&result[1], ContentBlock::Text { text } if text == "Second"));
    }

    #[test]
    fn test_accumulator_thinking_signature() {
        let result = accumulate(&[
            StreamDelta::ThinkingDelta {
                delta: "Reasoning".to_string(),
                block_index: 0,
            },
            StreamDelta::SignatureDelta {
                delta: "sig_123".to_string(),
                block_index: 0,
            },
        ]);

        assert_eq!(result.len(), 1);
        assert!(matches!(
            &result[0],
            ContentBlock::Thinking { thinking, signature }
                if thinking == "Reasoning" && signature.as_deref() == Some("sig_123")
        ));
    }

    #[test]
    fn test_accumulator_tool_use() {
        // The JSON input arrives split across two fragments.
        let result = accumulate(&[
            tool_start("call_123", "read_file", 0),
            tool_input_delta("call_123", r#"{"path":"#, 0),
            tool_input_delta("call_123", r#""test.txt"}"#, 0),
        ]);

        assert_eq!(result.len(), 1);
        let ContentBlock::ToolUse {
            id, name, input, ..
        } = &result[0]
        else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "call_123");
        assert_eq!(name, "read_file");
        assert_eq!(input["path"], "test.txt");
    }

    #[test]
    fn test_accumulator_mixed_content() {
        let events = [
            text_delta("Let me read that file.", 0),
            tool_start("call_456", "read_file", 1),
            tool_input_delta("call_456", r#"{"path":"file.txt"}"#, 1),
            StreamDelta::Usage(Usage {
                input_tokens: 100,
                output_tokens: 50,
                cached_input_tokens: 0,
            }),
            StreamDelta::Done {
                stop_reason: Some(StopReason::ToolUse),
            },
        ];
        let mut accumulator = StreamAccumulator::new();
        for event in &events {
            accumulator.apply(event);
        }

        // Metadata must be readable before the accumulator is consumed.
        assert!(accumulator.usage().is_some());
        assert_eq!(accumulator.usage().map(|u| u.input_tokens), Some(100));
        assert!(matches!(
            accumulator.stop_reason(),
            Some(StopReason::ToolUse)
        ));

        let result = accumulator.into_content_blocks();
        assert_eq!(result.len(), 2);
        assert!(matches!(&result[0], ContentBlock::Text { .. }));
        assert!(matches!(&result[1], ContentBlock::ToolUse { .. }));
    }

    #[test]
    fn test_accumulator_invalid_tool_json() {
        let result = accumulate(&[
            tool_start("call_789", "test_tool", 0),
            tool_input_delta("call_789", "invalid json {", 0),
        ]);

        assert_eq!(result.len(), 1);
        assert!(tool_use_input(&result[0]).is_object());
    }

    #[test]
    fn test_accumulator_empty_tool_input_falls_back_to_empty_object() {
        let result = accumulate(&[tool_start("call_empty", "read", 0)]);

        assert_eq!(result.len(), 1);
        let ContentBlock::ToolUse { input, name, .. } = &result[0] else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(name, "read");
        assert_eq!(input, &serde_json::json!({}));
    }

    #[test]
    fn test_accumulator_mismatched_delta_id_drops_input() {
        // The input fragment names a tool call that was never started.
        let result = accumulate(&[
            tool_start("call_A", "bash", 0),
            tool_input_delta("call_B", r#"{"command":"ls"}"#, 0),
        ]);

        assert_eq!(result.len(), 1);
        assert_eq!(tool_use_input(&result[0]), &serde_json::json!({}));
    }

    #[test]
    fn test_accumulator_empty() {
        let result = accumulate(&[]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_accumulator_skips_empty_text() {
        let result = accumulate(&[text_delta("", 0), text_delta("Hello", 1)]);

        assert_eq!(result.len(), 1);
        assert!(matches!(&result[0], ContentBlock::Text { text } if text == "Hello"));
    }
}