// intrepid-model 0.3.0
//
// Manage complex async business logic with ease.
// Documentation
use futures::Stream;
use intrepid_core::Frame;
use tower::Service;

use crate::EventRepo;

use super::Subscription;

/// A backgrounded subscription. This process works like an "actor", as described in
/// [this blog post][actors]. It does the work to read from the event log and send
/// any new events to a [subscription handle][super::SubscriptionHandle]. Usually,
/// subscription processes aren't interacted with directly, but are created by
/// [`Subscriber::background`][super::Subscriber::background].
///
/// The worker itself is only the consumer-facing half: the polling loop runs in a
/// task spawned by [`SubscriptionWorker::new`], and this struct holds the two
/// channel endpoints used to talk to that task.
///
/// [actors]: https://ryhl.io/blog/actors-with-tokio/
pub struct SubscriptionWorker {
    /// Receiving end of the channel on which the background task delivers the
    /// frames it produced from events.
    completions: tokio::sync::mpsc::Receiver<Frame>,
    /// Sending end of the channel used to signal the background task that it
    /// should poll for events again.
    waker: tokio::sync::mpsc::Sender<()>,
}

impl SubscriptionWorker {
    /// Create a new subscription process with the given frame service.
    ///
    /// Spawns a background Tokio task that waits for wake-up signals. On each
    /// signal it polls `subscriber` for events, runs every event through
    /// `service`, and forwards the resulting frame (built from either the
    /// response or the error) to the returned worker, which yields it through
    /// its [`Stream`] implementation. One wake-up is queued before returning so
    /// the first poll happens without any action from the caller.
    ///
    /// The background task stops when:
    ///
    /// * every wake-up sender is gone (the worker was dropped),
    /// * a frame can no longer be delivered because the worker was dropped, or
    /// * the service fails its readiness check; the failure is forwarded as a
    ///   final frame, after which the stream ends.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime, because the background task
    /// is started with `tokio::spawn`.
    pub fn new<Repo, FrameService>(subscriber: Subscription<Repo>, service: FrameService) -> Self
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
        FrameService: Service<Frame> + Send + Sync + 'static,
        FrameService::Future: Send,
        <FrameService as Service<Frame>>::Response: Into<Frame>,
        <FrameService as Service<Frame>>::Error: Into<Frame>,
    {
        // Capacity 1: a single queued signal is enough to guarantee another poll.
        let (waker, mut wake) = tokio::sync::mpsc::channel(1);
        let (work_complete, completions) = tokio::sync::mpsc::channel(16);

        tokio::spawn(async move {
            let mut service = service;
            while let Some(()) = wake.recv().await {
                // A failed poll is skipped; the next wake-up tries again.
                if let Ok(events) = subscriber.poll_events().await {
                    for event in events {
                        // The `Service` contract requires `poll_ready` to report
                        // readiness before each `call`. The error is converted
                        // right away so only a `Frame` is held across awaits.
                        let readiness: Result<(), Frame> =
                            std::future::poll_fn(|cx| service.poll_ready(cx))
                                .await
                                .map_err(Into::into);
                        if let Err(frame) = readiness {
                            // A service that fails readiness must not be used
                            // again: report the failure and shut down.
                            let _ = work_complete.send(frame).await;
                            return;
                        }

                        let frame = match service.call(event.into()).await {
                            Ok(frame) => frame.into(),
                            Err(error) => error.into(),
                        };
                        if let Err(error) = work_complete.send(frame).await {
                            tracing::error!("Failed to send frame: {:?}", error);
                            // The receiver lives in the worker and cannot come
                            // back, so there is nothing left to do.
                            return;
                        }
                    }
                }
            }
        });

        let this = Self { waker, completions };

        this.wake();

        this
    }

    /// Trigger the subscription process to poll for new events.
    ///
    /// This never blocks and never spawns a task. If a wake-up is already
    /// queued, the call is a no-op, since the pending signal already guarantees
    /// a fresh poll. If the background task has exited, the call is ignored.
    pub fn wake(&self) {
        // `try_send` fails only when the single-slot channel is full (a poll is
        // already scheduled) or closed (the task is gone); both are safe to drop.
        let _ = self.waker.try_send(());
    }
}

/// Exposes the frames delivered by the background task as a stream.
impl Stream for SubscriptionWorker {
    type Item = Frame;

    /// Poll the completion channel for the next frame.
    ///
    /// When no frame is available yet, the background task is asked to look
    /// for new events before `Pending` is returned. A closed channel ends the
    /// stream with `None`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        match self.completions.poll_recv(cx) {
            Poll::Pending => {
                // Nothing buffered: nudge the worker so more frames get produced.
                self.wake();
                Poll::Pending
            }
            ready @ Poll::Ready(_) => ready,
        }
    }
}