#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamUsage {
pub input_tokens: u64,
pub output_tokens: u64,
pub cache_read_tokens: Option<u64>,
pub cache_creation_tokens: Option<u64>,
pub stop_reason: Option<String>,
}
impl StreamUsage {
pub fn total_tokens(&self) -> u64 {
self.input_tokens
+ self.output_tokens
+ self.cache_read_tokens.unwrap_or(0)
+ self.cache_creation_tokens.unwrap_or(0)
}
}
#[derive(Debug, Clone)]
pub enum StreamEvent {
TextDelta(String),
ThinkingDelta(String),
InputJsonDelta(String),
Usage(StreamUsage),
Done,
Error(String),
}
#[derive(Debug, Default)]
pub struct StreamAccumulator {
text: String,
usage: Option<StreamUsage>,
}
impl StreamAccumulator {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, event: &StreamEvent) {
match event {
StreamEvent::TextDelta(delta) => self.text.push_str(delta),
StreamEvent::Usage(usage) => self.usage = Some(usage.clone()),
StreamEvent::ThinkingDelta(_)
| StreamEvent::InputJsonDelta(_)
| StreamEvent::Done
| StreamEvent::Error(_) => {}
}
}
pub fn text(&self) -> &str {
&self.text
}
pub fn usage(&self) -> Option<&StreamUsage> {
self.usage.as_ref()
}
pub fn total_tokens(&self) -> Option<u64> {
self.usage.as_ref().map(StreamUsage::total_tokens)
}
pub fn into_text(self) -> String {
self.text
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn accumulates_text_deltas() {
let mut acc = StreamAccumulator::new();
acc.push(&StreamEvent::TextDelta("Hello".to_string()));
acc.push(&StreamEvent::TextDelta(", world".to_string()));
acc.push(&StreamEvent::Done);
assert_eq!(acc.text(), "Hello, world");
}
#[test]
fn ignores_non_text_events() {
let mut acc = StreamAccumulator::new();
acc.push(&StreamEvent::InputJsonDelta("{\"foo\":".to_string()));
acc.push(&StreamEvent::Done);
assert_eq!(acc.text(), "");
}
#[test]
fn into_text_consumes() {
let mut acc = StreamAccumulator::new();
acc.push(&StreamEvent::TextDelta("hi".to_string()));
assert_eq!(acc.into_text(), "hi");
}
#[test]
fn captures_usage_and_excludes_thinking_from_text() {
let mut acc = StreamAccumulator::new();
acc.push(&StreamEvent::ThinkingDelta("reasoning...".to_string()));
acc.push(&StreamEvent::TextDelta("answer".to_string()));
acc.push(&StreamEvent::Usage(StreamUsage {
input_tokens: 10,
output_tokens: 5,
cache_read_tokens: Some(100),
cache_creation_tokens: None,
stop_reason: Some("end_turn".to_string()),
}));
acc.push(&StreamEvent::Done);
assert_eq!(acc.text(), "answer");
assert_eq!(acc.total_tokens(), Some(115));
assert_eq!(
acc.usage().unwrap().stop_reason.as_deref(),
Some("end_turn")
);
}
#[test]
fn total_tokens_none_without_usage() {
let mut acc = StreamAccumulator::new();
acc.push(&StreamEvent::TextDelta("hi".to_string()));
assert_eq!(acc.total_tokens(), None);
}
}