use std::collections::HashMap;
use futures::Stream;
use intrepid_core::Frame;
use tokio_stream::StreamMap;
use tower::Service;
use crate::EventRepo;
use super::{SubscriptionConfig, SubscriptionWorker};
pub struct SubscriptionWorkerMap {
inner: HashMap<String, (SubscriptionConfig, SubscriptionWorker)>,
}
impl SubscriptionWorkerMap {
pub fn new() -> Self {
Self {
inner: HashMap::new(),
}
}
pub fn insert<Repo, FrameService>(
&mut self,
repo: &Repo,
config: impl Into<SubscriptionConfig>,
service: FrameService,
) where
Repo: EventRepo + Clone + Send + Sync + 'static,
FrameService: Service<Frame> + Clone + Send + Sync + 'static,
FrameService::Future: Send,
<FrameService as Service<Frame>>::Response: Into<Frame>,
<FrameService as Service<Frame>>::Error: Into<Frame>,
{
let config = config.into();
let subscription = config.subscription(repo).unwrap();
let process = SubscriptionWorker::new(subscription, service.clone());
self.inner.insert(config.stream_id(), (config, process));
}
pub fn stream(self) -> impl Stream<Item = (String, Frame)> {
let mut stream_map = StreamMap::new();
for (id, (config, process)) in self.inner {
stream_map.insert(id, Box::pin(config.worker(process)));
}
stream_map
}
}
impl Default for SubscriptionWorkerMap {
fn default() -> Self {
Self::new()
}
}