agent-context 0.1.3

Multi-backend agent context manager with three-zone memory model
Documentation
//! 流式对话返回类型 [`AgentSendStream`]。
//!
//! 逐块播出后端流数据,内部累积完整响应。流消费完毕后通过
//! [`take_chunks`](AgentSendStream::take_chunks) 取出累积的响应自行处理。

use super::types::ContextBackend;
use crate::error::AgentError;

// ---------------------------------------------------------------------------
// AgentSendStream — 流式对话返回类型
// ---------------------------------------------------------------------------

/// 流式对话的 Stream 返回类型。
///
/// 逐块播出后端流数据,内部累积完整响应。
/// 流消费完毕后调用 [`take_chunks`](Self::take_chunks) 取出累积的响应。
pub struct AgentSendStream<B: ContextBackend> {
    inner:
        std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<B::Response, AgentError>> + Send>>,
    chunks: Vec<B::Response>,
}

impl<B: ContextBackend> AgentSendStream<B> {
    pub(crate) fn new(
        inner: impl futures_core::Stream<Item = Result<B::Response, AgentError>> + Send + 'static,
    ) -> Self {
        Self {
            inner: Box::pin(inner),
            chunks: Vec::new(),
        }
    }

    /// 从 stream 内部缓冲取出流式过程中已累积的响应。
    pub fn take_chunks(&mut self) -> Vec<B::Response> {
        std::mem::take(&mut self.chunks)
    }
}

impl<B: ContextBackend> futures_core::Stream for AgentSendStream<B> {
    type Item = Result<B::Response, AgentError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = unsafe { self.as_mut().get_unchecked_mut() };
        match this.inner.as_mut().poll_next(cx) {
            std::task::Poll::Ready(Some(Ok(resp))) => {
                this.chunks.push(resp.clone());
                std::task::Poll::Ready(Some(Ok(resp)))
            }
            std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
            std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

impl<B: ContextBackend> kameo::Reply for AgentSendStream<B> {
    type Ok = Self;
    type Error = std::convert::Infallible;
    type Value = Self;
    fn to_result(self) -> Result<Self::Ok, Self::Error> {
        Ok(self)
    }
    fn into_any_err(self) -> Option<Box<dyn kameo::reply::ReplyError>> {
        None
    }
    fn into_value(self) -> Self::Value {
        self
    }
}