intrepid_model/subscriptions/
subscription_worker_map.rs

1use std::collections::HashMap;
2
3use futures::Stream;
4use intrepid_core::Frame;
5use tokio_stream::StreamMap;
6use tower::Service;
7
8use crate::EventRepo;
9
10use super::{SubscriptionConfig, SubscriptionWorker};
11
12/// A map of subscription processes which can be run as a stream.
13pub struct SubscriptionWorkerMap {
14    inner: HashMap<String, (SubscriptionConfig, SubscriptionWorker)>,
15}
16
17impl SubscriptionWorkerMap {
18    /// Create a new subscription collection.
19    pub fn new() -> Self {
20        Self {
21            inner: HashMap::new(),
22        }
23    }
24
25    /// Insert a subscription process into the map using a config and a frame service.
26    /// The subscription process will be started immediately, but won't do anything until
27    /// it is polled.
28    pub fn insert<Repo, FrameService>(
29        &mut self,
30        repo: &Repo,
31        config: impl Into<SubscriptionConfig>,
32        service: FrameService,
33    ) where
34        Repo: EventRepo + Clone + Send + Sync + 'static,
35        FrameService: Service<Frame> + Clone + Send + Sync + 'static,
36        FrameService::Future: Send,
37        <FrameService as Service<Frame>>::Response: Into<Frame>,
38        <FrameService as Service<Frame>>::Error: Into<Frame>,
39    {
40        let config = config.into();
41        let subscription = config.subscription(repo).unwrap();
42        let process = SubscriptionWorker::new(subscription, service.clone());
43
44        self.inner.insert(config.stream_id(), (config, process));
45    }
46
47    /// Consume the process map and collect all the workers from it into a single stream.
48    pub fn stream(self) -> impl Stream<Item = (String, Frame)> {
49        let mut stream_map = StreamMap::new();
50
51        for (id, (config, process)) in self.inner {
52            stream_map.insert(id, Box::pin(config.worker(process)));
53        }
54
55        stream_map
56    }
57}
58
59impl Default for SubscriptionWorkerMap {
60    fn default() -> Self {
61        Self::new()
62    }
63}