rivers/
lib.rs

1pub mod adapters;
2pub mod event;
3pub mod handler;
4pub mod stream;
5
6use futures::future;
7use std::collections::HashMap;
8use tokio::sync::broadcast::{self, Receiver, Sender};
9use tokio::task::JoinHandle;
10
11use crate::event::{Event, FromEvent};
12use crate::handler::Handler;
13use crate::stream::StreamSource;
14
15pub struct Rivers {
16    events: Vec<Event>,
17    topics: HashMap<String, Sender<Event>>,
18    join_handles: Vec<JoinHandle<()>>,
19}
20
21impl Rivers {
22    pub fn new(events: Vec<Event>) -> Self {
23        Rivers {
24            events,
25            topics: HashMap::new(),
26            join_handles: vec![],
27        }
28    }
29
30    pub fn stream<S, H, T>(mut self, topic: S, handler: H) -> Self
31    where
32        S: AsRef<str>,
33        H: Handler<T>,
34        T: FromEvent + Clone + Send + 'static,
35    {
36        let rx = self.get_channel_for_topic(topic);
37
38        let handle = tokio::spawn(handler.call(StreamSource::new(rx)));
39
40        self.join_handles.push(handle);
41        self
42    }
43
44    pub async fn run(self) {
45        for (_, sender) in self.topics {
46            for e in &self.events {
47                sender.send(e.clone()).unwrap();
48            }
49        }
50
51        future::join_all(self.join_handles).await;
52    }
53
54    fn get_channel_for_topic(&mut self, topic: impl AsRef<str>) -> Receiver<Event> {
55        if let Some(tx) = self.topics.get(topic.as_ref()) {
56            return tx.subscribe();
57        }
58
59        let (tx, rx) = broadcast::channel(16);
60        self.topics.insert(topic.as_ref().to_string(), tx);
61
62        rx
63    }
64}