use std::collections::BTreeMap;
use std::sync::Arc;
use meerkat_mob::MobError;
use meerkat_mob::event::{AttributedEvent, MobEvent, MobEventKind};
use meerkat_mob::runtime::MobEventsView;
use serde_json::Value;
use tokio::sync::broadcast;
use crate::runtime::{MetadataScope, RuntimeMetadataTable};
use crate::types::MobStructuralEventEnvelope;
use crate::unified_runtime::EventQuery;
// Number of events requested per `poll_strict` call while scanning the ledger
// (forward page size and backward window size).
pub(crate) const QUERY_BATCH_SIZE: usize = 128;
// Result limit applied by `query_ledger_with_filter` when the query carries no explicit `limit`.
pub(crate) const DEFAULT_QUERY_LIMIT: usize = 256;
// Capacity passed to `broadcast::channel` when a `MobEventsStore` is created.
const MOB_EVENTS_CHANNEL_CAP: usize = 512;
/// Projects `MobEvent`s into `MobStructuralEventEnvelope`s and broadcasts the
/// projected envelopes to subscribers.
#[derive(Clone)]
pub struct MobEventsStore {
// Sending half of the broadcast channel that `project_mob_event` publishes on;
// receivers are handed out by `subscribe`.
event_tx: broadcast::Sender<MobStructuralEventEnvelope>,
// Optional source of mob/run labels. When `None`, projected envelopes carry
// empty label maps.
metadata_table: Option<Arc<RuntimeMetadataTable>>,
}
impl Default for MobEventsStore {
fn default() -> Self {
Self::new()
}
}
impl MobEventsStore {
    /// Creates a store with a fresh broadcast channel and no metadata table.
    pub fn new() -> Self {
        // Only the sender is kept; receivers are created on demand via `subscribe`.
        let (sender, _) = broadcast::channel(MOB_EVENTS_CHANNEL_CAP);
        Self {
            event_tx: sender,
            metadata_table: None,
        }
    }

    /// Returns the store with `table` installed as the label source used when
    /// building envelopes.
    #[must_use]
    pub fn with_metadata_table(self, table: Arc<RuntimeMetadataTable>) -> Self {
        Self {
            metadata_table: Some(table),
            ..self
        }
    }

