use crate::ids::{prefixed_id, unix_timestamp};
use crate::latency::LatencyProfile;
use crate::openai::{
ItemStatus, OutputContentPart, OutputItem, OutputRole, OutputTokensDetails, ReasoningSummary,
ResponseStatus, ResponsesResponse, ResponsesStreamEvent, ResponsesUsage,
};
use async_stream::stream;
use futures_core::Stream;
use std::pin::Pin;
use tokio::time::sleep;
/// Callback invoked when a token stream is finished with.
type OnCompleteCallback = Box<dyn FnOnce() + Send>;

/// Owns an optional completion callback and ensures it runs at most once:
/// on an explicit [`CompletionGuard::complete`] call or, if that never
/// happens, when the guard itself is dropped.
struct CompletionGuard {
    /// The callback still waiting to run; `None` once it has fired or if
    /// none was supplied.
    callback: Option<OnCompleteCallback>,
}

impl CompletionGuard {
    /// Wraps a (possibly absent) callback in a new guard.
    fn new(callback: Option<OnCompleteCallback>) -> Self {
        CompletionGuard { callback }
    }

    /// Fires the pending callback if there is one. The callback is taken out
    /// first, so repeated calls and the eventual drop are no-ops.
    fn complete(&mut self) {
        if let Some(pending) = self.callback.take() {
            pending();
        }
    }
}

