intrepid-model 0.3.0

Manage complex async business logic with ease
Documentation
use std::collections::HashMap;

use futures::Stream;
use intrepid_core::Frame;
use tokio_stream::StreamMap;
use tower::Service;

use crate::EventRepo;

use super::{SubscriptionConfig, SubscriptionWorker};

/// A map of subscription processes which can be run as a stream.
pub struct SubscriptionWorkerMap {
    inner: HashMap<String, (SubscriptionConfig, SubscriptionWorker)>,
}

impl SubscriptionWorkerMap {
    /// Create a new subscription collection.
    pub fn new() -> Self {
        Self {
            inner: HashMap::new(),
        }
    }

    /// Insert a subscription process into the map using a config and a frame service.
    /// The subscription process will be started immediately, but won't do anything until
    /// it is polled.
    pub fn insert<Repo, FrameService>(
        &mut self,
        repo: &Repo,
        config: impl Into<SubscriptionConfig>,
        service: FrameService,
    ) where
        Repo: EventRepo + Clone + Send + Sync + 'static,
        FrameService: Service<Frame> + Clone + Send + Sync + 'static,
        FrameService::Future: Send,
        <FrameService as Service<Frame>>::Response: Into<Frame>,
        <FrameService as Service<Frame>>::Error: Into<Frame>,
    {
        let config = config.into();
        let subscription = config.subscription(repo).unwrap();
        let process = SubscriptionWorker::new(subscription, service.clone());

        self.inner.insert(config.stream_id(), (config, process));
    }

    /// Consume the process map and collect all the workers from it into a single stream.
    pub fn stream(self) -> impl Stream<Item = (String, Frame)> {
        let mut stream_map = StreamMap::new();

        for (id, (config, process)) in self.inner {
            stream_map.insert(id, Box::pin(config.worker(process)));
        }

        stream_map
    }
}

impl Default for SubscriptionWorkerMap {
    fn default() -> Self {
        Self::new()
    }
}