    /// Opens a new receiver on the store's broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<MobStructuralEventEnvelope> {
        self.event_tx.subscribe()
    }

    /// Attributed events currently have no structural projection: this always
    /// returns `None` and broadcasts nothing.
    pub async fn project_attributed_event(
        &self,
        _event: &AttributedEvent,
    ) -> Option<MobStructuralEventEnvelope> {
        None
    }

    /// Builds the envelope for `event`, publishes a copy on the broadcast
    /// channel, and returns the envelope.
    ///
    /// The result of the send is intentionally ignored.
    pub async fn project_mob_event(&self, event: &MobEvent) -> MobStructuralEventEnvelope {
        let projected = self.build_envelope(event).await;
        let outbound = projected.clone();
        let _ = self.event_tx.send(outbound);
        projected
    }

    /// Builds the envelope for `event` without publishing it; used by the
    /// ledger query scans.
    pub async fn project_event_for_query(&self, event: &MobEvent) -> MobStructuralEventEnvelope {
        self.build_envelope(event).await
    }

    /// Assembles the structural envelope for a single mob event.
    async fn build_envelope(&self, event: &MobEvent) -> MobStructuralEventEnvelope {
        let cursor = event.cursor;
        let mob_id = event.mob_id.as_str().to_string();
        // Negative millisecond timestamps are clamped to zero.
        let timestamp_ms = u64::try_from(event.timestamp.timestamp_millis()).unwrap_or(0);
        let (run_id, step_id, agent_identity) = extract_structural_fields(&event.kind);
        // A payload that fails to serialize is represented as JSON null.
        let data = match serde_json::to_value(&event.kind) {
            Ok(value) => value,
            Err(_) => Value::Null,
        };
        let (mob_labels, run_labels) = self.lookup_labels(&mob_id, run_id.as_deref()).await;
        MobStructuralEventEnvelope {
            event_id: format!("mob-evt-{cursor}"),
            cursor,
            mob_id,
            timestamp_ms,
            kind: event_kind_label(&event.kind).to_string(),
            run_id,
            step_id,
            agent_identity,
            mob_labels,
            run_labels,
            data,
        }
    }

    /// Fetches `(mob_labels, run_labels)` from the metadata table.
    ///
    /// Both maps are empty when no table is installed; the run map is empty
    /// when `run_id` is `None`.
    async fn lookup_labels(
        &self,
        mob_id: &str,
        run_id: Option<&str>,
    ) -> (BTreeMap<String, String>, BTreeMap<String, String>) {
        let table = match self.metadata_table.as_ref() {
            Some(table) => table,
            None => return (BTreeMap::new(), BTreeMap::new()),
        };
        let mob_scope = MetadataScope::Mob(mob_id.to_string());
        let mob_labels = table.get_labels(&mob_scope).await;
        let run_labels = if let Some(run) = run_id {
            let run_scope = MetadataScope::Run(mob_id.to_string(), run.to_string());
            table.get_labels(&run_scope).await
        } else {
            BTreeMap::new()
        };
        (mob_labels, run_labels)
    }
}
/// Path prefix of the URLs produced by `build_subscribe_url`.
pub const MOB_EVENTS_STREAM_PATH: &str = "/mobkit/mob_events/stream";
/// Builds the stream URL (path plus url-encoded query string) that resumes the
/// given query.
///
/// `after_seq` is taken from `next_after_seq`, then `query.after_seq`, then
/// `fallback_cursor`, in that order of preference. Every filter set on `query`
/// is forwarded as a query parameter; `event_types` is sent as one
/// comma-joined value and omitted when empty.
pub(crate) fn build_subscribe_url(
    query: &EventQuery,
    next_after_seq: Option<u64>,
    fallback_cursor: u64,
) -> String {
    let resume_from = next_after_seq
        .or(query.after_seq)
        .unwrap_or(fallback_cursor);

    let mut params = form_urlencoded::Serializer::new(String::new());
    params.append_pair("after_seq", &resume_from.to_string());

    // Optional string filters, emitted in a fixed order.
    let text_filters = [
        ("mob_id", query.mob_id.as_deref()),
        ("run_id", query.run_id.as_deref()),
        ("step_id", query.step_id.as_deref()),
        ("identity", query.identity.as_deref()),
        ("member_id", query.member_id.as_deref()),
    ];
    for (key, maybe_value) in text_filters {
        if let Some(value) = maybe_value {
            params.append_pair(key, value);
        }
    }

    // Optional time bounds, rendered as decimal milliseconds.
    let time_filters = [("since_ms", query.since_ms), ("until_ms", query.until_ms)];
    for (key, maybe_bound) in time_filters {
        if let Some(bound) = maybe_bound {
            params.append_pair(key, &bound.to_string());
        }
    }

    if !query.event_types.is_empty() {
        let joined = query.event_types.join(",");
        params.append_pair("event_types", &joined);
    }

    let encoded = params.finish();
    format!("{MOB_EVENTS_STREAM_PATH}?{encoded}")
}
/// Error returned by the mob event ledger queries in this file.
#[derive(Debug)]
pub enum MobEventsQueryError {
/// The requested cursor is stale; produced from `MobError::StaleEventCursor`.
Stale {
/// Cursor the caller asked to resume after.
after_cursor: u64,
/// Latest cursor reported alongside the stale-cursor error.
latest_cursor: u64,
},
/// Any other `MobError`, carried through unchanged.
Backend(MobError),
}
/// Human-readable rendering: a fixed message for stale cursors, and the
/// wrapped error's own `Display` output for backend failures.
impl std::fmt::Display for MobEventsQueryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Backend(inner) => write!(f, "{inner}"),
            Self::Stale {
                after_cursor: requested,
                latest_cursor: latest,
            } => write!(
                f,
                "stale mob event cursor: requested {requested}, latest {latest}"
            ),
        }
    }
}

