use crate::event::AttributedEvent;
use crate::ids::{MeerkatId, ProfileName};
use crate::roster::Roster;
use crate::store::MobEventStore;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use super::session_service::MobSessionService;
use futures::stream::{SelectAll, StreamExt};
use meerkat_core::comms::EventStream;
use meerkat_core::service::SessionService;
use meerkat_core::types::SessionId;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, mpsc};
use tokio_util::sync::CancellationToken;
pub struct MobEventRouterConfig {
pub poll_interval: Duration,
pub channel_capacity: usize,
}
impl Default for MobEventRouterConfig {
fn default() -> Self {
Self {
poll_interval: Duration::from_millis(500),
channel_capacity: 256,
}
}
}
pub struct MobEventRouterHandle {
pub event_rx: mpsc::Receiver<AttributedEvent>,
cancel: CancellationToken,
}
impl MobEventRouterHandle {
pub fn cancel(&self) {
self.cancel.cancel();
}
}
impl Drop for MobEventRouterHandle {
fn drop(&mut self) {
self.cancel.cancel();
}
}
pub(super) fn spawn_event_router(
session_service: Arc<dyn MobSessionService>,
events: Arc<dyn MobEventStore>,
roster: Arc<RwLock<Roster>>,
config: MobEventRouterConfig,
) -> MobEventRouterHandle {
let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
run_event_router(
session_service,
events,
roster,
config,
event_tx,
cancel_clone,
)
.await;
});
MobEventRouterHandle { event_rx, cancel }
}
#[allow(clippy::ignored_unit_patterns)]
async fn run_event_router(
session_service: Arc<dyn MobSessionService>,
events: Arc<dyn MobEventStore>,
roster: Arc<RwLock<Roster>>,
config: MobEventRouterConfig,
event_tx: mpsc::Sender<AttributedEvent>,
cancel: CancellationToken,
) {
let mut merged: SelectAll<TaggedStream> = SelectAll::new();
let mut tracked_ids: HashSet<MeerkatId> = HashSet::new();
let mut mob_cursor: u64 = 0;
{
let roster_snap = roster.read().await;
for entry in roster_snap.list() {
if let Some(session_id) = entry.member_ref.session_id()
&& tracked_ids.insert(entry.meerkat_id.clone())
&& let Some(stream) = subscribe_member(
&session_service,
session_id,
entry.meerkat_id.clone(),
entry.profile.clone(),
)
.await
{
merged.push(stream);
}
}
}
if let Ok(all_events) = events.poll(0, usize::MAX).await
&& let Some(last) = all_events.last()
{
mob_cursor = last.cursor;
}
let mut poll_interval = tokio::time::interval(config.poll_interval);
#[cfg(not(target_arch = "wasm32"))]
poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
() = cancel.cancelled() => break,
Some((meerkat_id, profile, envelope)) = merged.next() => {
let attributed = AttributedEvent {
source: meerkat_id,
profile,
envelope,
};
if event_tx.send(attributed).await.is_err() {
break;
}
}
_ = poll_interval.tick() => {
let new_events = match events.poll(mob_cursor, 100).await {
Ok(evts) => evts,
Err(_) => continue,
};
for mob_event in new_events {
mob_cursor = mob_event.cursor;
match mob_event.kind {
crate::event::MobEventKind::MeerkatSpawned {
meerkat_id,
role,
member_ref,
..
} => {
if let Some(sid) = member_ref.session_id()
&& tracked_ids.insert(meerkat_id.clone())
&& let Some(stream) = subscribe_member(
&session_service,
sid,
meerkat_id,
role,
)
.await
{
merged.push(stream);
}
}
crate::event::MobEventKind::MeerkatRetired {
meerkat_id, ..
} => {
tracked_ids.remove(&meerkat_id);
}
_ => {}
}
}
}
}
}
}
type TaggedItem = (
MeerkatId,
ProfileName,
meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
);
type TaggedStream = futures::stream::Map<
EventStream,
Box<
dyn FnMut(meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>) -> TaggedItem
+ Send,
>,
>;
async fn subscribe_member(
session_service: &Arc<dyn MobSessionService>,
session_id: &SessionId,
meerkat_id: MeerkatId,
profile: ProfileName,
) -> Option<TaggedStream> {
let stream = SessionService::subscribe_session_events(session_service.as_ref(), session_id)
.await
.ok()?;
let mid = meerkat_id;
let prof = profile;
Some(stream.map(
Box::new(move |envelope| (mid.clone(), prof.clone(), envelope))
as Box<
dyn FnMut(
meerkat_core::event::EventEnvelope<meerkat_core::event::AgentEvent>,
) -> TaggedItem
+ Send,
>,
))
}