use crate::supervisor::ForwardTo;
use anyhow::Result;
use async_trait::async_trait;
use crb_agent::{Address, Agent, AgentSession, DoAsync, Next, OnEvent, RunAgent};
use crb_core::{
Msg, Tag, mpsc,
time::{Duration, timeout},
};
use crb_runtime::InterruptionLevel;
use crb_send::{Recipient, Sender};
use futures::{
Stream, StreamExt,
stream::BoxStream,
task::{Context, Poll},
};
use std::pin::{Pin, pin};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A type-erased stream of `ITEM` values.
///
/// Built from any suitable stream via `Drainer::new` or from an unbounded
/// mpsc receiver via `Drainer::from_mpsc`. It implements `Stream` itself and
/// can be turned into a `DrainerTask` runtime through its `ForwardTo` impl.
pub struct Drainer<ITEM> {
/// The wrapped source stream, boxed and pinned so its concrete type is erased.
stream: BoxStream<'static, ITEM>,
}
impl<ITEM> Drainer<ITEM>
where
    ITEM: Msg,
{
    /// Builds a drainer around any sendable, `'static` stream of `ITEM`s.
    ///
    /// The stream is boxed, so the concrete type `S` does not show up in the
    /// resulting `Drainer`.
    pub fn new<S>(stream: S) -> Self
    where
        S: Stream<Item = ITEM> + Send + 'static,
    {
        let stream = stream.boxed();
        Self { stream }
    }

    /// Builds a drainer that yields whatever is received on the given
    /// unbounded mpsc receiver.
    pub fn from_mpsc(rx: mpsc::UnboundedReceiver<ITEM>) -> Self {
        Self::new(UnboundedReceiverStream::new(rx))
    }
}
/// `Drainer` is a transparent pass-through over the stream it wraps.
impl<ITEM> Stream for Drainer<ITEM> {
    type Item = ITEM;

    /// Polls the wrapped stream and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` is available because the only field is an already pinned
        // box, which keeps `Drainer` itself `Unpin`.
        let this = self.get_mut();
        // A `&mut` to an `Unpin` stream is a stream too; pin that reference
        // locally to obtain the receiver `poll_next` expects.
        let inner = pin!(&mut this.stream);
        inner.poll_next(cx)
    }

    /// Forwards the wrapped stream's size hint.
    ///
    /// Without this override the trait default `(0, None)` would be reported
    /// even when the inner stream knows its bounds.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}
/// Lets a `Drainer` be forwarded to an agent `A` that handles `ITEM` events
/// tagged with `T`.
impl<A, ITEM, T> ForwardTo<A, T> for Drainer<ITEM>
where
    A: OnEvent<ITEM, T>,
    ITEM: Msg,
    T: Tag + Sync + Clone,
{
    type Runtime = RunAgent<DrainerTask<ITEM>>;

    /// Consumes the drainer and returns a runtime for a `DrainerTask` that
    /// sends every stream item to `address`, tagged with `tag`.
    fn into_trackable(self, address: Address<A>, tag: T) -> Self::Runtime {
        let Self { stream } = self;
        let recipient = address.recipient_tagged(tag);
        let mut runtime = RunAgent::new(DrainerTask { recipient, stream });
        // NOTE(review): presumably ABORT lets the runtime abort the task
        // outright when interrupted — confirm against `crb_runtime`.
        runtime.level = InterruptionLevel::ABORT;
        runtime
    }
}
/// Agent that pulls items from a stream and sends each one to a recipient.
///
/// Constructed by the `ForwardTo` impl of `Drainer`.
pub struct DrainerTask<ITEM> {
/// Destination that receives every item taken from `stream`.
recipient: Recipient<ITEM>,
/// Source of items; the task finishes once this stream yields `None`.
stream: Pin<Box<dyn Stream<Item = ITEM> + Send>>,
}
/// Runs `DrainerTask` as an agent using a plain `AgentSession` context.
impl<ITEM> Agent for DrainerTask<ITEM>
where
ITEM: Msg,
{
type Context = AgentSession<Self>;
/// Starts the agent by requesting the async routine with `()` as its state;
/// the `DoAsync` impl for this type takes `&mut ()` as the state argument.
fn begin(&mut self) -> Next<Self> {
Next::do_async(())
}
}
/// Async routine of `DrainerTask`: moves one stream item per call.
#[async_trait]
impl<ITEM> DoAsync for DrainerTask<ITEM>
where
    ITEM: Msg,
{
    /// Waits up to five seconds for the next stream item.
    ///
    /// Returns:
    /// - `Ok(None)` after an item was sent to the recipient, or when the wait
    ///   timed out without an item;
    /// - `Ok(Some(Next::done()))` once the stream has ended.
    ///
    /// # Errors
    ///
    /// Propagates the error when sending the item to the recipient fails.
    async fn repeat(&mut self, _: &mut ()) -> Result<Option<Next<Self>>> {
        let wait_limit = Duration::from_secs(5);

        // Nothing arrived within the limit: return without a transition.
        // NOTE(review): presumably this bounded wait lets the caller re-check
        // for interruption between calls — confirm against `crb_agent`.
        let Ok(polled) = timeout(wait_limit, self.stream.next()).await else {
            return Ok(None);
        };

        // The stream is exhausted, so the task is finished.
        let Some(item) = polled else {
            return Ok(Some(Next::done()));
        };

        self.recipient.send(item)?;
        Ok(None)
    }
}