Skip to main content

meerkat_mob/runtime/
event_router.rs

1//! Mob-level event bus that merges per-member session streams into a
2//! single `mpsc` channel of [`AttributedEvent`]s.
3//!
4//! The router runs as an independent tokio task:
5//! 1. Bootstraps by subscribing to all current roster members.
6//! 2. Polls the [`MobEventStore`] for `MeerkatSpawned`/`MeerkatRetired` to
7//!    track roster changes and subscribe/unsubscribe streams.
8//! 3. Tags events with [`AttributedEvent`] and forwards to the receiver.
9//!
10//! Streams for retired members end naturally when sessions are archived.
11
12use crate::event::AttributedEvent;
13use crate::ids::{MeerkatId, ProfileName};
14use crate::store::MobEventStore;
15#[cfg(target_arch = "wasm32")]
16use crate::tokio;
17
18use super::RosterAuthority;
19use super::session_service::MobSessionService;
20use futures::stream::{SelectAll, StreamExt};
21use meerkat_core::comms::EventStream;
22use meerkat_core::service::SessionService;
23use meerkat_core::types::SessionId;
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::{RwLock, mpsc};
28use tokio_util::sync::CancellationToken;
29
/// Configuration for the [`MobEventRouter`].
///
/// Both fields are plain values, so the config is cheap to clone and can be
/// printed with `{:?}` when diagnosing router behavior.
#[derive(Debug, Clone)]
pub struct MobEventRouterConfig {
    /// How often to poll the mob event store for roster changes.
    pub poll_interval: Duration,
    /// Capacity of the output `mpsc` channel.
    pub channel_capacity: usize,
}
37
38impl Default for MobEventRouterConfig {
39    fn default() -> Self {
40        Self {
41            poll_interval: Duration::from_millis(500),
42            channel_capacity: 256,
43        }
44    }
45}
46
47/// Handle returned by [`spawn_event_router`]. Drop to stop the router.
48pub struct MobEventRouterHandle {
49    /// Receive attributed events from all mob members.
50    pub event_rx: mpsc::Receiver<AttributedEvent>,
51    cancel: CancellationToken,
52}
53
54impl MobEventRouterHandle {
55    /// Explicitly cancel the router task.
56    pub fn cancel(&self) {
57        self.cancel.cancel();
58    }
59}
60
61impl Drop for MobEventRouterHandle {
62    fn drop(&mut self) {
63        self.cancel.cancel();
64    }
65}
66
67/// Spawn the event router task and return its handle.
68pub(super) fn spawn_event_router(
69    session_service: Arc<dyn MobSessionService>,
70    events: Arc<dyn MobEventStore>,
71    roster: Arc<RwLock<RosterAuthority>>,
72    config: MobEventRouterConfig,
73) -> MobEventRouterHandle {
74    let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
75    let cancel = CancellationToken::new();
76    let cancel_clone = cancel.clone();
77
78    tokio::spawn(async move {
79        run_event_router(
80            session_service,
81            events,
82            roster,
83            config,
84            event_tx,
85            cancel_clone,
86        )
87        .await;
88    });
89
90    MobEventRouterHandle { event_rx, cancel }
91}
92
93#[allow(clippy::ignored_unit_patterns)]
94async fn run_event_router(
95    session_service: Arc<dyn MobSessionService>,
96    events: Arc<dyn MobEventStore>,
97    roster: Arc<RwLock<RosterAuthority>>,
98    config: MobEventRouterConfig,
99    event_tx: mpsc::Sender<AttributedEvent>,
100    cancel: CancellationToken,
101) {
102    let mut merged: SelectAll<TaggedStream> = SelectAll::new();
103    let mut tracked_ids: HashSet<MeerkatId> = HashSet::new();
104    let mut mob_cursor: u64 = 0;
105
106    // Bootstrap: subscribe to all current roster members.
107    {
108        let roster_snap = roster.read().await;
109        for entry in roster_snap.list() {
110            if let Some(session_id) = entry.member_ref.session_id()
111                && tracked_ids.insert(entry.meerkat_id.clone())
112                && let Some(stream) = subscribe_member(
113                    &session_service,
114                    session_id,
115                    entry.meerkat_id.clone(),
116                    entry.profile.clone(),
117                )
118                .await
119            {
120                merged.push(stream);
121            }
122        }
123    }
124
125    // Seed cursor from current event store.
126    if let Ok(all_events) = events.poll(0, usize::MAX).await
127        && let Some(last) = all_events.last()
128    {
129        mob_cursor = last.cursor;
130    }
131
132    let mut poll_interval = tokio::time::interval(config.poll_interval);
133    #[cfg(not(target_arch = "wasm32"))]
134    poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
135
136    loop {
137        tokio::select! {
138            () = cancel.cancelled() => break,
139
140            // Forward attributed events from member streams.
141            Some((meerkat_id, profile, envelope)) = merged.next() => {
142                let attributed = AttributedEvent {
143                    source: meerkat_id,
144                    profile,
145                    envelope,
146                };
147                if event_tx.send(attributed).await.is_err() {
148                    // Receiver dropped — shut down.
149                    break;
150                }
151            }
152
153            // Poll mob events for roster changes.
154            _ = poll_interval.tick() => {
155                let new_events = match events.poll(mob_cursor, 100).await {
156                    Ok(evts) => evts,
157                    Err(_) => continue,
158                };
159                for mob_event in new_events {
160                    mob_cursor = mob_event.cursor;
161                    match mob_event.kind {
162                        crate::event::MobEventKind::MeerkatSpawned {
163                            meerkat_id,
164                            role,
165                            member_ref,
166                            ..
167                        } => {
168                            if let Some(sid) = member_ref.session_id()
169                                && tracked_ids.insert(meerkat_id.clone())
170                                && let Some(stream) = subscribe_member(
171                                    &session_service,
172                                    sid,
173                                    meerkat_id,
174                                    role,
175                                )
176                                .await
177                            {
178                                merged.push(stream);
179                            }
180                        }
181                        crate::event::MobEventKind::MeerkatRetired {
182                            meerkat_id, ..
183                        } => {
184                            tracked_ids.remove(&meerkat_id);
185                        }
186                        _ => {}
187                    }
188                }
189            }
190        }
191    }
192}
193
194/// A tagged stream that yields (MeerkatId, ProfileName, EventEnvelope).
195type TaggedItem = (
196    MeerkatId,
197    ProfileName,
198    meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
199);
200type TaggedStream = futures::stream::Map<
201    EventStream,
202    Box<
203        dyn FnMut(meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>) -> TaggedItem
204            + Send,
205    >,
206>;
207
208async fn subscribe_member(
209    session_service: &Arc<dyn MobSessionService>,
210    session_id: &SessionId,
211    meerkat_id: MeerkatId,
212    profile: ProfileName,
213) -> Option<TaggedStream> {
214    let stream = SessionService::subscribe_session_events(session_service.as_ref(), session_id)
215        .await
216        .ok()?;
217    let mid = meerkat_id;
218    let prof = profile;
219    Some(stream.map(
220        Box::new(move |envelope| (mid.clone(), prof.clone(), envelope))
221            as Box<
222                dyn FnMut(
223                        meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
224                    ) -> TaggedItem
225                    + Send,
226            >,
227    ))
228}