1pub mod adapters;
2pub mod event;
3pub mod handler;
4pub mod stream;
5
6use futures::future;
7use std::collections::HashMap;
8use tokio::sync::broadcast::{self, Receiver, Sender};
9use tokio::task::JoinHandle;
10
11use crate::event::{Event, FromEvent};
12use crate::handler::Handler;
13use crate::stream::StreamSource;
14
15pub struct Rivers {
16 events: Vec<Event>,
17 topics: HashMap<String, Sender<Event>>,
18 join_handles: Vec<JoinHandle<()>>,
19}
20
21impl Rivers {
22 pub fn new(events: Vec<Event>) -> Self {
23 Rivers {
24 events,
25 topics: HashMap::new(),
26 join_handles: vec![],
27 }
28 }
29
30 pub fn stream<S, H, T>(mut self, topic: S, handler: H) -> Self
31 where
32 S: AsRef<str>,
33 H: Handler<T>,
34 T: FromEvent + Clone + Send + 'static,
35 {
36 let rx = self.get_channel_for_topic(topic);
37
38 let handle = tokio::spawn(handler.call(StreamSource::new(rx)));
39
40 self.join_handles.push(handle);
41 self
42 }
43
44 pub async fn run(self) {
45 for (_, sender) in self.topics {
46 for e in &self.events {
47 sender.send(e.clone()).unwrap();
48 }
49 }
50
51 future::join_all(self.join_handles).await;
52 }
53
54 fn get_channel_for_topic(&mut self, topic: impl AsRef<str>) -> Receiver<Event> {
55 if let Some(tx) = self.topics.get(topic.as_ref()) {
56 return tx.subscribe();
57 }
58
59 let (tx, rx) = broadcast::channel(16);
60 self.topics.insert(topic.as_ref().to_string(), tx);
61
62 rx
63 }
64}