Skip to main content

intrepid_model/subscriptions/
subscription_collection.rs

1use std::pin::Pin;
2
3use futures::Stream;
4use intrepid_core::Frame;
5use tokio_stream::StreamMap;
6
7use crate::EventRepo;
8
9use super::SubscriptionConfig;
10
11/// A collection of subscriptions to streams of events.
12pub struct SubscriptionCollection {
13    inner: Vec<SubscriptionConfig>,
14}
15
16impl SubscriptionCollection {
17    /// Create a new subscription collection.
18    pub fn new<T: Into<SubscriptionConfig>>(inner: Vec<T>) -> Self {
19        Self {
20            inner: inner.into_iter().map(Into::into).collect(),
21        }
22    }
23
24    /// Collect all the streams from the subscriptions in the collection into a single stream.
25    pub fn stream<Repo>(&self, repo: Repo) -> StreamMap<String, Pin<Box<impl Stream<Item = Frame>>>>
26    where
27        Repo: EventRepo + Clone + Send + Sync + 'static,
28    {
29        let mut stream_map = StreamMap::new();
30        for listener in self.inner.clone() {
31            stream_map.insert(listener.stream_id(), Box::pin(listener.stream(&repo)));
32        }
33
34        stream_map
35    }
36}