crb_superagent/interplay/
drainer.rs

1use crate::supervisor::ForwardTo;
2use anyhow::Result;
3use async_trait::async_trait;
4use crb_agent::{Address, Agent, AgentSession, DoAsync, Next, OnEvent, RunAgent};
5use crb_core::{
6    Msg, Tag, mpsc,
7    time::{Duration, timeout},
8};
9use crb_runtime::InterruptionLevel;
10use crb_send::{Recipient, Sender};
11use futures::{
12    Stream, StreamExt,
13    stream::BoxStream,
14    task::{Context, Poll},
15};
16use std::pin::{Pin, pin};
17use tokio_stream::wrappers::UnboundedReceiverStream;
18
/// A wrapper around a boxed, type-erased stream of `ITEM` values.
///
/// Build it from any suitable stream with `Drainer::new`, or from an
/// unbounded mpsc receiver with `Drainer::from_mpsc`.
pub struct Drainer<ITEM> {
    /// The boxed source stream the items are pulled from.
    stream: BoxStream<'static, ITEM>,
}
22
23impl<ITEM> Drainer<ITEM>
24where
25    ITEM: Msg,
26{
27    pub fn new<S>(stream: S) -> Self
28    where
29        S: Stream<Item = ITEM> + Send + 'static,
30    {
31        Self {
32            stream: stream.boxed(),
33        }
34    }
35
36    pub fn from_mpsc(rx: mpsc::UnboundedReceiver<ITEM>) -> Self {
37        let stream = UnboundedReceiverStream::new(rx);
38        Drainer::new(stream)
39    }
40}
41
42impl<ITEM> Stream for Drainer<ITEM> {
43    type Item = ITEM;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        pin!(&mut self.get_mut().stream).poll_next(cx)
47    }
48}
49
50impl<A, ITEM, T> ForwardTo<A, T> for Drainer<ITEM>
51where
52    A: OnEvent<ITEM, T>,
53    ITEM: Msg,
54    T: Tag + Sync + Clone,
55{
56    type Runtime = RunAgent<DrainerTask<ITEM>>;
57
58    fn into_trackable(self, address: Address<A>, tag: T) -> Self::Runtime {
59        let task = DrainerTask {
60            recipient: address.recipient_tagged(tag),
61            stream: self.stream,
62        };
63        let mut runtime = RunAgent::new(task);
64        runtime.level = InterruptionLevel::ABORT;
65        runtime
66    }
67}
68
69pub struct DrainerTask<ITEM> {
70    recipient: Recipient<ITEM>,
71    stream: Pin<Box<dyn Stream<Item = ITEM> + Send>>,
72}
73
/// Runs `DrainerTask` as an agent with an `AgentSession` context and a
/// plain `Address` link.
impl<ITEM> Agent for DrainerTask<ITEM>
where
    ITEM: Msg,
{
    type Context = AgentSession<Self>;
    type Link = Address<Self>;

    /// Starts the agent in its async state with a unit payload; the work
    /// itself lives in the `DoAsync` implementation for this type.
    fn begin(&mut self) -> Next<Self> {
        Next::do_async(())
    }
}
85
86#[async_trait]
87impl<ITEM> DoAsync for DrainerTask<ITEM>
88where
89    ITEM: Msg,
90{
91    async fn repeat(&mut self, _: &mut ()) -> Result<Option<Next<Self>>> {
92        let duration = Duration::from_secs(5);
93        match timeout(duration, self.stream.next()).await {
94            Ok(Some(item)) => {
95                // The next item forwarding
96                self.recipient.send(item)?;
97                Ok(None)
98            }
99            Ok(None) => {
100                // Stream is ended
101                Ok(Some(Next::done()))
102            }
103            Err(_) => {
104                // Timeout, try again
105                Ok(None)
106            }
107        }
108    }
109}