meerkat_mob/runtime/
event_router.rs1use crate::event::AttributedEvent;
13use crate::ids::{MeerkatId, ProfileName};
14use crate::store::MobEventStore;
15#[cfg(target_arch = "wasm32")]
16use crate::tokio;
17
18use super::RosterAuthority;
19use super::session_service::MobSessionService;
20use futures::stream::{SelectAll, StreamExt};
21use meerkat_core::comms::EventStream;
22use meerkat_core::service::SessionService;
23use meerkat_core::types::SessionId;
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::{RwLock, mpsc};
28use tokio_util::sync::CancellationToken;
29
/// Tuning knobs for the mob event router task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MobEventRouterConfig {
    /// Period of the timer that triggers a poll of the mob event store.
    pub poll_interval: Duration,
    /// Capacity of the bounded channel that carries attributed events to the
    /// holder of the router handle.
    pub channel_capacity: usize,
}

impl Default for MobEventRouterConfig {
    /// Defaults: poll every 500 ms, buffer up to 256 events.
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_millis(500),
            channel_capacity: 256,
        }
    }
}
46
47pub struct MobEventRouterHandle {
49 pub event_rx: mpsc::Receiver<AttributedEvent>,
51 cancel: CancellationToken,
52}
53
54impl MobEventRouterHandle {
55 pub fn cancel(&self) {
57 self.cancel.cancel();
58 }
59}
60
61impl Drop for MobEventRouterHandle {
62 fn drop(&mut self) {
63 self.cancel.cancel();
64 }
65}
66
67pub(super) fn spawn_event_router(
69 session_service: Arc<dyn MobSessionService>,
70 events: Arc<dyn MobEventStore>,
71 roster: Arc<RwLock<RosterAuthority>>,
72 config: MobEventRouterConfig,
73) -> MobEventRouterHandle {
74 let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
75 let cancel = CancellationToken::new();
76 let cancel_clone = cancel.clone();
77
78 tokio::spawn(async move {
79 run_event_router(
80 session_service,
81 events,
82 roster,
83 config,
84 event_tx,
85 cancel_clone,
86 )
87 .await;
88 });
89
90 MobEventRouterHandle { event_rx, cancel }
91}
92
93#[allow(clippy::ignored_unit_patterns)]
94async fn run_event_router(
95 session_service: Arc<dyn MobSessionService>,
96 events: Arc<dyn MobEventStore>,
97 roster: Arc<RwLock<RosterAuthority>>,
98 config: MobEventRouterConfig,
99 event_tx: mpsc::Sender<AttributedEvent>,
100 cancel: CancellationToken,
101) {
102 let mut merged: SelectAll<TaggedStream> = SelectAll::new();
103 let mut tracked_ids: HashSet<MeerkatId> = HashSet::new();
104 let mut mob_cursor: u64 = 0;
105
106 {
108 let roster_snap = roster.read().await;
109 for entry in roster_snap.list() {
110 if let Some(session_id) = entry.member_ref.session_id()
111 && tracked_ids.insert(entry.meerkat_id.clone())
112 && let Some(stream) = subscribe_member(
113 &session_service,
114 session_id,
115 entry.meerkat_id.clone(),
116 entry.profile.clone(),
117 )
118 .await
119 {
120 merged.push(stream);
121 }
122 }
123 }
124
125 if let Ok(all_events) = events.poll(0, usize::MAX).await
127 && let Some(last) = all_events.last()
128 {
129 mob_cursor = last.cursor;
130 }
131
132 let mut poll_interval = tokio::time::interval(config.poll_interval);
133 #[cfg(not(target_arch = "wasm32"))]
134 poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
135
136 loop {
137 tokio::select! {
138 () = cancel.cancelled() => break,
139
140 Some((meerkat_id, profile, envelope)) = merged.next() => {
142 let attributed = AttributedEvent {
143 source: meerkat_id,
144 profile,
145 envelope,
146 };
147 if event_tx.send(attributed).await.is_err() {
148 break;
150 }
151 }
152
153 _ = poll_interval.tick() => {
155 let new_events = match events.poll(mob_cursor, 100).await {
156 Ok(evts) => evts,
157 Err(_) => continue,
158 };
159 for mob_event in new_events {
160 mob_cursor = mob_event.cursor;
161 match mob_event.kind {
162 crate::event::MobEventKind::MeerkatSpawned {
163 meerkat_id,
164 role,
165 member_ref,
166 ..
167 } => {
168 if let Some(sid) = member_ref.session_id()
169 && tracked_ids.insert(meerkat_id.clone())
170 && let Some(stream) = subscribe_member(
171 &session_service,
172 sid,
173 meerkat_id,
174 role,
175 )
176 .await
177 {
178 merged.push(stream);
179 }
180 }
181 crate::event::MobEventKind::MeerkatRetired {
182 meerkat_id, ..
183 } => {
184 tracked_ids.remove(&meerkat_id);
185 }
186 _ => {}
187 }
188 }
189 }
190 }
191 }
192}
193
194type TaggedItem = (
196 MeerkatId,
197 ProfileName,
198 meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
199);
200type TaggedStream = futures::stream::Map<
201 EventStream,
202 Box<
203 dyn FnMut(meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>) -> TaggedItem
204 + Send,
205 >,
206>;
207
208async fn subscribe_member(
209 session_service: &Arc<dyn MobSessionService>,
210 session_id: &SessionId,
211 meerkat_id: MeerkatId,
212 profile: ProfileName,
213) -> Option<TaggedStream> {
214 let stream = SessionService::subscribe_session_events(session_service.as_ref(), session_id)
215 .await
216 .ok()?;
217 let mid = meerkat_id;
218 let prof = profile;
219 Some(stream.map(
220 Box::new(move |envelope| (mid.clone(), prof.clone(), envelope))
221 as Box<
222 dyn FnMut(
223 meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
224 ) -> TaggedItem
225 + Send,
226 >,
227 ))
228}