nitinol_eventstream/
eventstream.rs

1use tokio::sync::broadcast::error::RecvError;
2use tokio::sync::broadcast::{self, Sender as BroadcastSender, Receiver};
3use nitinol_core::event::Event;
4use nitinol_core::identifier::EntityId;
5use nitinol_process::Status;
6use nitinol_protocol::Payload;
7use nitinol_resolver::mapping::{Mapper, ResolveMapping};
8use crate::process::WithEventSubscriber;
9
10#[derive(Clone)]
11pub struct EventStream {
12    root: BroadcastSender<Payload>,
13}
14
15impl Default for EventStream {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl EventStream {
22    fn new() -> Self {
23        let (root, terminal) = broadcast::channel(256);
24       
25        tokio::spawn(async move { 
26            let mut dead_letter: Receiver<Payload> = terminal;
27            while let Ok(payload) = dead_letter.recv().await { 
28                tracing::trace!("Streamed event {}#{}", payload.id, payload.registry_key);
29            }
30        });
31        
32        Self { root }
33    }
34}
35
36impl EventStream {
37    pub async fn publish<E: Event>(&self, id: EntityId, seq: i64, event: &E) {
38        self.root.send(Payload::new(id, seq, event).unwrap()).unwrap();
39    }
40
41    pub async fn subscribe<S: ResolveMapping>(&self, subscriber: S) {
42        let mut mapping = Mapper::default();
43        S::mapping(&mut mapping);
44        
45        let mapping = mapping.filter(|key| key.handler().eq(crate::resolver::HANDLER_TYPE));
46        
47        let rx = self.root.subscribe();
48
49        tokio::spawn(async move {
50            let mapping = mapping;
51            let mut rx = rx;
52            let mut subscriber = Some(subscriber);
53            
54            loop {
55                match rx.recv().await {
56                    Ok(payload) => {
57                        if let Some(resolver) = mapping.find(|key| key.event().eq(&payload.registry_key)) {
58                            if let Err(e) = resolver.resolve(&mut subscriber, &payload.bytes).await {
59                                tracing::error!("{:?}", e);
60                            }
61                        }
62                    }
63                    Err(RecvError::Closed) => {
64                        break;
65                    }
66                    Err(RecvError::Lagged(seq)) => {
67                        tracing::warn!("Lagged event stream: {}", seq);
68                    }
69                }
70            }
71        });
72    }
73    
74    pub(crate) async fn subscribe_in_process<E: Event, P: WithEventSubscriber<E>>(&self, mapping: Mapper<P>, status: Status) {
75        let mapping = mapping.filter(|key| key.handler().eq(crate::process::resolver::RESOLVE_TYPE));
76        let rx = self.root.subscribe();
77        tokio::spawn(async move {
78            let mut rx = rx;
79            loop {
80                match rx.recv().await {
81                    Ok(payload) => {
82                        if let Some(resolver) = mapping.find(|key| key.event().eq(&payload.registry_key)) {
83                            if let Err(e) = resolver.resolve(&mut None, &payload.bytes).await {
84                                tracing::error!("{:?}", e);
85                            }
86                        }
87                    }
88                    Err(RecvError::Closed) => {
89                        break;
90                    }
91                    Err(RecvError::Lagged(seq)) => {
92                        tracing::warn!("Lagged event stream: {}", seq);
93                        continue;
94                    }
95                }
96                if !status.is_active().await {
97                    break;
98                }
99            }
100        });
101    }
102}