impl Drop for CompletionGuard {
    /// Runs the callback if `complete` was never reached.
    fn drop(&mut self) {
        self.complete();
    }
}
pub struct ResponsesTokenStream {
response_id: String,
message_id: String,
model: String,
created_at: i64,
latency: LatencyProfile,
content: String,
usage: ResponsesUsage,
include_reasoning: bool,
reasoning_summary: Option<String>,
on_complete: Option<OnCompleteCallback>,
}
impl ResponsesTokenStream {
pub fn new(
model: String,
content: String,
latency: LatencyProfile,
usage: ResponsesUsage,
) -> Self {
Self {
response_id: prefixed_id("resp_"),
message_id: prefixed_id("msg_"),
model,
created_at: unix_timestamp(),
latency,
content,
usage,
include_reasoning: false,
reasoning_summary: None,
on_complete: None,
}
}
pub fn with_on_complete<F>(mut self, callback: F) -> Self
where
F: FnOnce() + Send + 'static,
{
self.on_complete = Some(Box::new(callback));
self
}
fn tokenize_text(text: &str) -> Vec<String> {
let mut tokens = Vec::new();
let mut current_word = String::new();
for ch in text.chars() {
if ch.is_whitespace() {
if !current_word.is_empty() {
tokens.push(current_word.clone());
current_word.clear();
}
tokens.push(ch.to_string());
} else {
current_word.push(ch);
}
}
if !current_word.is_empty() {
tokens.push(current_word);
}
tokens
}
pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
let content_tokens = Self::tokenize_text(&self.content);
let response_id = self.response_id.clone();
let message_id = self.message_id.clone();
let model = self.model.clone();
let created_at = self.created_at;
let latency = self.latency.clone();
let usage = self.usage.clone();
let content = self.content.clone();
let include_reasoning = self.include_reasoning;
let reasoning_summary = self.reasoning_summary.clone();
let on_complete = self.on_complete;
Box::pin(stream! {
let mut completion_guard = CompletionGuard::new(on_complete);
let mut seq: u32 = 0;
let initial_response = ResponsesResponse {
id: response_id.clone(),
object: "response".to_string(),
created_at,
model: model.clone(),
status: ResponseStatus::InProgress,
output: vec![],
output_text: None,
usage: None,
error: None,
metadata: None,
};
yield ResponsesStreamEvent::response_created(initial_response.clone(), seq);
seq += 1;
let ttft = latency.sample_ttft();
if !ttft.is_zero() {
sleep(ttft).await;
}
yield ResponsesStreamEvent::response_in_progress(initial_response.clone(), seq);
seq += 1;
let mut final_output_items: Vec<OutputItem> = Vec::new();
if include_reasoning {
let reasoning_id = prefixed_id("rs_");
let reasoning_output_index: u32 = 0;
let reasoning_item = OutputItem::Reasoning {
id: reasoning_id.clone(),
status: ItemStatus::InProgress,
summary: None,
};
yield ResponsesStreamEvent::output_item_added(reasoning_output_index, &reasoning_item, seq);
seq += 1;
if let Some(ref summary_text) = reasoning_summary {
let empty_summary = ReasoningSummary {
summary_type: "summary_text".to_string(),
text: String::new(),
};
yield ResponsesStreamEvent::reasoning_summary_part_added(
reasoning_output_index, 0, &reasoning_id, &empty_summary, seq,
);
seq += 1;
let summary_tokens = Self::tokenize_text(summary_text);
for token in summary_tokens.into_iter() {
let tbt = latency.sample_tbt();
if !tbt.is_zero() {
sleep(tbt).await;
}
yield ResponsesStreamEvent::reasoning_summary_text_delta(
reasoning_output_index, 0, &reasoning_id, &token, seq,
);
seq += 1;
}
yield ResponsesStreamEvent::reasoning_summary_text_done(
reasoning_output_index, 0, &reasoning_id, summary_text, seq,
);
seq += 1;
let final_summary = ReasoningSummary {
summary_type: "summary_text".to_string(),
text: summary_text.clone(),
};
yield ResponsesStreamEvent::reasoning_summary_part_done(
reasoning_output_index, 0, &reasoning_id, &final_summary, seq,
);
seq += 1;
}
let final_reasoning_item = OutputItem::Reasoning {
id: reasoning_id,
status: ItemStatus::Completed,
summary: reasoning_summary.as_ref().map(|text| {
vec![ReasoningSummary {
summary_type: "summary_text".to_string(),
text: text.clone(),
}]
}),
};
yield ResponsesStreamEvent::output_item_done(reasoning_output_index, &final_reasoning_item, seq);
seq += 1;
final_output_items.push(final_reasoning_item);
}
let message_output_index = if include_reasoning { 1 } else { 0 };
let message_item = OutputItem::Message {
id: message_id.clone(),
role: OutputRole::Assistant,
status: ItemStatus::InProgress,
content: vec![],
};
yield ResponsesStreamEvent::output_item_added(message_output_index, &message_item, seq);
seq += 1;
let content_part = OutputContentPart::OutputText {
text: String::new(),
annotations: vec![],
};
yield ResponsesStreamEvent::content_part_added(message_output_index, 0, &message_id, &content_part, seq);
seq += 1;
for token in content_tokens.into_iter() {
let tbt = latency.sample_tbt();
if !tbt.is_zero() {
sleep(tbt).await;
}
yield ResponsesStreamEvent::output_text_delta(
message_output_index, 0, &message_id, &token, seq,
);
seq += 1;
}
yield ResponsesStreamEvent::output_text_done(message_output_index, 0, &message_id, &content, seq);
seq += 1;
let final_content_part = OutputContentPart::OutputText {
text: content.clone(),
annotations: vec![],
};
yield ResponsesStreamEvent::content_part_done(message_output_index, 0, &message_id, &final_content_part, seq);
seq += 1;
let final_message_item = OutputItem::Message {
id: message_id.clone(),
role: OutputRole::Assistant,
status: ItemStatus::Completed,
content: vec![final_content_part],
};
yield ResponsesStreamEvent::output_item_done(message_output_index, &final_message_item, seq);
seq += 1;
final_output_items.push(final_message_item);
let final_response = ResponsesResponse {
id: response_id.clone(),
object: "response".to_string(),
created_at,
model: model.clone(),
status: ResponseStatus::Completed,
output: final_output_items,
output_text: Some(content.clone()),
usage: Some(usage),
error: None,
metadata: None,
};
yield ResponsesStreamEvent::response_completed(final_response, seq);
completion_guard.complete();
})
}
}
pub struct ResponsesTokenStreamBuilder {
model: String,
content: String,
latency: LatencyProfile,
usage: ResponsesUsage,
include_reasoning: bool,
reasoning_summary: Option<String>,
on_complete: Option<OnCompleteCallback>,
}
impl ResponsesTokenStreamBuilder {
pub fn new(model: impl Into<String>, content: impl Into<String>) -> Self {
Self {
model: model.into(),
content: content.into(),
latency: LatencyProfile::default(),
usage: ResponsesUsage {
input_tokens: 0,
output_tokens: 0,
total_tokens: 0,
output_tokens_details: Some(OutputTokensDetails {
reasoning_tokens: 0,
}),
},
include_reasoning: false,
reasoning_summary: None,
on_complete: None,
}
}
pub fn latency(mut self, latency: LatencyProfile) -> Self {
self.latency = latency;
self
}
pub fn usage(mut self, usage: ResponsesUsage) -> Self {
self.usage = usage;
self
}
pub fn reasoning(mut self, summary_text: Option<String>) -> Self {
self.include_reasoning = true;
self.reasoning_summary = summary_text;
self
}
pub fn on_complete<F>(mut self, callback: F) -> Self
where
F: FnOnce() + Send + 'static,
{
self.on_complete = Some(Box::new(callback));
self
}
pub fn build(self) -> ResponsesTokenStream {
let mut stream =
ResponsesTokenStream::new(self.model, self.content, self.latency, self.usage);
stream.include_reasoning = self.include_reasoning;
stream.reasoning_summary = self.reasoning_summary;
if let Some(on_complete) = self.on_complete {
stream = stream.with_on_complete(on_complete);
}
stream
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Extracts the `sequence_number` from each event's `data: ` JSON payload.
    /// Events without a parseable sequence number are skipped, so callers compare
    /// the result's length against the event count.
    fn sequence_numbers(events: &[String]) -> Vec<u64> {
        events
            .iter()
            .filter_map(|event| {
                let data = &event[event.find("data: ")? + "data: ".len()..];
                let json: serde_json::Value = serde_json::from_str(data.trim()).ok()?;
                json.get("sequence_number")?.as_u64()
            })
            .collect()
    }

    /// Asserts that every event carries a sequence number and that they run 0, 1, 2, ...
    fn assert_sequence_continuous(events: &[String]) {
        let seqs = sequence_numbers(events);
        assert_eq!(
            seqs.len(),
            events.len(),
            "Every event should have a sequence_number"
        );
        let expected: Vec<u64> = (0..events.len() as u64).collect();
        assert_eq!(seqs, expected, "Sequence numbers should be continuous from 0");
    }

    #[test]
    fn test_tokenize_text_keeps_whitespace_tokens() {
        assert_eq!(
            ResponsesTokenStream::tokenize_text("Hello  world\n"),
            vec!["Hello", " ", " ", "world", "\n"]
        );
        assert!(ResponsesTokenStream::tokenize_text("").is_empty());
        let text = " a\tb ";
        assert_eq!(ResponsesTokenStream::tokenize_text(text).concat(), text);
    }

    #[tokio::test]
    async fn test_responses_stream_basic() {
        let usage = ResponsesUsage {
            input_tokens: 10,
            output_tokens: 5,
            total_tokens: 15,
            output_tokens_details: None,
        };
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hello world")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        assert!(!events.is_empty());
        assert!(events[0].contains("response.created"));
        assert!(events.last().unwrap().contains("response.completed"));
    }

    #[tokio::test]
    async fn test_responses_stream_deltas() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 8,
            output_tokens_details: None,
        };
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hello world")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        let delta_events: Vec<_> = events
            .iter()
            .filter(|e| e.contains("output_text.delta"))
            .collect();
        assert!(!delta_events.is_empty());
        assert!(delta_events[0].contains("sequence_number"));
    }

    #[tokio::test]
    async fn test_responses_stream_event_order() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 2,
            total_tokens: 7,
            output_tokens_details: None,
        };
        // "Hi" is a single token, so exactly one delta is expected.
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hi")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        let event_types: Vec<&str> = events
            .iter()
            .filter_map(|e| {
                if e.contains("response.created") {
                    Some("created")
                } else if e.contains("response.in_progress") {
                    Some("in_progress")
                } else if e.contains("output_item.added") {
                    Some("item_added")
                } else if e.contains("content_part.added") {
                    Some("part_added")
                } else if e.contains("output_text.delta") {
                    Some("delta")
                } else if e.contains("output_text.done") {
                    Some("text_done")
                } else if e.contains("content_part.done") {
                    Some("part_done")
                } else if e.contains("output_item.done") {
                    Some("item_done")
                } else if e.contains("response.completed") {
                    Some("completed")
                } else {
                    None
                }
            })
            .collect();
        // Check the whole sequence, not just its endpoints, so reordering in the
        // middle of the stream is caught.
        assert_eq!(
            event_types,
            vec![
                "created",
                "in_progress",
                "item_added",
                "part_added",
                "delta",
                "text_done",
                "part_done",
                "item_done",
                "completed",
            ]
        );
    }

    #[tokio::test]
    async fn test_responses_stream_with_reasoning_and_summary() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 17,
            output_tokens_details: Some(OutputTokensDetails {
                reasoning_tokens: 9,
            }),
        };
        let stream = ResponsesTokenStreamBuilder::new("o3", "Answer here")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .reasoning(Some("Thinking about it.".to_string()))
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        let reasoning_added: Vec<_> = events
            .iter()
            .filter(|e| e.contains("output_item.added") && e.contains("\"reasoning\""))
            .collect();
        assert_eq!(
            reasoning_added.len(),
            1,
            "Should have one reasoning item added"
        );
        let summary_part_added: Vec<_> = events
            .iter()
            .filter(|e| e.contains("reasoning_summary_part.added"))
            .collect();
        assert_eq!(summary_part_added.len(), 1);
        let summary_deltas: Vec<_> = events
            .iter()
            .filter(|e| e.contains("reasoning_summary_text.delta"))
            .collect();
        assert!(
            !summary_deltas.is_empty(),
            "Should have summary text deltas"
        );
        let summary_done: Vec<_> = events
            .iter()
            .filter(|e| e.contains("reasoning_summary_text.done"))
            .collect();
        assert_eq!(summary_done.len(), 1);
        let summary_part_done: Vec<_> = events
            .iter()
            .filter(|e| e.contains("reasoning_summary_part.done"))
            .collect();
        assert_eq!(summary_part_done.len(), 1);
        let reasoning_done_idx = events
            .iter()
            .position(|e| e.contains("output_item.done") && e.contains("\"reasoning\""))
            .expect("Should have reasoning item done");
        let message_added_idx = events
            .iter()
            .position(|e| e.contains("output_item.added") && e.contains("\"message\""))
            .expect("Should have message item added");
        assert!(
            reasoning_done_idx < message_added_idx,
            "Reasoning item should complete before message starts"
        );
        let message_event = &events[message_added_idx];
        assert!(message_event.contains("\"output_index\":1"));
        let completed = events.last().unwrap();
        assert!(completed.contains("response.completed"));
        assert!(completed.contains("\"reasoning\""));
        assert!(completed.contains("\"message\""));
    }

    #[tokio::test]
    async fn test_responses_stream_with_reasoning_no_summary() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 17,
            output_tokens_details: Some(OutputTokensDetails {
                reasoning_tokens: 9,
            }),
        };
        let stream = ResponsesTokenStreamBuilder::new("o3", "Answer")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .reasoning(None)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        let reasoning_added: Vec<_> = events
            .iter()
            .filter(|e| e.contains("output_item.added") && e.contains("\"reasoning\""))
            .collect();
        assert_eq!(reasoning_added.len(), 1);
        let summary_events: Vec<_> = events
            .iter()
            .filter(|e| e.contains("reasoning_summary"))
            .collect();
        assert!(
            summary_events.is_empty(),
            "No summary events when summary not requested"
        );
        let message_added = events
            .iter()
            .find(|e| e.contains("output_item.added") && e.contains("\"message\""))
            .expect("Should have message item");
        assert!(message_added.contains("\"output_index\":1"));
    }

    #[tokio::test]
    async fn test_responses_stream_no_reasoning() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 8,
            output_tokens_details: None,
        };
        let stream = ResponsesTokenStreamBuilder::new("gpt-4o", "Hello")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        let reasoning_events: Vec<_> = events
            .iter()
            .filter(|e| e.contains("\"reasoning\""))
            .collect();
        assert!(
            reasoning_events.is_empty(),
            "Non-reasoning model should not have reasoning items"
        );
        let message_added = events
            .iter()
            .find(|e| e.contains("output_item.added") && e.contains("\"message\""))
            .expect("Should have message item");
        assert!(message_added.contains("\"output_index\":0"));
    }

    #[tokio::test]
    async fn test_responses_stream_sequence_numbers_continuous() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 8,
            output_tokens_details: None,
        };
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hello there world")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        assert_sequence_continuous(&events);
    }

    #[tokio::test]
    async fn test_responses_stream_reasoning_sequence_numbers_continuous() {
        let usage = ResponsesUsage {
            input_tokens: 5,
            output_tokens: 3,
            total_tokens: 20,
            output_tokens_details: Some(OutputTokensDetails {
                reasoning_tokens: 12,
            }),
        };
        let stream = ResponsesTokenStreamBuilder::new("o3", "Hi")
            .latency(LatencyProfile::instant())
            .usage(usage)
            .reasoning(Some("Thinking carefully.".to_string()))
            .build();
        let events: Vec<String> = stream.into_stream().collect().await;
        assert_sequence_continuous(&events);
    }

    #[tokio::test]
    async fn test_responses_stream_on_complete_called_on_drop() {
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hello world")
            .latency(LatencyProfile::instant())
            .on_complete(move || {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            })
            .build();
        let mut stream = stream.into_stream();
        assert!(stream.next().await.is_some());
        drop(stream);
        assert_eq!(callback_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_responses_stream_on_complete_called_once() {
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);
        let stream = ResponsesTokenStreamBuilder::new("gpt-5", "Hello world")
            .latency(LatencyProfile::instant())
            .on_complete(move || {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            })
            .build();
        let _: Vec<String> = stream.into_stream().collect().await;
        assert_eq!(callback_count.load(Ordering::SeqCst), 1);
    }
}