impl std::error::Error for MobEventsQueryError {}
/// Maps `MobError::StaleEventCursor` onto [`MobEventsQueryError::Stale`] and
/// wraps every other error as [`MobEventsQueryError::Backend`].
impl From<MobError> for MobEventsQueryError {
    fn from(err: MobError) -> Self {
        match err {
            MobError::StaleEventCursor {
                after_cursor,
                latest_cursor,
            } => Self::Stale {
                after_cursor,
                latest_cursor,
            },
            other => Self::Backend(other),
        }
    }
}
pub(crate) fn envelope_matches(envelope: &MobStructuralEventEnvelope, query: &EventQuery) -> bool {
if let Some(since) = query.since_ms
&& envelope.timestamp_ms < since
{
return false;
}
if let Some(until) = query.until_ms
&& envelope.timestamp_ms >= until
{
return false;
}
if let Some(mob_id) = query.mob_id.as_deref()
&& envelope.mob_id != mob_id
{
return false;
}
if let Some(run_id) = query.run_id.as_deref()
&& envelope.run_id.as_deref() != Some(run_id)
{
return false;
}
if let Some(step_id) = query.step_id.as_deref()
&& envelope.step_id.as_deref() != Some(step_id)
{
return false;
}
let identity_filter = query.identity.as_deref().or(query.member_id.as_deref());
if let Some(identity) = identity_filter
&& envelope.agent_identity.as_deref() != Some(identity)
{
return false;
}
if !query.event_types.is_empty() && !query.event_types.iter().any(|ty| ty == &envelope.kind) {
return false;
}
true
}
/// Runs `query` against the event ledger and returns the matching envelopes.
///
/// The limit defaults to `DEFAULT_QUERY_LIMIT`; a limit of zero returns an
/// empty result without reading the ledger. With `after_seq` set the ledger is
/// scanned forward from that cursor, otherwise it is scanned backward from the
/// latest cursor.
pub(crate) async fn query_ledger_with_filter(
    events: &MobEventsView,
    store: &MobEventsStore,
    query: &EventQuery,
) -> Result<Vec<MobStructuralEventEnvelope>, MobEventsQueryError> {
    let limit = query.limit.unwrap_or(DEFAULT_QUERY_LIMIT);
    match (limit, query.after_seq) {
        (0, _) => Ok(Vec::new()),
        (_, Some(after_seq)) => scan_forward(events, store, query, after_seq, limit).await,
        (_, None) => scan_backward(events, store, query, limit).await,
    }
}
/// Pages through the ledger starting after `after_seq`, collecting envelopes
/// that match `query` until `limit` matches are found or the ledger yields no
/// further events.
async fn scan_forward(
    events: &MobEventsView,
    store: &MobEventsStore,
    query: &EventQuery,
    after_seq: u64,
    limit: usize,
) -> Result<Vec<MobStructuralEventEnvelope>, MobEventsQueryError> {
    let mut matched: Vec<MobStructuralEventEnvelope> =
        Vec::with_capacity(limit.min(QUERY_BATCH_SIZE));
    let mut position = after_seq;
    loop {
        let page = events.poll_strict(position, QUERY_BATCH_SIZE).await?;
        if page.is_empty() {
            break;
        }
        let page_start = position;
        for event in page {
            // Track the highest cursor seen so the next page resumes after it.
            position = position.max(event.cursor);
            let envelope = store.project_event_for_query(&event).await;
            if !envelope_matches(&envelope, query) {
                continue;
            }
            matched.push(envelope);
            if matched.len() >= limit {
                return Ok(matched);
            }
        }
        // A non-empty page that did not move the cursor would repeat forever.
        let advanced = position > page_start;
        if !advanced {
            break;
        }
    }
    Ok(matched)
}
/// Walks the ledger from the latest cursor towards the beginning in windows of
/// `QUERY_BATCH_SIZE` cursors, collecting envelopes that match `query`, and
/// returns at most the last `limit` of them.
///
/// Results keep the order in which `poll_strict` yields events inside each
/// window, with older windows placed before newer ones. Returns an empty list
/// when the latest cursor is zero.
///
/// # Errors
///
/// Propagates any error from `latest_cursor` or `poll_strict`, converted via
/// `From<MobError>`.
async fn scan_backward(
    events: &MobEventsView,
    store: &MobEventsStore,
    query: &EventQuery,
    limit: usize,
) -> Result<Vec<MobStructuralEventEnvelope>, MobEventsQueryError> {
    let latest = events.latest_cursor().await?;
    if latest == 0 {
        return Ok(Vec::new());
    }
    let batch_size = QUERY_BATCH_SIZE as u64;
    let mut window_end = latest;
    let mut accumulator: Vec<MobStructuralEventEnvelope> = Vec::new();
    loop {
        // Each window covers the cursor range (from, window_end].
        let from = window_end.saturating_sub(batch_size);
        let take = (window_end - from) as usize;
        if take == 0 {
            break;
        }
        let batch = events.poll_strict(from, take).await?;
        if batch.is_empty() {
            break;
        }
        let mut window_matches: Vec<MobStructuralEventEnvelope> = Vec::with_capacity(batch.len());
        for event in batch {
            // `poll_strict` is asked for `take` events after `from` (assumed to
            // mean cursors strictly greater than `from`, as `scan_forward`
            // relies on). If the ledger has cursor gaps, that can run past
            // `window_end` into events a newer window already collected, so
            // skip them to avoid duplicates.
            if event.cursor > window_end {
                continue;
            }
            let envelope = store.project_event_for_query(&event).await;
            if envelope_matches(&envelope, query) {
                window_matches.push(envelope);
            }
        }
        // This window is older than everything gathered so far: put it in front.
        window_matches.append(&mut accumulator);
        accumulator = window_matches;
        if accumulator.len() >= limit || from == 0 {
            break;
        }
        window_end = from;
    }
    // Keep only the newest `limit` matches by trimming from the old end.
    if accumulator.len() > limit {
        let excess = accumulator.len() - limit;
        accumulator.drain(0..excess);
    }
    Ok(accumulator)
}
fn event_kind_label(kind: &MobEventKind) -> &'static str {
match kind {
MobEventKind::MobCreated { .. } => "mob_created",
MobEventKind::MobCompleted => "mob_completed",
MobEventKind::MobDestroying => "mob_destroying",
MobEventKind::MobDestroyStorageFinalizing => "mob_destroy_storage_finalizing",
MobEventKind::MobReset => "mob_reset",
MobEventKind::MemberSpawned(_) => "member_spawned",
MobEventKind::MemberRetired { .. } => "member_retired",
MobEventKind::MemberReset { .. } => "member_reset",
MobEventKind::MemberKickoffUpdated { .. } => "member_kickoff_updated",
MobEventKind::MembersWired { .. } => "members_wired",
MobEventKind::MembersWiredBatch { .. } => "members_wired_batch",
MobEventKind::MembersUnwired { .. } => "members_unwired",
MobEventKind::ExternalPeerWired { .. } => "external_peer_wired",
MobEventKind::ExternalPeerUnwired { .. } => "external_peer_unwired",
MobEventKind::FlowStarted { .. } => "flow_started",
MobEventKind::FlowCompleted { .. } => "flow_completed",
MobEventKind::FlowFailed { .. } => "flow_failed",
MobEventKind::FlowCanceled { .. } => "flow_canceled",
MobEventKind::StepDispatched { .. } => "step_dispatched",
MobEventKind::StepTargetCompleted { .. } => "step_target_completed",
MobEventKind::StepTargetFailed { .. } => "step_target_failed",
MobEventKind::StepCompleted { .. } => "step_completed",
MobEventKind::StepFailed { .. } => "step_failed",
MobEventKind::StepSkipped { .. } => "step_skipped",
MobEventKind::TopologyViolation { .. } => "topology_violation",
MobEventKind::SupervisorEscalation { .. } => "supervisor_escalation",
MobEventKind::OperatorActionRecorded { .. } => "operator_action_recorded",
}
}
/// Pulls `(run_id, step_id, agent_identity)` out of an event payload.
///
/// Each slot is `None` when the variant does not carry that field. For
/// `SupervisorEscalation` the identity slot holds `escalated_to`; for
/// external-peer events it holds the `local` side.
pub(crate) fn extract_structural_fields(
    kind: &MobEventKind,
) -> (Option<String>, Option<String>, Option<String>) {
    // Copies a borrowed string into an owned, present slot.
    let owned = |text: &str| Some(text.to_owned());

    match kind {
        // Flow events: run only.
        MobEventKind::FlowStarted { run_id, .. }
        | MobEventKind::FlowCompleted { run_id, .. }
        | MobEventKind::FlowFailed { run_id, .. }
        | MobEventKind::FlowCanceled { run_id, .. } => {
            let run = run_id.to_string();
            (Some(run), None, None)
        }

        // Per-target step events: run, step, and the target's identity.
        MobEventKind::StepDispatched {
            run_id,
            step_id,
            target,
        }
        | MobEventKind::StepTargetCompleted {
            run_id,
            step_id,
            target,
        } => {
            let run = run_id.to_string();
            (
                Some(run),
                owned(step_id.as_str()),
                owned(target.identity.as_str()),
            )
        }
        MobEventKind::StepTargetFailed {
            run_id,
            step_id,
            target,
            ..
        } => {
            let run = run_id.to_string();
            (
                Some(run),
                owned(step_id.as_str()),
                owned(target.identity.as_str()),
            )
        }

        // Step-level events: run and step, no identity.
        MobEventKind::StepCompleted { run_id, step_id }
        | MobEventKind::StepFailed {
            run_id, step_id, ..
        }
        | MobEventKind::StepSkipped {
            run_id, step_id, ..
        } => {
            let run = run_id.to_string();
            (Some(run), owned(step_id.as_str()), None)
        }

        MobEventKind::SupervisorEscalation {
            run_id,
            step_id,
            escalated_to,
        } => {
            let run = run_id.to_string();
            (
                Some(run),
                owned(step_id.as_str()),
                owned(escalated_to.as_str()),
            )
        }

        // Member and peer events: identity only.
        MobEventKind::MemberSpawned(spawned) => {
            (None, None, owned(spawned.agent_identity.as_str()))
        }
        MobEventKind::MemberRetired { agent_identity, .. }
        | MobEventKind::MemberReset { agent_identity, .. } => {
            (None, None, owned(agent_identity.as_str()))
        }
        MobEventKind::MemberKickoffUpdated { member, .. } => {
            (None, None, owned(member.as_str()))
        }
        MobEventKind::ExternalPeerWired { local, .. }
        | MobEventKind::ExternalPeerUnwired { local, .. } => (None, None, owned(local.as_str())),

        // Everything else carries none of the three fields.
        MobEventKind::MobCreated { .. }
        | MobEventKind::MobCompleted
        | MobEventKind::MobDestroying
        | MobEventKind::MobDestroyStorageFinalizing
        | MobEventKind::MobReset
        | MobEventKind::MembersWired { .. }
        | MobEventKind::MembersWiredBatch { .. }
        | MobEventKind::MembersUnwired { .. }
        | MobEventKind::TopologyViolation { .. }
        | MobEventKind::OperatorActionRecorded { .. } => (None, None, None),
    }
}
// Unit tests for envelope projection and the broadcast side effect.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use chrono::Utc;
use meerkat_mob::event::{MemberSpawnedEvent, MemberWireEdge};
use meerkat_mob::ids::{
AgentIdentity, AgentRuntimeId, FenceToken, FlowId, Generation, MobId, ProfileName, RunId,
StepId,
};
// Builds a `MobEvent` for mob "test-mob" with the given cursor and payload,
// timestamped with the current time.
fn mob_event(cursor: u64, kind: MobEventKind) -> MobEvent {
MobEvent {
cursor,
timestamp: Utc::now(),
mob_id: MobId::from("test-mob"),
kind,
}
}
// A flow-started event keeps the upstream cursor, derives the event id from
// it, and exposes the run id but no step id.
#[tokio::test]
async fn projects_flow_started_with_run_id_and_upstream_cursor() {
let store = MobEventsStore::new();
let run_id = RunId::new();
let envelope = store
.project_mob_event(&mob_event(
42,
MobEventKind::FlowStarted {
run_id: run_id.clone(),
flow_id: FlowId::from("flow-a"),
params: serde_json::json!({}),
},
))
.await;
assert_eq!(envelope.kind, "flow_started");
assert_eq!(envelope.cursor, 42);
assert_eq!(envelope.event_id, "mob-evt-42");
assert_eq!(
envelope.run_id.as_deref(),
Some(run_id.to_string().as_str())
);
assert_eq!(envelope.step_id, None);
assert_eq!(envelope.mob_id, "test-mob");
}
// A step-dispatched event exposes run id, step id, and the target's identity.
#[tokio::test]
async fn projects_step_dispatched_with_run_step_target() {
let store = MobEventsStore::new();
let identity = AgentIdentity::from("worker-1");
let run_id = RunId::new();
let envelope = store
.project_mob_event(&mob_event(
7,
MobEventKind::StepDispatched {
run_id: run_id.clone(),
step_id: StepId::from("step-a"),
target: AgentRuntimeId::initial(identity),
},
))
.await;
assert_eq!(envelope.kind, "step_dispatched");
assert_eq!(envelope.cursor, 7);
assert_eq!(
envelope.run_id.as_deref(),
Some(run_id.to_string().as_str())
);
assert_eq!(envelope.step_id.as_deref(), Some("step-a"));
assert_eq!(envelope.agent_identity.as_deref(), Some("worker-1"));
}
// A member-spawned event exposes the spawned member's identity.
#[tokio::test]
async fn projects_member_spawned_with_identity() {
let store = MobEventsStore::new();
let identity = AgentIdentity::from("researcher");
let envelope = store
.project_mob_event(&mob_event(
3,
MobEventKind::MemberSpawned(MemberSpawnedEvent::new(
identity.clone(),
Generation::INITIAL,
FenceToken::new(1),
AgentRuntimeId::initial(identity),
ProfileName::from("worker"),
)),
))
.await;
assert_eq!(envelope.kind, "member_spawned");
assert_eq!(envelope.agent_identity.as_deref(), Some("researcher"));
}
// A wired-batch event has no single agent identity; the edges are available
// in the serialized `data` payload.
#[tokio::test]
async fn projects_members_wired_batch_as_compact_structural_event() {
let store = MobEventsStore::new();
let envelope = store
.project_mob_event(&mob_event(
8,
MobEventKind::MembersWiredBatch {
edges: vec![MemberWireEdge {
a: AgentIdentity::from("alpha"),
b: AgentIdentity::from("beta"),
}],
},
))
.await;
assert_eq!(envelope.kind, "members_wired_batch");
assert_eq!(envelope.agent_identity, None);
assert_eq!(envelope.data["edges"][0]["a"], serde_json::json!("alpha"));
assert_eq!(envelope.data["edges"][0]["b"], serde_json::json!("beta"));
}
// Projecting for a query must not publish anything to subscribers.
#[tokio::test]
async fn project_event_for_query_does_not_broadcast() {
let store = MobEventsStore::new();
let mut rx = store.subscribe();
let _ = store
.project_event_for_query(&mob_event(
1,
MobEventKind::FlowStarted {
run_id: RunId::new(),
flow_id: FlowId::from("flow-a"),
params: serde_json::json!({}),
},
))
.await;
assert!(rx.try_recv().is_err());
}
}