agent-context 0.1.0

Multi-backend agent context manager with three-zone memory model
Documentation
//! 流式对话返回类型 [`AgentSendStream`]。
//!
//! 逐块播出后端流数据,内部累积完整响应,Drop 时自动将结果存入 [`AgentContext`](super::AgentContext)。

use std::sync::Arc;

use kameo::prelude::*;

use super::actor::{AgentContext, SilentAppendMsg};
use super::event::ChangeEvent;
use super::types::ContextBackend;
use crate::error::AgentError;

// ---------------------------------------------------------------------------
// AgentSendStream — 流式对话返回类型
// ---------------------------------------------------------------------------

/// 流式对话的 Stream 返回类型。
///
/// 两层职责:
/// 1. **poll 时**:逐块播出后端流的数据,同时内部累积
/// 2. **Drop 时**:将累积的响应通过 [`extract_messages_from_backend_response`](ContextBackend::extract_messages_from_backend_response)
///    和 [`to_request_messages`](ContextBackend::to_request_messages) 转换后,自动存入 `AgentContext` 的 incremental 区
///
/// 无需手动处理 — 调用者消费 stream 直到结束,drop 时触发自动存储。
pub struct AgentSendStream<B: ContextBackend> {
    backend: B,
    inner:
        std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<B::Response, AgentError>> + Send>>,
    accumulated: Vec<B::Response>,
    actor_ref: Option<ActorRef<AgentContext<B>>>,
    #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
    on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
}

impl<B: ContextBackend> AgentSendStream<B> {
    #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
    pub(crate) fn new(
        backend: B,
        inner: impl futures_core::Stream<Item = Result<B::Response, AgentError>> + Send + 'static,
        actor_ref: ActorRef<AgentContext<B>>,
        on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
    ) -> Self {
        Self {
            backend,
            inner: Box::pin(inner),
            accumulated: Vec::new(),
            actor_ref: Some(actor_ref),
            on_change,
        }
    }

    pub fn take_accumulated(&mut self) -> Vec<B::Response> {
        std::mem::take(&mut self.accumulated)
    }
}

impl<B: ContextBackend> futures_core::Stream for AgentSendStream<B> {
    type Item = Result<B::Response, AgentError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = unsafe { self.as_mut().get_unchecked_mut() };
        match this.inner.as_mut().poll_next(cx) {
            std::task::Poll::Ready(Some(Ok(resp))) => {
                this.accumulated.push(resp.clone());
                std::task::Poll::Ready(Some(Ok(resp)))
            }
            std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
            std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

impl<B: ContextBackend> Drop for AgentSendStream<B> {
    fn drop(&mut self) {
        let responses = std::mem::take(&mut self.accumulated);
        if !responses.is_empty() {
            let backend = self.backend.clone();
            if let Some(actor_ref) = self.actor_ref.take() {
                let on_change = self.on_change.clone();
                tokio::spawn(async move {
                    if let Ok(raw_msgs) = backend.extract_messages_from_backend_response(&responses)
                    {
                        if let Ok(request_msgs) = backend.to_request_messages(raw_msgs) {
                            for m in request_msgs {
                                if let Some(ref cb) = on_change {
                                    cb(ChangeEvent::Appended(m.clone()));
                                }
                                if let Err(e) = actor_ref.tell(SilentAppendMsg { message: m }).await
                                {
                                    log::warn!("SilentAppendMsg 发送失败: {e:?}");
                                }
                            }
                        } else {
                            log::warn!("流式响应转换请求格式失败,已丢弃");
                        }
                    } else {
                        log::warn!("流式响应提取消息失败,已丢弃");
                    }
                });
            }
        }
    }
}

impl<B: ContextBackend> kameo::Reply for AgentSendStream<B> {
    type Ok = Self;
    type Error = std::convert::Infallible;
    type Value = Self;
    fn to_result(self) -> Result<Self::Ok, Self::Error> {
        Ok(self)
    }
    fn into_any_err(self) -> Option<Box<dyn kameo::reply::ReplyError>> {
        None
    }
    fn into_value(self) -> Self::Value {
        self
    }
}