intrepid-model 0.3.0

Manage complex async business logic with ease
Documentation
use std::pin::Pin;

use futures::Stream;
use intrepid_core::Frame;
use tokio_stream::StreamMap;

use crate::EventRepo;

use super::SubscriptionConfig;

/// A collection of subscriptions to streams of events.
pub struct SubscriptionCollection {
    inner: Vec<SubscriptionConfig>,
}

impl SubscriptionCollection {
    /// Create a new subscription collection.
    pub fn new<T: Into<SubscriptionConfig>>(inner: Vec<T>) -> Self {
        Self {
            inner: inner.into_iter().map(Into::into).collect(),
        }
    }

    /// Collect all the streams from the subscriptions in the collection into a single stream.
    pub fn stream<Repo>(&self, repo: Repo) -> StreamMap<String, Pin<Box<impl Stream<Item = Frame>>>>
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
    {
        let mut stream_map = StreamMap::new();
        for listener in self.inner.clone() {
            stream_map.insert(listener.stream_id(), Box::pin(listener.stream(&repo)));
        }

        stream_map
    }
}