meerkat_mobkit/runtime/
event_transport.rs1use super::*;
4
5pub fn normalize_event_line(line: &str) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
6 if let Ok(envelope) = parse_unified_event_line(line) {
7 return enforce_source_consistency(envelope);
8 }
9
10 let value: Value = serde_json::from_str(line).map_err(|_| NormalizationError::InvalidJson)?;
11 let object = value.as_object().ok_or(NormalizationError::InvalidSchema)?;
12
13 let event_id = required_string(object.get("event_id"), "event_id")?;
14 let source = required_string(object.get("source"), "source")?;
15 let timestamp_ms = required_u64(object.get("timestamp_ms"), "timestamp_ms")?;
16
17 if let Some(module) = object.get("module") {
18 let module = required_string(Some(module), "module")?;
19 let event_type = required_string(object.get("event_type"), "event_type")?;
20 let payload = object
21 .get("payload")
22 .ok_or(NormalizationError::MissingField("payload"))?
23 .clone();
24 return enforce_source_consistency(EventEnvelope {
25 event_id,
26 source,
27 timestamp_ms,
28 event: UnifiedEvent::Module(ModuleEvent {
29 module,
30 event_type,
31 payload,
32 }),
33 });
34 }
35
36 let agent_id = required_string(object.get("agent_id"), "agent_id")?;
37 let event_type = required_string(object.get("event_type"), "event_type")?;
38 let payload = object.get("payload").cloned();
39
40 enforce_source_consistency(EventEnvelope {
41 event_id,
42 source,
43 timestamp_ms,
44 event: UnifiedEvent::Agent {
45 agent_id,
46 event_type,
47 payload,
48 },
49 })
50}
51
52impl MobkitRuntimeHandle {
53 pub(crate) fn append_normalized_event(
54 &mut self,
55 event: EventEnvelope<UnifiedEvent>,
56 ) -> Result<(), NormalizationError> {
57 let event = enforce_source_consistency(event)?;
58 insert_event_sorted(&mut self.merged_events, event);
59 Ok(())
60 }
61
62 pub fn merged_events(&self) -> &[EventEnvelope<UnifiedEvent>] {
63 &self.merged_events
64 }
65 pub fn subscribe_events(
66 &self,
67 request: SubscribeRequest,
68 ) -> Result<SubscribeResponse, SubscribeError> {
69 if let Some(checkpoint) = request.last_event_id.as_ref()
70 && checkpoint.trim().is_empty()
71 {
72 return Err(SubscribeError::EmptyCheckpoint);
73 }
74
75 if matches!(request.scope, SubscribeScope::Agent) {
76 let agent_id = request
77 .agent_id
78 .as_deref()
79 .ok_or(SubscribeError::MissingAgentId)?;
80 if agent_id.trim().is_empty() {
81 return Err(SubscribeError::InvalidAgentId);
82 }
83 }
84
85 let scoped_events: Vec<_> = self
86 .merged_events
87 .iter()
88 .filter(|event| event_matches_request(event, &request))
89 .collect();
90 let skip = scoped_events
91 .len()
92 .saturating_sub(SUBSCRIBE_REPLAY_EVENT_CAP);
93 let bounded = &scoped_events[skip..];
94
95 let replay_slice = match request.last_event_id.as_ref() {
96 Some(checkpoint) => {
97 let start_idx = bounded
98 .iter()
99 .position(|event| event.event_id == *checkpoint)
100 .ok_or_else(|| SubscribeError::UnknownCheckpoint(checkpoint.clone()))?;
101 &bounded[start_idx..]
102 }
103 None => bounded,
104 };
105 let replay_events: Vec<_> = replay_slice.iter().map(|e| (*e).clone()).collect();
106 let event_frames = replay_events
107 .iter()
108 .map(build_sse_event_frame)
109 .collect::<Vec<_>>();
110
111 Ok(SubscribeResponse {
112 scope: request.scope,
113 replay_from_event_id: request.last_event_id,
114 keep_alive: SubscribeKeepAlive {
115 interval_ms: SSE_KEEP_ALIVE_INTERVAL_MS,
116 event: SSE_KEEP_ALIVE_EVENT_NAME.to_string(),
117 },
118 keep_alive_comment: SSE_KEEP_ALIVE_COMMENT_FRAME.to_string(),
119 event_frames,
120 events: replay_events,
121 })
122 }
123}
124
125pub(super) fn merge_unified_events(
126 mut module_events: Vec<EventEnvelope<UnifiedEvent>>,
127 mut agent_events: Vec<EventEnvelope<UnifiedEvent>>,
128) -> Vec<EventEnvelope<UnifiedEvent>> {
129 let mut merged = Vec::with_capacity(module_events.len() + agent_events.len());
130 merged.append(&mut module_events);
131 merged.append(&mut agent_events);
132 merged.sort_by(|left, right| {
133 left.timestamp_ms
134 .cmp(&right.timestamp_ms)
135 .then_with(|| left.event_id.cmp(&right.event_id))
136 .then_with(|| left.source.cmp(&right.source))
137 });
138 merged
139}
140
141fn event_matches_request(event: &EventEnvelope<UnifiedEvent>, request: &SubscribeRequest) -> bool {
142 match request.scope {
143 SubscribeScope::Mob => true,
144 SubscribeScope::Agent => match &event.event {
145 UnifiedEvent::Agent { agent_id, .. } => request
146 .agent_id
147 .as_deref()
148 .map(|selected| selected == agent_id)
149 .unwrap_or(false),
150 UnifiedEvent::Module(_) => false,
151 },
152 SubscribeScope::Interaction => match &event.event {
153 UnifiedEvent::Agent { event_type, .. } => event_type.starts_with("interaction"),
154 UnifiedEvent::Module(module_event) => {
155 module_event.event_type.starts_with("interaction")
156 }
157 },
158 }
159}
160
161fn build_sse_event_frame(event: &EventEnvelope<UnifiedEvent>) -> String {
162 let event_name = match &event.event {
163 UnifiedEvent::Agent { event_type, .. } => event_type.as_str(),
164 UnifiedEvent::Module(module_event) => module_event.event_type.as_str(),
165 };
166 let payload = serde_json::to_string(&event.event).unwrap_or_else(|_| "{}".to_string());
167 format!(
168 "id: {}\nevent: {}\ndata: {}\n\n",
169 event.event_id, event_name, payload
170 )
171}
172
173fn enforce_source_consistency(
174 envelope: EventEnvelope<UnifiedEvent>,
175) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
176 let expected = match &envelope.event {
177 UnifiedEvent::Agent { .. } => "agent",
178 UnifiedEvent::Module(_) => "module",
179 };
180 if envelope.source != expected {
181 return Err(NormalizationError::SourceMismatch {
182 expected,
183 got: envelope.source,
184 });
185 }
186 Ok(envelope)
187}
188
189fn required_string(
190 value: Option<&Value>,
191 field: &'static str,
192) -> Result<String, NormalizationError> {
193 let value = value.ok_or(NormalizationError::MissingField(field))?;
194 let text = value
195 .as_str()
196 .ok_or(NormalizationError::InvalidFieldType(field))?;
197 Ok(text.to_string())
198}
199
200fn required_u64(value: Option<&Value>, field: &'static str) -> Result<u64, NormalizationError> {
201 let value = value.ok_or(NormalizationError::MissingField(field))?;
202 value
203 .as_u64()
204 .ok_or(NormalizationError::InvalidFieldType(field))
205}
206
207pub(super) fn insert_event_sorted(
208 events: &mut Vec<EventEnvelope<UnifiedEvent>>,
209 event: EventEnvelope<UnifiedEvent>,
210) {
211 let insertion_index = events
212 .binary_search_by(|existing| {
213 existing
214 .timestamp_ms
215 .cmp(&event.timestamp_ms)
216 .then_with(|| existing.event_id.cmp(&event.event_id))
217 .then_with(|| existing.source.cmp(&event.source))
218 })
219 .unwrap_or_else(|index| index);
220 events.insert(insertion_index, event);
221}