nitinol_eventstream/
eventstream.rs1use tokio::sync::broadcast::error::RecvError;
2use tokio::sync::broadcast::{self, Sender as BroadcastSender, Receiver};
3use nitinol_core::event::Event;
4use nitinol_core::identifier::EntityId;
5use nitinol_process::Status;
6use nitinol_protocol::Payload;
7use nitinol_resolver::mapping::{Mapper, ResolveMapping};
8use crate::process::WithEventSubscriber;
9
10#[derive(Clone)]
11pub struct EventStream {
12 root: BroadcastSender<Payload>,
13}
14
15impl Default for EventStream {
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl EventStream {
22 fn new() -> Self {
23 let (root, terminal) = broadcast::channel(256);
24
25 tokio::spawn(async move {
26 let mut dead_letter: Receiver<Payload> = terminal;
27 while let Ok(payload) = dead_letter.recv().await {
28 tracing::trace!("Streamed event {}#{}", payload.id, payload.registry_key);
29 }
30 });
31
32 Self { root }
33 }
34}
35
36impl EventStream {
37 pub async fn publish<E: Event>(&self, id: EntityId, seq: i64, event: &E) {
38 self.root.send(Payload::new(id, seq, event).unwrap()).unwrap();
39 }
40
41 pub async fn subscribe<S: ResolveMapping>(&self, subscriber: S) {
42 let mut mapping = Mapper::default();
43 S::mapping(&mut mapping);
44
45 let mapping = mapping.filter(|key| key.handler().eq(crate::resolver::HANDLER_TYPE));
46
47 let rx = self.root.subscribe();
48
49 tokio::spawn(async move {
50 let mapping = mapping;
51 let mut rx = rx;
52 let mut subscriber = Some(subscriber);
53
54 loop {
55 match rx.recv().await {
56 Ok(payload) => {
57 if let Some(resolver) = mapping.find(|key| key.event().eq(&payload.registry_key)) {
58 if let Err(e) = resolver.resolve(&mut subscriber, &payload.bytes).await {
59 tracing::error!("{:?}", e);
60 }
61 }
62 }
63 Err(RecvError::Closed) => {
64 break;
65 }
66 Err(RecvError::Lagged(seq)) => {
67 tracing::warn!("Lagged event stream: {}", seq);
68 }
69 }
70 }
71 });
72 }
73
74 pub(crate) async fn subscribe_in_process<E: Event, P: WithEventSubscriber<E>>(&self, mapping: Mapper<P>, status: Status) {
75 let mapping = mapping.filter(|key| key.handler().eq(crate::process::resolver::RESOLVE_TYPE));
76 let rx = self.root.subscribe();
77 tokio::spawn(async move {
78 let mut rx = rx;
79 loop {
80 match rx.recv().await {
81 Ok(payload) => {
82 if let Some(resolver) = mapping.find(|key| key.event().eq(&payload.registry_key)) {
83 if let Err(e) = resolver.resolve(&mut None, &payload.bytes).await {
84 tracing::error!("{:?}", e);
85 }
86 }
87 }
88 Err(RecvError::Closed) => {
89 break;
90 }
91 Err(RecvError::Lagged(seq)) => {
92 tracing::warn!("Lagged event stream: {}", seq);
93 continue;
94 }
95 }
96 if !status.is_active().await {
97 break;
98 }
99 }
100 });
101 }
102}