use crate::adapter::inter_stream::{InterStreamEnd, InterStreamEvent};
use crate::chat::{ChatMessage, ContentPart, MessageContent, ToolCall, Usage};
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
type InterStreamType = Pin<Box<dyn Stream<Item = crate::Result<InterStreamEvent>> + Send>>;
pub struct ChatStream {
inter_stream: InterStreamType,
}
impl ChatStream {
pub(crate) fn new(inter_stream: InterStreamType) -> Self {
ChatStream { inter_stream }
}
pub(crate) fn from_inter_stream<T>(inter_stream: T) -> Self
where
T: Stream<Item = crate::Result<InterStreamEvent>> + Send + 'static,
{
let boxed_stream: InterStreamType = Box::pin(inter_stream);
ChatStream::new(boxed_stream)
}
}
impl Stream for ChatStream {
type Item = crate::Result<ChatStreamEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.inter_stream).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => {
let chat_event = match event {
InterStreamEvent::Start => ChatStreamEvent::Start,
InterStreamEvent::Chunk(content) => ChatStreamEvent::Chunk(StreamChunk { content }),
InterStreamEvent::ReasoningChunk(content) => {
ChatStreamEvent::ReasoningChunk(StreamChunk { content })
}
InterStreamEvent::ThoughtSignatureChunk(content) => {
ChatStreamEvent::ThoughtSignatureChunk(StreamChunk { content })
}
InterStreamEvent::ToolCallChunk(tool_call) => {
ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call })
}
InterStreamEvent::End(inter_end) => ChatStreamEvent::End(inter_end.into()),
};
Poll::Ready(Some(Ok(chat_event)))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ChatStreamEvent {
Start,
Chunk(StreamChunk),
ReasoningChunk(StreamChunk),
ThoughtSignatureChunk(StreamChunk),
ToolCallChunk(ToolChunk),
End(StreamEnd),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamChunk {
pub content: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ToolChunk {
pub tool_call: ToolCall,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StreamEnd {
pub captured_usage: Option<Usage>,
pub captured_content: Option<MessageContent>,
pub captured_reasoning_content: Option<String>,
}
impl From<InterStreamEnd> for StreamEnd {
fn from(inter_end: InterStreamEnd) -> Self {
let captured_text_content = inter_end.captured_text_content;
let mut captured_tool_calls = inter_end.captured_tool_calls;
let mut captured_content: Option<MessageContent> = None;
if let Some(captured_thoughts) = inter_end.captured_thought_signatures {
let thoughts_content = captured_thoughts
.into_iter()
.map(ContentPart::ThoughtSignature)
.collect::<Vec<_>>();
if let Some(tool_calls) = captured_tool_calls.as_mut()
&& let Some(first_call) = tool_calls.first_mut()
{
first_call.thought_signatures = Some(
thoughts_content
.iter()
.filter_map(|p| p.as_thought_signature().map(|s| s.to_string()))
.collect(),
);
}
if let Some(existing_content) = &mut captured_content {
existing_content.extend_front(thoughts_content);
} else {
captured_content = Some(MessageContent::from_parts(thoughts_content));
}
}
if let Some(captured_text_content) = captured_text_content {
if let Some(existing_content) = &mut captured_content {
existing_content.extend(MessageContent::from_text(captured_text_content));
} else {
captured_content = Some(MessageContent::from_text(captured_text_content));
}
}
if let Some(captured_tool_calls) = captured_tool_calls {
if let Some(existing_content) = &mut captured_content {
existing_content.extend(MessageContent::from_tool_calls(captured_tool_calls));
} else {
captured_content = Some(MessageContent::from_tool_calls(captured_tool_calls));
}
}
StreamEnd {
captured_usage: inter_end.captured_usage,
captured_content,
captured_reasoning_content: inter_end.captured_reasoning_content,
}
}
}
impl StreamEnd {
pub fn captured_first_text(&self) -> Option<&str> {
let captured_content = self.captured_content.as_ref()?;
captured_content.first_text()
}
pub fn captured_into_first_text(self) -> Option<String> {
let captured_content = self.captured_content?;
captured_content.into_first_text()
}
pub fn captured_texts(&self) -> Option<Vec<&str>> {
let captured_content = self.captured_content.as_ref()?;
Some(captured_content.texts())
}
pub fn into_texts(self) -> Option<Vec<String>> {
let captured_content = self.captured_content?;
Some(captured_content.into_texts())
}
pub fn captured_tool_calls(&self) -> Option<Vec<&ToolCall>> {
let captured_content = self.captured_content.as_ref()?;
Some(captured_content.tool_calls())
}
pub fn captured_into_tool_calls(self) -> Option<Vec<ToolCall>> {
let captured_content = self.captured_content?;
Some(captured_content.into_tool_calls())
}
pub fn captured_thought_signatures(&self) -> Option<Vec<&str>> {
let captured_content = self.captured_content.as_ref()?;
Some(
captured_content
.parts()
.iter()
.filter_map(|p| p.as_thought_signature())
.collect(),
)
}
pub fn captured_into_thought_signatures(self) -> Option<Vec<String>> {
let captured_content = self.captured_content?;
Some(
captured_content
.into_parts()
.into_iter()
.filter_map(|p| p.into_thought_signature())
.collect(),
)
}
pub fn into_assistant_message_for_tool_use(self) -> Option<ChatMessage> {
let content = self.captured_content?;
let mut thought_signatures: Vec<String> = Vec::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
for part in content.into_parts() {
match part {
ContentPart::ThoughtSignature(t) => thought_signatures.push(t),
ContentPart::ToolCall(tc) => tool_calls.push(tc),
_ => {}
}
}
if tool_calls.is_empty() {
return None;
}
Some(ChatMessage::assistant_tool_calls_with_thoughts(
tool_calls,
thought_signatures,
))
}
}