use std::sync::Arc;
use kameo::prelude::*;
use super::actor::{AgentContext, SilentAppendMsg};
use super::event::ChangeEvent;
use super::types::ContextBackend;
use crate::error::AgentError;
/// Response stream wrapper that remembers every successful item it yields.
///
/// Items polled from `inner` are passed through unchanged, and a clone of
/// each `Ok` item is kept in `accumulated`. When the wrapper is dropped with a
/// non-empty `accumulated`, its `Drop` impl converts those responses into
/// messages via `backend` and sends them to the actor behind `actor_ref`.
///
/// NOTE: field declaration order is also the order in which the fields are
/// dropped; keep that in mind before reordering them.
pub struct AgentSendStream<B: ContextBackend> {
/// Backend that `Drop` clones and uses to turn the accumulated responses
/// into request messages.
backend: B,
/// The wrapped response stream, boxed and pinned so it can be polled in place.
inner:
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<B::Response, AgentError>> + Send>>,
/// Clones of the `Ok` items yielded so far that have not been taken out yet.
accumulated: Vec<B::Response>,
/// Actor that receives the `SilentAppendMsg`s; set to `Some` by `new` and
/// taken out by `Drop`.
actor_ref: Option<ActorRef<AgentContext<B>>>,
/// Optional observer. `Drop` calls it with `ChangeEvent::Appended` for each
/// message, right before that message is sent to the actor.
#[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
}
impl<B: ContextBackend> AgentSendStream<B> {
    /// Wraps `inner` so that every successful response it yields is also recorded.
    ///
    /// * `backend` - stored as-is for later use by the `Drop` impl.
    /// * `inner` - the response stream to forward; it is boxed and pinned here.
    /// * `actor_ref` - actor handle, stored as `Some(..)`.
    /// * `on_change` - optional change callback, stored as given.
    ///
    /// The accumulation buffer starts out empty.
    #[expect(clippy::type_complexity, reason = "回调类型不可避免复杂")]
    pub(crate) fn new(
        backend: B,
        inner: impl futures_core::Stream<Item = Result<B::Response, AgentError>> + Send + 'static,
        actor_ref: ActorRef<AgentContext<B>>,
        on_change: Option<Arc<dyn Fn(ChangeEvent<B::Message>) + Send + Sync>>,
    ) -> Self {
        let boxed_stream = Box::pin(inner);
        let target_actor = Some(actor_ref);
        Self {
            backend,
            // Unsizes from the concrete stream type to the boxed trait object.
            inner: boxed_stream,
            accumulated: Vec::new(),
            actor_ref: target_actor,
            on_change,
        }
    }

    /// Hands the responses recorded so far to the caller and leaves the
    /// internal buffer empty.
    ///
    /// Responses removed here are no longer in the buffer, so the `Drop` impl
    /// (which reads the same buffer) will not see them.
    pub fn take_accumulated(&mut self) -> Vec<B::Response> {
        let buffer = &mut self.accumulated;
        std::mem::take(buffer)
    }
}
impl<B: ContextBackend> futures_core::Stream for AgentSendStream<B> {
    type Item = Result<B::Response, AgentError>;

    /// Polls the wrapped stream once and returns its result unchanged.
    ///
    /// When the result is `Ready(Some(Ok(_)))`, a clone of the response is
    /// appended to `accumulated` first. Errors, end-of-stream and `Pending`
    /// are forwarded without touching the buffer.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        // SAFETY: nothing is moved out of the pinned value below. `inner` is
        // only re-borrowed through its own `Pin<Box<_>>`, and `accumulated` is
        // only pushed to.
        let stream = unsafe { self.as_mut().get_unchecked_mut() };

        let polled = stream.inner.as_mut().poll_next(cx);
        if let Poll::Ready(Some(Ok(chunk))) = &polled {
            stream.accumulated.push(chunk.clone());
        }
        polled
    }
}
impl<B: ContextBackend> Drop for AgentSendStream<B> {
    /// Forwards the accumulated responses to the actor on a best-effort basis.
    ///
    /// Nothing happens when the buffer is empty or the actor handle is already
    /// gone. Otherwise a background task is spawned on the current Tokio
    /// runtime which:
    ///
    /// 1. extracts messages from the accumulated responses via the backend,
    /// 2. converts them to request messages,
    /// 3. for each message, calls `on_change` (if set) with
    ///    `ChangeEvent::Appended` and then sends a `SilentAppendMsg` to the actor.
    ///
    /// Every failure is logged as a warning and otherwise ignored. If no Tokio
    /// runtime is available on the dropping thread, the responses are
    /// discarded with a warning instead of panicking inside `drop`.
    fn drop(&mut self) {
        let responses = std::mem::take(&mut self.accumulated);
        if responses.is_empty() {
            return;
        }
        let Some(actor_ref) = self.actor_ref.take() else {
            return;
        };

        // `tokio::spawn` panics when called outside a runtime, and a panic in
        // `drop` during unwinding aborts the process. Look the runtime up
        // explicitly and degrade to a warning when there is none.
        let Ok(runtime) = tokio::runtime::Handle::try_current() else {
            // "Not inside a Tokio runtime; cannot write the streamed
            // responses back; discarded."
            log::warn!("当前不在 Tokio 运行时中,无法回写流式响应,已丢弃");
            return;
        };

        let backend = self.backend.clone();
        let on_change = self.on_change.clone();
        runtime.spawn(async move {
            let Ok(raw_msgs) = backend.extract_messages_from_backend_response(&responses) else {
                log::warn!("流式响应提取消息失败,已丢弃");
                return;
            };
            let Ok(request_msgs) = backend.to_request_messages(raw_msgs) else {
                log::warn!("流式响应转换请求格式失败,已丢弃");
                return;
            };
            for m in request_msgs {
                // Notify the observer first, then hand the message to the actor.
                if let Some(ref cb) = on_change {
                    cb(ChangeEvent::Appended(m.clone()));
                }
                if let Err(e) = actor_ref.tell(SilentAppendMsg { message: m }).await {
                    log::warn!("SilentAppendMsg 发送失败: {e:?}");
                }
            }
        });
    }
}
impl<B: ContextBackend> kameo::Reply for AgentSendStream<B> {
    type Ok = Self;
    type Error = std::convert::Infallible;
    type Value = Self;

    /// Returns the stream itself as the reply value.
    fn into_value(self) -> Self::Value {
        self
    }

    /// Always succeeds: the stream is wrapped in `Ok` unchanged.
    fn to_result(self) -> Result<Self::Ok, Self::Error> {
        Ok(self)
    }

    /// Always `None`; the stream is never reported as a reply error.
    fn into_any_err(self) -> Option<Box<dyn kameo::reply::ReplyError>> {
        None
    }
}