crb_superagent/interplay/
drainer.rs1use crate::supervisor::ForwardTo;
2use anyhow::Result;
3use async_trait::async_trait;
4use crb_agent::{Address, Agent, AgentSession, DoAsync, Next, OnEvent, RunAgent};
5use crb_core::{
6 Msg, Tag, mpsc,
7 time::{Duration, timeout},
8};
9use crb_runtime::InterruptionLevel;
10use crb_send::{Recipient, Sender};
11use futures::{
12 Stream, StreamExt,
13 stream::BoxStream,
14 task::{Context, Poll},
15};
16use std::pin::{Pin, pin};
17use tokio_stream::wrappers::UnboundedReceiverStream;
18
19pub struct Drainer<ITEM> {
20 stream: BoxStream<'static, ITEM>,
21}
22
23impl<ITEM> Drainer<ITEM>
24where
25 ITEM: Msg,
26{
27 pub fn new<S>(stream: S) -> Self
28 where
29 S: Stream<Item = ITEM> + Send + 'static,
30 {
31 Self {
32 stream: stream.boxed(),
33 }
34 }
35
36 pub fn from_mpsc(rx: mpsc::UnboundedReceiver<ITEM>) -> Self {
37 let stream = UnboundedReceiverStream::new(rx);
38 Drainer::new(stream)
39 }
40}
41
42impl<ITEM> Stream for Drainer<ITEM> {
43 type Item = ITEM;
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 pin!(&mut self.get_mut().stream).poll_next(cx)
47 }
48}
49
50impl<A, ITEM, T> ForwardTo<A, T> for Drainer<ITEM>
51where
52 A: OnEvent<ITEM, T>,
53 ITEM: Msg,
54 T: Tag + Sync + Clone,
55{
56 type Runtime = RunAgent<DrainerTask<ITEM>>;
57
58 fn into_trackable(self, address: Address<A>, tag: T) -> Self::Runtime {
59 let task = DrainerTask {
60 recipient: address.recipient_tagged(tag),
61 stream: self.stream,
62 };
63 let mut runtime = RunAgent::new(task);
64 runtime.level = InterruptionLevel::ABORT;
65 runtime
66 }
67}
68
69pub struct DrainerTask<ITEM> {
70 recipient: Recipient<ITEM>,
71 stream: Pin<Box<dyn Stream<Item = ITEM> + Send>>,
72}
73
74impl<ITEM> Agent for DrainerTask<ITEM>
75where
76 ITEM: Msg,
77{
78 type Context = AgentSession<Self>;
79 type Link = Address<Self>;
80
81 fn begin(&mut self) -> Next<Self> {
82 Next::do_async(())
83 }
84}
85
86#[async_trait]
87impl<ITEM> DoAsync for DrainerTask<ITEM>
88where
89 ITEM: Msg,
90{
91 async fn repeat(&mut self, _: &mut ()) -> Result<Option<Next<Self>>> {
92 let duration = Duration::from_secs(5);
93 match timeout(duration, self.stream.next()).await {
94 Ok(Some(item)) => {
95 self.recipient.send(item)?;
97 Ok(None)
98 }
99 Ok(None) => {
100 Ok(Some(Next::done()))
102 }
103 Err(_) => {
104 Ok(None)
106 }
107 }
108 }
109}