use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use super::types::{
ChatResponseSharedState, ChatResult, ERROR_DRAIN_TIMEOUT, ResponseEvent, StreamChunk,
StreamError, StreamReceivers, ToolCallEvent,
};
use crate::types::{Step, UsageMetadata};
#[derive(Debug)]
pub struct ChatResponseHandle {
pub(super) rx: StreamReceivers,
pub(super) usage: Option<UsageMetadata>,
pub(super) structured_output_value: Option<serde_json::Value>,
pub(crate) shared_state: Arc<Mutex<ChatResponseSharedState>>,
}
impl ChatResponseHandle {
pub const fn take_text_stream(&mut self) -> Option<mpsc::Receiver<String>> {
self.rx.text.take()
}
pub const fn take_thought_stream(&mut self) -> Option<mpsc::Receiver<String>> {
self.rx.thought.take()
}
pub const fn take_tool_call_stream(&mut self) -> Option<mpsc::Receiver<ToolCallEvent>> {
self.rx.tool_call.take()
}
pub const fn take_step_stream(&mut self) -> Option<mpsc::Receiver<Step>> {
self.rx.step.take()
}
pub fn receive_steps(&mut self) -> Option<impl tokio_stream::Stream<Item = Step>> {
self.rx.step.take().map(ReceiverStream::new)
}
pub fn receive_chunks(&mut self) -> Option<impl tokio_stream::Stream<Item = StreamChunk>> {
self.rx.chunk.take().map(ReceiverStream::new)
}
pub async fn text(mut self) -> Result<ChatResult, StreamError> {
let mut buf = String::new();
if let Some(mut rx) = self.rx.text.take() {
while let Some(token) = rx.recv().await {
buf.push_str(&token);
}
}
if let Some(mut err_rx) = self.rx.error.take()
&& let Ok(Some(err)) = tokio::time::timeout(ERROR_DRAIN_TIMEOUT, err_rx.recv()).await
{
return Err(err);
}
self.finalize();
Ok(ChatResult {
text: buf,
usage: self.usage,
structured_output: self.structured_output_value,
})
}
pub fn finalize(&mut self) {
if let Ok(state) = self.shared_state.lock() {
self.usage = state.usage.clone();
self.structured_output_value = state.structured_output.clone();
} else {
tracing::error!(
"ChatResponseHandle shared_state mutex poisoned during finalize — \
usage and structured_output will be unavailable"
);
}
}
#[must_use]
pub const fn structured_output(&self) -> Option<&serde_json::Value> {
self.structured_output_value.as_ref()
}
#[must_use]
pub const fn usage_metadata(&self) -> Option<&UsageMetadata> {
self.usage.as_ref()
}
#[doc(hidden)]
#[must_use]
pub fn shared_state(&self) -> Arc<Mutex<ChatResponseSharedState>> {
Arc::clone(&self.shared_state)
}
pub async fn resolve(mut self) -> Vec<ResponseEvent> {
let mut events = Vec::new();
if let Some(mut rx) = self.rx.event.take() {
while let Some(event) = rx.recv().await {
events.push(event);
}
}
self.finalize();
events
}
}