intrepid_model/subscriptions/
subscription_worker_map.rs1use std::collections::HashMap;
2
3use futures::Stream;
4use intrepid_core::Frame;
5use tokio_stream::StreamMap;
6use tower::Service;
7
8use crate::EventRepo;
9
10use super::{SubscriptionConfig, SubscriptionWorker};
11
12pub struct SubscriptionWorkerMap {
14 inner: HashMap<String, (SubscriptionConfig, SubscriptionWorker)>,
15}
16
17impl SubscriptionWorkerMap {
18 pub fn new() -> Self {
20 Self {
21 inner: HashMap::new(),
22 }
23 }
24
25 pub fn insert<Repo, FrameService>(
29 &mut self,
30 repo: &Repo,
31 config: impl Into<SubscriptionConfig>,
32 service: FrameService,
33 ) where
34 Repo: EventRepo + Clone + Send + Sync + 'static,
35 FrameService: Service<Frame> + Clone + Send + Sync + 'static,
36 FrameService::Future: Send,
37 <FrameService as Service<Frame>>::Response: Into<Frame>,
38 <FrameService as Service<Frame>>::Error: Into<Frame>,
39 {
40 let config = config.into();
41 let subscription = config.subscription(repo).unwrap();
42 let process = SubscriptionWorker::new(subscription, service.clone());
43
44 self.inner.insert(config.stream_id(), (config, process));
45 }
46
47 pub fn stream(self) -> impl Stream<Item = (String, Frame)> {
49 let mut stream_map = StreamMap::new();
50
51 for (id, (config, process)) in self.inner {
52 stream_map.insert(id, Box::pin(config.worker(process)));
53 }
54
55 stream_map
56 }
57}
58
59impl Default for SubscriptionWorkerMap {
60 fn default() -> Self {
61 Self::new()
62 }
63}