intrepid-model 0.3.0

Manage complex async business logic with ease
Documentation
use futures::Stream;
use intrepid_core::Frame;
use std::{str::FromStr, time::Duration};

use crate::EventRepo;

use super::{Subscription, SubscriptionName, SubscriptionWorker};

/// A configuration for a subscription to a stream of events.
#[derive(Clone)]
pub struct SubscriptionConfig {
    /// A unique identifier for the subscription. See [`SubscriptionConfig::stream_id`] for
    /// a fully qualified identifier.
    pub id: uuid::Uuid,
    /// The name of the stream that the subscription is listening to. This is the target
    /// stream that the subscription is reading from, and is combined with the id to form
    /// the fully qualified stream id.
    pub stream_name: SubscriptionName,
    /// The frequency at which the subscription should poll the target stream for new events.
    /// Used for throttling the stream, so that it doesn't consume too many resources or hammer
    /// any external services.
    pub frequency: Duration,
}

impl SubscriptionConfig {
    /// Create a new subscription configuration.
    pub fn new(id: uuid::Uuid, stream_name: SubscriptionName, frequency: Duration) -> Self {
        Self {
            id,
            stream_name,
            frequency,
        }
    }

    /// A unique identifier for the subscription.
    pub fn stream_id(&self) -> String {
        format!(
            "{id}@{stream_name}",
            id = self.id,
            stream_name = self.stream_name
        )
    }

    /// Resumes a subscription with the given id.
    pub fn subscription<Repo>(&self, repo: &Repo) -> Result<Subscription<Repo>, String>
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
    {
        Subscription::resume(repo, self.id, self.stream_name.clone())
    }

    /// Construct a throttled stream of frames from the subscription.
    pub fn stream<Repo>(&self, repo: &Repo) -> impl Stream<Item = Frame>
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
    {
        let stream = self.subscription(repo).unwrap().stream();
        tokio_stream::StreamExt::throttle(stream, self.frequency)
    }

    /// Start a worker stream with the given service.
    pub fn worker(&self, worker: SubscriptionWorker) -> impl Stream<Item = Frame> {
        tokio_stream::StreamExt::throttle(worker, self.frequency)
    }

    fn from_str<T: Into<SubscriptionName>>(stream_name: T) -> Self {
        let id = uuid::Uuid::from_str("0588c89e-b8e3-48af-ab2b-3a8d1961dcef").unwrap();

        Self::new(id, stream_name.into(), Duration::from_secs(1))
    }
}

impl<T: AsRef<str>> From<T> for SubscriptionConfig {
    fn from(inner: T) -> Self {
        Self::from_str(SubscriptionName::parse_string(inner.as_ref()).unwrap())
    }
}