#![allow(clippy::print_stdout)]
use bytes::Bytes;
use entelix::Result;
use entelix::codecs::{AnthropicMessagesCodec, BoxByteStream, Codec};
use entelix::stream::{StreamAggregator, StreamDelta};
use futures::StreamExt;
const SSE_BODY: &str = "event: message_start\n\
data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_demo\",\"model\":\"claude-opus-4-7\",\"role\":\"assistant\",\"content\":[],\"stop_reason\":null,\"usage\":{\"input_tokens\":3}}}\n\n\
event: content_block_start\n\
data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n\
event: content_block_delta\n\
data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n\
event: content_block_delta\n\
data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\", \"}}\n\n\
event: content_block_delta\n\
data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"world!\"}}\n\n\
event: content_block_stop\n\
data: {\"type\":\"content_block_stop\",\"index\":0}\n\n\
event: message_delta\n\
data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":7}}\n\n\
event: message_stop\n\
data: {\"type\":\"message_stop\"}\n\n";
fn body_stream() -> BoxByteStream<'static> {
Box::pin(futures::stream::iter(vec![Ok::<_, entelix::Error>(
Bytes::from(SSE_BODY),
)]))
}
#[tokio::main]
async fn main() -> Result<()> {
let codec = AnthropicMessagesCodec::new();
let mut stream = codec.decode_stream(body_stream(), Vec::new());
let mut aggregator = StreamAggregator::new();
println!("── streaming deltas ─────────────────────────────");
while let Some(item) = stream.next().await {
let delta = item?;
match &delta {
StreamDelta::Start {
id,
model,
provider_echoes,
} => {
println!(
" start id={id} model={model}{}",
if provider_echoes.is_empty() {
""
} else {
" [echo]"
}
);
}
StreamDelta::TextDelta {
text,
provider_echoes,
} => {
print!(" text ┊ {text}");
if !provider_echoes.is_empty() {
print!(" [echo]");
}
}
StreamDelta::ThinkingDelta {
text,
provider_echoes,
} => {
print!(" think ┊ {text}");
if !provider_echoes.is_empty() {
println!(" [echo]");
}
}
StreamDelta::ToolUseStart {
id,
name,
provider_echoes,
} => {
println!(
" tool> id={id} name={name}{}",
if provider_echoes.is_empty() {
""
} else {
" [echo]"
}
);
}
StreamDelta::ToolUseInputDelta { partial_json } => {
println!(" tool… {partial_json}");
}
StreamDelta::ToolUseStop => println!(" tool<"),
StreamDelta::Usage(u) => {
println!("\n usage in={} out={}", u.input_tokens, u.output_tokens);
}
StreamDelta::Warning(w) => println!(" warn {w:?}"),
StreamDelta::RateLimit(rl) => println!(
" quota req_remain={:?} tok_remain={:?}",
rl.requests_remaining, rl.tokens_remaining
),
StreamDelta::Stop { stop_reason } => println!(" stop {stop_reason:?}"),
_ => {}
}
aggregator.push(delta)?;
}
let response = aggregator.finalize()?;
println!("\n── finalized ────────────────────────────────────");
println!(" id : {}", response.id);
println!(" model : {}", response.model);
println!(" stop_reason : {:?}", response.stop_reason);
println!(" parts : {}", response.content.len());
println!(" output_tokens: {}", response.usage.output_tokens);
Ok(())
}