use std::collections::HashMap;
use super::types::{AssistantMessage, ClaudeEvent, ContentBlock, FeedItem, UserMessage};
#[derive(Debug, Default)]
pub struct StreamAccumulator {
tools: HashMap<u64, InProgressTool>,
text_buffer: String,
thinking_buffer: String,
}
#[derive(Debug)]
struct InProgressTool {
name: String,
json_parts: Vec<String>,
}
impl StreamAccumulator {
pub fn new() -> Self {
Self::default()
}
fn handle(&mut self, inner: super::types::StreamEventInner) -> Vec<FeedItem> {
let index = inner
.extra
.get("index")
.and_then(|v| v.as_u64())
.unwrap_or(0);
match inner.event_type.as_str() {
"content_block_start" => {
if let Some(ContentBlock::ToolUse {
name: Some(name), ..
}) = inner.content_block
{
let item = FeedItem::ToolCall {
name: name.clone(),
input: serde_json::Value::Null,
};
self.tools.insert(
index,
InProgressTool {
name,
json_parts: Vec::new(),
},
);
return vec![item];
}
vec![]
}
"content_block_delta" => {
if let Some(delta) = inner.delta {
match delta.delta_type.as_deref() {
Some("text_delta") => {
if let Some(text) = delta.text {
if !text.is_empty() {
self.text_buffer.push_str(&text);
return vec![FeedItem::AssistantTextStreaming(
self.text_buffer.clone(),
)];
}
}
}
Some("thinking_delta") => {
if let Some(thinking) = delta.thinking {
if !thinking.is_empty() {
self.thinking_buffer.push_str(&thinking);
return vec![FeedItem::ThinkingStreaming(
self.thinking_buffer.clone(),
)];
}
}
}
Some("input_json_delta") => {
if let Some(partial) = delta.partial_json {
if let Some(tool) = self.tools.get_mut(&index) {
tool.json_parts.push(partial);
}
}
}
Some("signature_delta") => {
}
Some(other) => {
eprintln!("[keel:parser] unhandled delta type: {other}");
}
None => {
}
}
}
vec![]
}
"content_block_stop" => {
if let Some(tool) = self.tools.remove(&index) {
let json_str: String = tool.json_parts.concat();
let input =
serde_json::from_str(&json_str).unwrap_or(serde_json::Value::Null);
return vec![FeedItem::ToolCall {
name: tool.name,
input,
}];
}
if !self.thinking_buffer.is_empty() {
let thinking = std::mem::take(&mut self.thinking_buffer);
return vec![FeedItem::Thinking(thinking)];
}
vec![]
}
_ => vec![],
}
}
}
pub fn parse_event(line: &str, acc: &mut StreamAccumulator) -> Vec<FeedItem> {
let line = line.trim();
if line.is_empty() {
return vec![];
}
let event: ClaudeEvent = match serde_json::from_str(line) {
Ok(e) => e,
Err(e) => {
eprintln!("Failed to parse Claude event: {e}\nLine: {line}");
return vec![];
}
};
match event {
ClaudeEvent::System { .. } => vec![],
ClaudeEvent::Assistant {
subtype, message, ..
} => {
if subtype.as_deref() != Some("partial") {
acc.text_buffer.clear();
acc.thinking_buffer.clear();
}
parse_assistant_event(subtype.as_deref(), message)
}
ClaudeEvent::User { message, .. } => parse_user_event(message),
ClaudeEvent::Result {
result,
cost_usd,
duration_ms,
..
} => {
vec![FeedItem::FinalResult {
result: result.unwrap_or_default(),
cost_usd,
duration_ms,
}]
}
ClaudeEvent::StreamEvent { event: inner, .. } => acc.handle(inner),
ClaudeEvent::RateLimitEvent { .. } => vec![],
}
}
fn parse_assistant_event(
subtype: Option<&str>,
message: Option<AssistantMessage>,
) -> Vec<FeedItem> {
let is_partial = subtype == Some("partial");
let Some(msg) = message else {
return vec![];
};
let Some(content) = msg.content else {
return vec![];
};
let mut items = Vec::new();
for block in content {
match block {
ContentBlock::Text { text } => {
if !text.is_empty() {
if is_partial {
items.push(FeedItem::AssistantTextStreaming(text));
} else {
items.push(FeedItem::AssistantText(text));
}
}
}
ContentBlock::Thinking { thinking } => {
if let Some(t) = thinking {
if !t.is_empty() {
if is_partial {
items.push(FeedItem::ThinkingStreaming(t));
} else {
items.push(FeedItem::Thinking(t));
}
}
}
}
ContentBlock::ToolUse { name, input, .. } => {
items.push(FeedItem::ToolCall {
name: name.unwrap_or_else(|| "unknown".into()),
input: input.unwrap_or(serde_json::Value::Null),
});
}
ContentBlock::ToolResult {
content, is_error, ..
} => {
let text = match content {
Some(serde_json::Value::String(s)) => s,
Some(v) => serde_json::to_string_pretty(&v).unwrap_or_default(),
None => String::new(),
};
items.push(FeedItem::ToolResult {
content: text,
is_error: is_error.unwrap_or(false),
});
}
ContentBlock::Unknown => {}
}
}
items
}
fn parse_user_event(message: Option<UserMessage>) -> Vec<FeedItem> {
let Some(msg) = message else {
return vec![];
};
let Some(content_val) = msg.content else {
return vec![];
};
let blocks: Vec<ContentBlock> = match serde_json::from_value(content_val) {
Ok(b) => b,
Err(_) => return vec![],
};
let mut items = Vec::new();
for block in blocks {
if let ContentBlock::ToolResult {
content, is_error, ..
} = block
{
let text = match content {
Some(serde_json::Value::String(s)) => s,
Some(v) => serde_json::to_string_pretty(&v).unwrap_or_default(),
None => String::new(),
};
items.push(FeedItem::ToolResult {
content: text,
is_error: is_error.unwrap_or(false),
});
}
}
items
}
pub fn extract_session_id(line: &str) -> Option<String> {
let event: ClaudeEvent = serde_json::from_str(line.trim()).ok()?;
match event {
ClaudeEvent::System { session_id, .. } => session_id,
ClaudeEvent::Result { session_id, .. } => session_id,
ClaudeEvent::StreamEvent { session_id, .. } => session_id,
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn acc() -> StreamAccumulator {
StreamAccumulator::new()
}
#[test]
fn parse_system_event() {
let line = r#"{"type":"system","subtype":"init","session_id":"abc-123"}"#;
let items = parse_event(line, &mut acc());
assert!(items.is_empty(), "system events produce no feed items");
assert_eq!(extract_session_id(line), Some("abc-123".to_string()));
}
#[test]
fn parse_assistant_text() {
let line = r#"{"type":"assistant","message":{"content":[{"type":"text","text":"Hello world"}]}}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
assert!(matches!(&items[0], FeedItem::AssistantText(t) if t == "Hello world"));
}
#[test]
fn parse_assistant_streaming() {
let line = r#"{"type":"assistant","subtype":"partial","message":{"content":[{"type":"text","text":"Hel"}]}}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
assert!(matches!(&items[0], FeedItem::AssistantTextStreaming(t) if t == "Hel"));
}
#[test]
fn parse_tool_use() {
let line = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"t1","name":"Read","input":{"path":"/foo"}}]}}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
match &items[0] {
FeedItem::ToolCall { name, .. } => assert_eq!(name, "Read"),
other => panic!("expected ToolCall, got {other:?}"),
}
}
#[test]
fn parse_tool_result() {
let line = r#"{"type":"assistant","message":{"content":[{"type":"tool_result","tool_use_id":"t1","content":"file contents","is_error":false}]}}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
match &items[0] {
FeedItem::ToolResult { content, is_error } => {
assert_eq!(content, "file contents");
assert!(!is_error);
}
other => panic!("expected ToolResult, got {other:?}"),
}
}
#[test]
fn parse_user_tool_result() {
let line = r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"t1","content":"Task created: Fix bug (id: abc-123). Running now.","is_error":false}]}}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
match &items[0] {
FeedItem::ToolResult { content, is_error } => {
assert!(content.contains("(id: abc-123)"));
assert!(!is_error);
}
other => panic!("expected ToolResult, got {other:?}"),
}
}
#[test]
fn parse_user_event_without_tool_result_is_empty() {
let line = r#"{"type":"user","message":{"content":"hi"}}"#;
assert!(parse_event(line, &mut acc()).is_empty());
}
#[test]
fn parse_result_event() {
let line = r#"{"type":"result","result":"Done","cost_usd":0.05,"duration_ms":1234,"session_id":"s1"}"#;
let items = parse_event(line, &mut acc());
assert_eq!(items.len(), 1);
match &items[0] {
FeedItem::FinalResult {
cost_usd,
duration_ms,
..
} => {
assert_eq!(*cost_usd, Some(0.05));
assert_eq!(*duration_ms, Some(1234));
}
other => panic!("expected FinalResult, got {other:?}"),
}
assert_eq!(extract_session_id(line), Some("s1".to_string()));
}
#[test]
fn parse_empty_line() {
assert!(parse_event("", &mut acc()).is_empty());
assert!(parse_event(" ", &mut acc()).is_empty());
}
#[test]
fn parse_invalid_json() {
assert!(parse_event("not json", &mut acc()).is_empty());
}
#[test]
fn parse_user_event_ignored() {
let line = r#"{"type":"user","message":{"content":"hi"}}"#;
assert!(parse_event(line, &mut acc()).is_empty());
}
#[test]
fn extract_session_id_from_non_system() {
let line = r#"{"type":"assistant","message":{"content":[{"type":"text","text":"hi"}]}}"#;
assert_eq!(extract_session_id(line), None);
}
#[test]
fn parse_stream_event_text_delta_accumulates() {
let mut a = acc();
let line1 = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}},"session_id":"s1","uuid":"u1"}"#;
let items1 = parse_event(line1, &mut a);
assert_eq!(items1.len(), 1);
assert!(matches!(&items1[0], FeedItem::AssistantTextStreaming(t) if t == "Hello"));
let line2 = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}},"session_id":"s1","uuid":"u2"}"#;
let items2 = parse_event(line2, &mut a);
assert_eq!(items2.len(), 1);
assert!(matches!(&items2[0], FeedItem::AssistantTextStreaming(t) if t == "Hello world"));
assert_eq!(extract_session_id(line1), Some("s1".to_string()));
}
#[test]
fn stream_tool_accumulates_input() {
let mut a = acc();
let start = r#"{"type":"stream_event","event":{"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"t1","name":"complete_job","input":{}}},"session_id":"s1","uuid":"u1"}"#;
let start_items = parse_event(start, &mut a);
assert_eq!(start_items.len(), 1);
match &start_items[0] {
FeedItem::ToolCall { name, input } => {
assert_eq!(name, "complete_job");
assert!(input.is_null());
}
other => panic!("expected ToolCall, got {other:?}"),
}
let d1 = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":"{\"summary\""}},"session_id":"s1","uuid":"u2"}"#;
assert!(parse_event(d1, &mut a).is_empty());
let d2 = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":": \"Done!\"}"}},"session_id":"s1","uuid":"u3"}"#;
assert!(parse_event(d2, &mut a).is_empty());
let stop = r#"{"type":"stream_event","event":{"type":"content_block_stop","index":1},"session_id":"s1","uuid":"u4"}"#;
let items = parse_event(stop, &mut a);
assert_eq!(items.len(), 1);
match &items[0] {
FeedItem::ToolCall { name, input } => {
assert_eq!(name, "complete_job");
assert_eq!(input.get("summary").unwrap().as_str().unwrap(), "Done!");
}
other => panic!("expected ToolCall, got {other:?}"),
}
}
#[test]
fn parse_stream_event_message_stop_ignored() {
let line = r#"{"type":"stream_event","event":{"type":"message_stop"},"session_id":"s1","uuid":"u1"}"#;
assert!(parse_event(line, &mut acc()).is_empty());
}
#[test]
fn parse_rate_limit_event_ignored() {
let line = r#"{"type":"rate_limit_event","rate_limit_info":{"status":"allowed"},"uuid":"u1","session_id":"s1"}"#;
assert!(parse_event(line, &mut acc()).is_empty());
}
}