use super::*;
pub fn normalize_event_line(line: &str) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
if let Ok(envelope) = parse_unified_event_line(line) {
return enforce_source_consistency(envelope);
}
let value: Value = serde_json::from_str(line).map_err(|_| NormalizationError::InvalidJson)?;
let object = value.as_object().ok_or(NormalizationError::InvalidSchema)?;
let event_id = required_string(object.get("event_id"), "event_id")?;
let source = required_string(object.get("source"), "source")?;
let timestamp_ms = required_u64(object.get("timestamp_ms"), "timestamp_ms")?;
if let Some(module) = object.get("module") {
let module = required_string(Some(module), "module")?;
let event_type = required_string(object.get("event_type"), "event_type")?;
let payload = object
.get("payload")
.ok_or(NormalizationError::MissingField("payload"))?
.clone();
return enforce_source_consistency(EventEnvelope {
event_id,
source,
timestamp_ms,
event: UnifiedEvent::Module(ModuleEvent {
module,
event_type,
payload,
}),
});
}
let agent_id = required_string(object.get("agent_id"), "agent_id")?;
let event_type = required_string(object.get("event_type"), "event_type")?;
let payload = object.get("payload").cloned();
enforce_source_consistency(EventEnvelope {
event_id,
source,
timestamp_ms,
event: UnifiedEvent::Agent {
agent_id,
event_type,
payload,
},
})
}
impl MobkitRuntimeHandle {
pub(crate) fn append_normalized_event(
&mut self,
event: EventEnvelope<UnifiedEvent>,
) -> Result<(), NormalizationError> {
let event = enforce_source_consistency(event)?;
insert_event_sorted(&mut self.merged_events, event);
Ok(())
}
pub fn merged_events(&self) -> &[EventEnvelope<UnifiedEvent>] {
&self.merged_events
}
pub fn subscribe_events(
&self,
request: SubscribeRequest,
) -> Result<SubscribeResponse, SubscribeError> {
if let Some(checkpoint) = request.last_event_id.as_ref()
&& checkpoint.trim().is_empty()
{
return Err(SubscribeError::EmptyCheckpoint);
}
if matches!(request.scope, SubscribeScope::Agent) {
let agent_id = request
.agent_id
.as_deref()
.ok_or(SubscribeError::MissingAgentId)?;
if agent_id.trim().is_empty() {
return Err(SubscribeError::InvalidAgentId);
}
}
let scoped_events: Vec<_> = self
.merged_events
.iter()
.filter(|event| event_matches_request(event, &request))
.collect();
let skip = scoped_events
.len()
.saturating_sub(SUBSCRIBE_REPLAY_EVENT_CAP);
let bounded = &scoped_events[skip..];
let replay_slice = match request.last_event_id.as_ref() {
Some(checkpoint) => {
let start_idx = bounded
.iter()
.position(|event| event.event_id == *checkpoint)
.ok_or_else(|| SubscribeError::UnknownCheckpoint(checkpoint.clone()))?;
&bounded[start_idx..]
}
None => bounded,
};
let replay_events: Vec<_> = replay_slice.iter().map(|e| (*e).clone()).collect();
let event_frames = replay_events
.iter()
.map(build_sse_event_frame)
.collect::<Vec<_>>();
Ok(SubscribeResponse {
scope: request.scope,
replay_from_event_id: request.last_event_id,
keep_alive: SubscribeKeepAlive {
interval_ms: SSE_KEEP_ALIVE_INTERVAL_MS,
event: SSE_KEEP_ALIVE_EVENT_NAME.to_string(),
},
keep_alive_comment: SSE_KEEP_ALIVE_COMMENT_FRAME.to_string(),
event_frames,
events: replay_events,
})
}
}
pub(super) fn merge_unified_events(
mut module_events: Vec<EventEnvelope<UnifiedEvent>>,
mut agent_events: Vec<EventEnvelope<UnifiedEvent>>,
) -> Vec<EventEnvelope<UnifiedEvent>> {
let mut merged = Vec::with_capacity(module_events.len() + agent_events.len());
merged.append(&mut module_events);
merged.append(&mut agent_events);
merged.sort_by(|left, right| {
left.timestamp_ms
.cmp(&right.timestamp_ms)
.then_with(|| left.event_id.cmp(&right.event_id))
.then_with(|| left.source.cmp(&right.source))
});
merged
}
fn event_matches_request(event: &EventEnvelope<UnifiedEvent>, request: &SubscribeRequest) -> bool {
match request.scope {
SubscribeScope::Mob => true,
SubscribeScope::Agent => match &event.event {
UnifiedEvent::Agent { agent_id, .. } => request
.agent_id
.as_deref()
.map(|selected| selected == agent_id)
.unwrap_or(false),
UnifiedEvent::Module(_) => false,
},
SubscribeScope::Interaction => match &event.event {
UnifiedEvent::Agent { event_type, .. } => event_type.starts_with("interaction"),
UnifiedEvent::Module(module_event) => {
module_event.event_type.starts_with("interaction")
}
},
}
}
fn build_sse_event_frame(event: &EventEnvelope<UnifiedEvent>) -> String {
let event_name = match &event.event {
UnifiedEvent::Agent { event_type, .. } => event_type.as_str(),
UnifiedEvent::Module(module_event) => module_event.event_type.as_str(),
};
let payload = serde_json::to_string(&event.event).unwrap_or_else(|_| "{}".to_string());
format!(
"id: {}\nevent: {}\ndata: {}\n\n",
event.event_id, event_name, payload
)
}
fn enforce_source_consistency(
envelope: EventEnvelope<UnifiedEvent>,
) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
let expected = match &envelope.event {
UnifiedEvent::Agent { .. } => "agent",
UnifiedEvent::Module(_) => "module",
};
if envelope.source != expected {
return Err(NormalizationError::SourceMismatch {
expected,
got: envelope.source,
});
}
Ok(envelope)
}
fn required_string(
value: Option<&Value>,
field: &'static str,
) -> Result<String, NormalizationError> {
let value = value.ok_or(NormalizationError::MissingField(field))?;
let text = value
.as_str()
.ok_or(NormalizationError::InvalidFieldType(field))?;
Ok(text.to_string())
}
fn required_u64(value: Option<&Value>, field: &'static str) -> Result<u64, NormalizationError> {
let value = value.ok_or(NormalizationError::MissingField(field))?;
value
.as_u64()
.ok_or(NormalizationError::InvalidFieldType(field))
}
pub(super) fn insert_event_sorted(
events: &mut Vec<EventEnvelope<UnifiedEvent>>,
event: EventEnvelope<UnifiedEvent>,
) {
let insertion_index = events
.binary_search_by(|existing| {
existing
.timestamp_ms
.cmp(&event.timestamp_ms)
.then_with(|| existing.event_id.cmp(&event.event_id))
.then_with(|| existing.source.cmp(&event.source))
})
.unwrap_or_else(|index| index);
events.insert(insertion_index, event);
}