// intrepid-model 0.3.0
//
// Manage complex async business logic with ease.
// Documentation
use futures::Stream;
use intrepid_core::Frame;

use crate::EventRepo;

use super::Subscription;

/// A handle to a subscription process. Creating a subscription handle will start a
/// background process that will read events from the target stream, and send those
/// events to the handle's inbox for foreground processing.
///
/// The handle and the background process communicate over two bounded `mpsc`
/// channels: one carrying frames to the handle (`inbox`) and one carrying
/// wake-up signals to the background process (`outbox`).
pub struct SubscriptionStream {
    /// The inbox for the subscription process. This is the receiving half of a
    /// bounded mpsc channel; `SubscriptionStream::new` creates it with a buffer
    /// of 16 frames.
    pub inbox: tokio::sync::mpsc::Receiver<Frame>,

    /// The outbox we use to tell the subscription process to poll for new events.
    /// Each `()` sent here is a wake-up signal; `SubscriptionStream::new` creates
    /// this channel with a buffer of a single signal.
    pub outbox: tokio::sync::mpsc::Sender<()>,
}

impl SubscriptionStream {
    /// Create a new subscription handle.
    ///
    /// Spawns a background task that waits for wake-up signals. For every signal
    /// it calls `subscriber.poll_events()` and forwards each returned event,
    /// converted into a [`Frame`], to the handle's inbox. An initial wake-up is
    /// queued before the handle is returned, so the first poll happens without
    /// any action from the caller.
    ///
    /// The background task stops when either every wake-up sender has been
    /// dropped, or the inbox has been dropped/closed (there is nobody left to
    /// deliver frames to).
    ///
    /// # Panics
    ///
    /// Must be called from within a Tokio runtime, because it uses
    /// `tokio::spawn`.
    pub fn new<Repo>(subscriber: Subscription<Repo>) -> Self
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
    {
        // One slot is enough for wake-ups: a queued signal already guarantees
        // that a poll will run, so additional signals carry no information.
        let (waker_outbox, mut waker_inbox) = tokio::sync::mpsc::channel(1);
        let (frame_outbox, frame_inbox) = tokio::sync::mpsc::channel(16);

        tokio::spawn(async move {
            while let Some(()) = waker_inbox.recv().await {
                // A failed poll is skipped on purpose (best effort); the next
                // wake-up simply tries again.
                if let Ok(events) = subscriber.poll_events().await {
                    for event in events {
                        // `send` only fails once the receiving half is gone.
                        // Nobody can observe further frames, so stop the task
                        // instead of polling for events that will be discarded.
                        if frame_outbox.send(event.into()).await.is_err() {
                            return;
                        }
                    }
                }
            }
        });

        let this = Self {
            inbox: frame_inbox,
            outbox: waker_outbox,
        };

        this.trigger_next_poll();

        this
    }

    /// Trigger the subscription process to poll for new events.
    ///
    /// This never blocks and does not spawn a task. The wake-up is queued with
    /// `try_send`; if a wake-up is already queued the request is dropped,
    /// because the queued one has not been consumed yet and the poll it causes
    /// will start after this call. If the background process has already
    /// stopped, the call is a no-op.
    pub fn trigger_next_poll(&self) {
        // Both failure modes (channel full, channel closed) are safe to ignore:
        // "full" means a poll is already scheduled, "closed" means there is no
        // process left to wake.
        let _ = self.outbox.try_send(());
    }
}

/// Exposes the frames delivered to the handle's inbox as an asynchronous stream.
impl Stream for SubscriptionStream {
    type Item = Frame;

    /// Poll the inbox for the next frame.
    ///
    /// A ready result (a frame, or `None` once the inbox channel is closed) is
    /// returned unchanged. When no frame is available yet, the subscription
    /// process is asked to poll for new events via `trigger_next_poll` before
    /// `Pending` is returned.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        match self.inbox.poll_recv(cx) {
            std::task::Poll::Pending => {
                // Nothing buffered: request another round of polling so that
                // new events can reach the inbox.
                self.trigger_next_poll();
                std::task::Poll::Pending
            }
            ready => ready,
        }
    }
}