moirai-pump 0.8.3

Event-pump runtime for Moirai job system
Documentation
use flume::{Receiver, bounded};
use moirai::{job::JobLocation, queue::JobQueue};
use std::{
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
type Factory<Input, Output> = Box<dyn Fn(Input) -> BoxFuture<Output> + Send + Sync>;

pub struct EventPump<Input, Output>
where
    Input: Send + Sync + 'static,
    Output: Send + Sync + 'static,
{
    queue: JobQueue,
    factory: Factory<Input, Output>,
}

impl<Input, Output> EventPump<Input, Output>
where
    Input: Send + Sync + 'static,
    Output: Send + Sync + 'static,
{
    pub fn new<Fun, Fut>(factory: Fun) -> Self
    where
        Fun: Fn(Input) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Output> + Send + Sync + 'static,
    {
        let queue = JobQueue::default();
        Self {
            queue,
            factory: Box::new(move |input| Box::pin(factory(input))),
        }
    }

    pub fn send(&self, input: Input) -> Receiver<Output> {
        let future = (self.factory)(input);
        let (sender, receiver) = bounded(1);
        self.queue.spawn(JobLocation::Local, async move {
            let output = future.await;
            sender.send(output).unwrap();
        });
        receiver
    }

    pub fn pump(&self) {
        self.queue.run(
            JobLocation::Local,
            true,
            Duration::MAX,
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
        );
    }

    pub fn pump_all(&self) {
        while !self.is_complete() {
            self.pump();
        }
    }

    pub fn is_complete(&self) -> bool {
        self.queue.is_empty()
    }
}

impl<Input, Output> Future for EventPump<Input, Output>
where
    Input: Send + Sync + 'static,
    Output: Send + Sync + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pump();
        if self.is_complete() {
            cx.waker().wake_by_ref();
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use flume::TryRecvError;
    use moirai::coroutine::yield_now;

    #[test]
    fn test_event_pump() {
        let pump = EventPump::new(|input: i32| async move {
            let value = input * 2;
            yield_now().await;
            value.to_string()
        });
        let receiver = pump.send(21);
        pump.pump();
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
        pump.pump();
        assert_eq!(receiver.try_recv().unwrap().as_str(), "42");
    }

    #[pollster::test]
    async fn test_event_pump_async() {
        let pump = EventPump::new(|input: i32| async move {
            let value = input * 2;
            yield_now().await;
            value.to_string()
        });
        let receiver = pump.send(21);
        pump.await;
        assert_eq!(receiver.recv_async().await.unwrap().as_str(), "42");
    }
}