// intrepid-model 0.3.0
//
// Manage complex async business logic with ease
// Documentation
use intrepid_core::Frame;
use tower::Service;

use crate::{Event, EventKind, EventLog, EventRepo, EventRepoError};

use super::{
    SubscriptionMarker, SubscriptionName, SubscriptionStream, SubscriptionTopic, SubscriptionWorker,
};

/// A subscription to a stream of events. Subscriptions read events from a target stream
/// and manage a tracking stream to measure their progress. Subscriptions generate their own
/// ids when they are created, meaning every subscription is unique and replays the target
/// stream from the beginning. In order to avoid replaying the target stream, use the
/// [`Subscription::resume`] method and give it the id of an existing subscription.
#[derive(Clone, Debug)]
pub struct Subscription<Repo> {
    /// The given repository that the subscription reads from and publishes its read
    /// markers to.
    pub repo: Repo,
    /// The id of the subscription. This is used to track the subscription's progress.
    pub id: uuid::Uuid,
    /// The name of the stream that the subscription tracks its reads in, built as
    /// `{target_stream}-{id}`. This is more or less the identity of the subscription:
    /// read markers are looked up under this name, so changing it will cause the
    /// subscription to read from the beginning of the target stream.
    pub name: SubscriptionName,
    /// The name of the stream that this subscription is reading from. The subscription
    /// only queries this stream, so events from other streams are ignored.
    pub target_stream: SubscriptionName,
}

impl<Repo> Subscription<Repo>
where
    Repo: EventRepo + Clone,
{
    /// Create a new subscription to a stream of events, generating a fresh random (v4)
    /// id for the subscription.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Subscription::resume`] returns.
    pub fn init(repo: &Repo, name: impl SubscriptionTopic) -> Result<Self, String> {
        Self::resume(repo, uuid::Uuid::new_v4(), name)
    }

    /// Resume a subscription to a stream of events, using the given id to track the
    /// subscription's progress. Use this to avoid replaying the target stream entirely,
    /// instead picking up at the last read position.
    ///
    /// The subscription's own (tracking) stream name is `{target_stream}-{id}`, and the
    /// repository is cloned into the returned subscription.
    ///
    /// # Errors
    ///
    /// Returns an error if `{target_stream}-{id}` cannot be parsed into a
    /// [`SubscriptionName`].
    pub fn resume(
        repo: &Repo,
        id: uuid::Uuid,
        name: impl SubscriptionTopic,
    ) -> Result<Self, String> {
        let target_stream = name.stream_name();
        let tracking_name: SubscriptionName = format!("{target_stream}-{id}").parse()?;

        Ok(Self {
            repo: repo.clone(),
            id,
            name: tracking_name,
            target_stream,
        })
    }

    /// Build a [`SubscriptionStream`] from a clone of this subscription, which you can
    /// listen to asynchronously and manipulate with `StreamExt` methods.
    pub fn stream(&self) -> SubscriptionStream
    where
        Repo: Clone + Send + Sync + 'static,
    {
        SubscriptionStream::new(self.clone())
    }

    /// Build a [`SubscriptionWorker`] from a clone of this subscription and the given
    /// frame service, which is used to process events in the target stream.
    ///
    /// # Errors
    ///
    /// This method currently always returns `Ok`; the `Result` is part of the public
    /// signature.
    pub fn worker<FrameService>(
        &self,
        service: FrameService,
    ) -> Result<SubscriptionWorker, EventRepoError>
    where
        Repo: Clone + Send + Sync + 'static,
        FrameService: Service<Frame> + Send + Sync + 'static,
        FrameService::Future: Send,
        <FrameService as Service<Frame>>::Response: Into<Frame>,
        <FrameService as Service<Frame>>::Error: Into<Frame>,
    {
        Ok(SubscriptionWorker::new(self.clone(), service))
    }

    /// Process all pending events in the target stream by handing each one to `service`.
    ///
    /// Events are fetched with [`Subscription::poll_events`], which records the read
    /// marker *before* the events are dispatched. The outcome of each service call
    /// (response or error) is discarded. If the service reports a readiness failure,
    /// dispatching stops and the remaining events of this batch are dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if polling the repository for pending events fails.
    pub async fn call_with_service<FrameService>(
        &self,
        mut service: FrameService,
    ) -> Result<(), EventRepoError>
    where
        Repo: Clone + Send + Sync + 'static,
        FrameService: Service<Frame> + Send + Sync + 'static,
        FrameService::Future: Send,
    {
        for event in self.poll_events().await? {
            // tower's `Service` contract requires `poll_ready` to report readiness before
            // every `call`; services are allowed to panic otherwise.
            let ready = std::future::poll_fn(|cx| service.poll_ready(cx))
                .await
                .is_ok();
            if !ready {
                // A service that failed `poll_ready` must not be called again.
                break;
            }

            // Deliberately best-effort: the service's response or error is ignored.
            let _ = service.call(event.into()).await;
        }

        Ok(())
    }

    /// Get the last read position for a subscriber.
    ///
    /// Looks up the most recent marker event in the subscription's own stream and returns
    /// the position stored in it, or `-1` when no marker has been recorded yet.
    ///
    /// # Panics
    ///
    /// Panics if the latest marker's payload is not a valid JSON-encoded
    /// [`SubscriptionMarker`].
    pub async fn last_read_position(&self) -> i64 {
        match self.repo.last(&self.name, EventKind::Marker).await {
            Some(message) => {
                serde_json::from_slice::<SubscriptionMarker>(&message.data)
                    .expect("marker event payload should be a valid `SubscriptionMarker`")
                    .position
            }
            None => -1,
        }
    }

    /// Get all of the events in the target stream that have occurred since the last read
    /// marker. This does not advance the marker.
    ///
    /// # Errors
    ///
    /// Returns an error if the repository query fails.
    pub async fn pending_events(&self) -> Result<EventLog, EventRepoError> {
        let last_read = self.last_read_position().await;

        let events = self
            .repo
            .entries_since_position(&self.target_stream, last_read)
            .await?;

        Ok(events)
    }

    /// Poll for any pending events in the target stream.
    ///
    /// When at least one event is pending, the position of the last one is recorded as
    /// the new read marker before the events are returned. When nothing is pending, no
    /// marker is written and an empty log is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching the pending events or publishing the marker fails.
    pub async fn poll_events(&self) -> Result<EventLog, EventRepoError>
    where
        Repo: Clone + Send + Sync + 'static,
    {
        let events = self.pending_events().await?;

        // Copy the position out first so no borrow of `events` is held across the await.
        if let Some(position) = events.last().map(|event| event.position) {
            self.log_last_read(position).await?;
        }

        Ok(events)
    }

    /// Log the last read position for a subscriber. This leverages markers: a read-marker
    /// event for `position` is published to the repository.
    ///
    /// # Errors
    ///
    /// Returns an error if publishing the marker fails.
    pub async fn log_last_read(&self, position: i64) -> Result<(), EventRepoError> {
        let marker = Event::read_marker(&self.name, &self.target_stream, position);

        self.repo.publish(marker).await?;

        Ok(())
    }
}