Skip to main content

agent_context/context/
stream.rs

1//! 流式对话返回类型 [`AgentSendStream`]。
2//!
3//! 逐块播出后端流数据,内部累积完整响应。流消费完毕后通过
4//! [`take_chunks`](AgentSendStream::take_chunks) 取出累积的响应自行处理。
5
6use super::types::ContextBackend;
7use crate::error::AgentError;
8
9// ---------------------------------------------------------------------------
10// AgentSendStream — 流式对话返回类型
11// ---------------------------------------------------------------------------
12
13/// 流式对话的 Stream 返回类型。
14///
15/// 逐块播出后端流数据,内部累积完整响应。
16/// 流消费完毕后调用 [`take_chunks`](Self::take_chunks) 取出累积的响应。
17pub struct AgentSendStream<B: ContextBackend> {
18    inner:
19        std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<B::Response, AgentError>> + Send>>,
20    chunks: Vec<B::Response>,
21}
22
23impl<B: ContextBackend> AgentSendStream<B> {
24    pub(crate) fn new(
25        inner: impl futures_core::Stream<Item = Result<B::Response, AgentError>> + Send + 'static,
26    ) -> Self {
27        Self {
28            inner: Box::pin(inner),
29            chunks: Vec::new(),
30        }
31    }
32
33    /// 从 stream 内部缓冲取出流式过程中已累积的响应。
34    pub fn take_chunks(&mut self) -> Vec<B::Response> {
35        std::mem::take(&mut self.chunks)
36    }
37}
38
39impl<B: ContextBackend> futures_core::Stream for AgentSendStream<B> {
40    type Item = Result<B::Response, AgentError>;
41
42    fn poll_next(
43        mut self: std::pin::Pin<&mut Self>,
44        cx: &mut std::task::Context<'_>,
45    ) -> std::task::Poll<Option<Self::Item>> {
46        let this = unsafe { self.as_mut().get_unchecked_mut() };
47        match this.inner.as_mut().poll_next(cx) {
48            std::task::Poll::Ready(Some(Ok(resp))) => {
49                this.chunks.push(resp.clone());
50                std::task::Poll::Ready(Some(Ok(resp)))
51            }
52            std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
53            std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
54            std::task::Poll::Pending => std::task::Poll::Pending,
55        }
56    }
57}
58
59impl<B: ContextBackend> kameo::Reply for AgentSendStream<B> {
60    type Ok = Self;
61    type Error = std::convert::Infallible;
62    type Value = Self;
63    fn to_result(self) -> Result<Self::Ok, Self::Error> {
64        Ok(self)
65    }
66    fn into_any_err(self) -> Option<Box<dyn kameo::reply::ReplyError>> {
67        None
68    }
69    fn into_value(self) -> Self::Value {
70        self
71    }
72}