agent_context/context/
stream.rs1use super::types::ContextBackend;
7use crate::error::AgentError;
8
9pub struct AgentSendStream<B: ContextBackend> {
18 inner:
19 std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<B::Response, AgentError>> + Send>>,
20 chunks: Vec<B::Response>,
21}
22
23impl<B: ContextBackend> AgentSendStream<B> {
24 pub(crate) fn new(
25 inner: impl futures_core::Stream<Item = Result<B::Response, AgentError>> + Send + 'static,
26 ) -> Self {
27 Self {
28 inner: Box::pin(inner),
29 chunks: Vec::new(),
30 }
31 }
32
33 pub fn take_chunks(&mut self) -> Vec<B::Response> {
35 std::mem::take(&mut self.chunks)
36 }
37}
38
39impl<B: ContextBackend> futures_core::Stream for AgentSendStream<B> {
40 type Item = Result<B::Response, AgentError>;
41
42 fn poll_next(
43 mut self: std::pin::Pin<&mut Self>,
44 cx: &mut std::task::Context<'_>,
45 ) -> std::task::Poll<Option<Self::Item>> {
46 let this = unsafe { self.as_mut().get_unchecked_mut() };
47 match this.inner.as_mut().poll_next(cx) {
48 std::task::Poll::Ready(Some(Ok(resp))) => {
49 this.chunks.push(resp.clone());
50 std::task::Poll::Ready(Some(Ok(resp)))
51 }
52 std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
53 std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
54 std::task::Poll::Pending => std::task::Poll::Pending,
55 }
56 }
57}
58
59impl<B: ContextBackend> kameo::Reply for AgentSendStream<B> {
60 type Ok = Self;
61 type Error = std::convert::Infallible;
62 type Value = Self;
63 fn to_result(self) -> Result<Self::Ok, Self::Error> {
64 Ok(self)
65 }
66 fn into_any_err(self) -> Option<Box<dyn kameo::reply::ReplyError>> {
67 None
68 }
69 fn into_value(self) -> Self::Value {
70 self
71 }
72}