Skip to main content

meerkat_mobkit/runtime/
event_transport.rs

1//! Event normalization and transport — line parsing, source validation, and envelope construction.
2
3use super::*;
4
5pub fn normalize_event_line(line: &str) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
6    if let Ok(envelope) = parse_unified_event_line(line) {
7        return enforce_source_consistency(envelope);
8    }
9
10    let value: Value = serde_json::from_str(line).map_err(|_| NormalizationError::InvalidJson)?;
11    let object = value.as_object().ok_or(NormalizationError::InvalidSchema)?;
12
13    let event_id = required_string(object.get("event_id"), "event_id")?;
14    let source = required_string(object.get("source"), "source")?;
15    let timestamp_ms = required_u64(object.get("timestamp_ms"), "timestamp_ms")?;
16
17    if let Some(module) = object.get("module") {
18        let module = required_string(Some(module), "module")?;
19        let event_type = required_string(object.get("event_type"), "event_type")?;
20        let payload = object
21            .get("payload")
22            .ok_or(NormalizationError::MissingField("payload"))?
23            .clone();
24        return enforce_source_consistency(EventEnvelope {
25            event_id,
26            source,
27            timestamp_ms,
28            event: UnifiedEvent::Module(ModuleEvent {
29                module,
30                event_type,
31                payload,
32            }),
33        });
34    }
35
36    let agent_id = required_string(object.get("agent_id"), "agent_id")?;
37    let event_type = required_string(object.get("event_type"), "event_type")?;
38    let payload = object.get("payload").cloned();
39
40    enforce_source_consistency(EventEnvelope {
41        event_id,
42        source,
43        timestamp_ms,
44        event: UnifiedEvent::Agent {
45            agent_id,
46            event_type,
47            payload,
48        },
49    })
50}
51
52impl MobkitRuntimeHandle {
53    pub(crate) fn append_normalized_event(
54        &mut self,
55        event: EventEnvelope<UnifiedEvent>,
56    ) -> Result<(), NormalizationError> {
57        let event = enforce_source_consistency(event)?;
58        insert_event_sorted(&mut self.merged_events, event);
59        Ok(())
60    }
61
62    pub fn merged_events(&self) -> &[EventEnvelope<UnifiedEvent>] {
63        &self.merged_events
64    }
65    pub fn subscribe_events(
66        &self,
67        request: SubscribeRequest,
68    ) -> Result<SubscribeResponse, SubscribeError> {
69        if let Some(checkpoint) = request.last_event_id.as_ref()
70            && checkpoint.trim().is_empty()
71        {
72            return Err(SubscribeError::EmptyCheckpoint);
73        }
74
75        if matches!(request.scope, SubscribeScope::Agent) {
76            let agent_id = request
77                .agent_id
78                .as_deref()
79                .ok_or(SubscribeError::MissingAgentId)?;
80            if agent_id.trim().is_empty() {
81                return Err(SubscribeError::InvalidAgentId);
82            }
83        }
84
85        let scoped_events: Vec<_> = self
86            .merged_events
87            .iter()
88            .filter(|event| event_matches_request(event, &request))
89            .collect();
90        let skip = scoped_events
91            .len()
92            .saturating_sub(SUBSCRIBE_REPLAY_EVENT_CAP);
93        let bounded = &scoped_events[skip..];
94
95        let replay_slice = match request.last_event_id.as_ref() {
96            Some(checkpoint) => {
97                let start_idx = bounded
98                    .iter()
99                    .position(|event| event.event_id == *checkpoint)
100                    .ok_or_else(|| SubscribeError::UnknownCheckpoint(checkpoint.clone()))?;
101                &bounded[start_idx..]
102            }
103            None => bounded,
104        };
105        let replay_events: Vec<_> = replay_slice.iter().map(|e| (*e).clone()).collect();
106        let event_frames = replay_events
107            .iter()
108            .map(build_sse_event_frame)
109            .collect::<Vec<_>>();
110
111        Ok(SubscribeResponse {
112            scope: request.scope,
113            replay_from_event_id: request.last_event_id,
114            keep_alive: SubscribeKeepAlive {
115                interval_ms: SSE_KEEP_ALIVE_INTERVAL_MS,
116                event: SSE_KEEP_ALIVE_EVENT_NAME.to_string(),
117            },
118            keep_alive_comment: SSE_KEEP_ALIVE_COMMENT_FRAME.to_string(),
119            event_frames,
120            events: replay_events,
121        })
122    }
123}
124
125pub(super) fn merge_unified_events(
126    mut module_events: Vec<EventEnvelope<UnifiedEvent>>,
127    mut agent_events: Vec<EventEnvelope<UnifiedEvent>>,
128) -> Vec<EventEnvelope<UnifiedEvent>> {
129    let mut merged = Vec::with_capacity(module_events.len() + agent_events.len());
130    merged.append(&mut module_events);
131    merged.append(&mut agent_events);
132    merged.sort_by(|left, right| {
133        left.timestamp_ms
134            .cmp(&right.timestamp_ms)
135            .then_with(|| left.event_id.cmp(&right.event_id))
136            .then_with(|| left.source.cmp(&right.source))
137    });
138    merged
139}
140
141fn event_matches_request(event: &EventEnvelope<UnifiedEvent>, request: &SubscribeRequest) -> bool {
142    match request.scope {
143        SubscribeScope::Mob => true,
144        SubscribeScope::Agent => match &event.event {
145            UnifiedEvent::Agent { agent_id, .. } => request
146                .agent_id
147                .as_deref()
148                .map(|selected| selected == agent_id)
149                .unwrap_or(false),
150            UnifiedEvent::Module(_) => false,
151        },
152        SubscribeScope::Interaction => match &event.event {
153            UnifiedEvent::Agent { event_type, .. } => event_type.starts_with("interaction"),
154            UnifiedEvent::Module(module_event) => {
155                module_event.event_type.starts_with("interaction")
156            }
157        },
158    }
159}
160
161fn build_sse_event_frame(event: &EventEnvelope<UnifiedEvent>) -> String {
162    let event_name = match &event.event {
163        UnifiedEvent::Agent { event_type, .. } => event_type.as_str(),
164        UnifiedEvent::Module(module_event) => module_event.event_type.as_str(),
165    };
166    let payload = serde_json::to_string(&event.event).unwrap_or_else(|_| "{}".to_string());
167    format!(
168        "id: {}\nevent: {}\ndata: {}\n\n",
169        event.event_id, event_name, payload
170    )
171}
172
173fn enforce_source_consistency(
174    envelope: EventEnvelope<UnifiedEvent>,
175) -> Result<EventEnvelope<UnifiedEvent>, NormalizationError> {
176    let expected = match &envelope.event {
177        UnifiedEvent::Agent { .. } => "agent",
178        UnifiedEvent::Module(_) => "module",
179    };
180    if envelope.source != expected {
181        return Err(NormalizationError::SourceMismatch {
182            expected,
183            got: envelope.source,
184        });
185    }
186    Ok(envelope)
187}
188
189fn required_string(
190    value: Option<&Value>,
191    field: &'static str,
192) -> Result<String, NormalizationError> {
193    let value = value.ok_or(NormalizationError::MissingField(field))?;
194    let text = value
195        .as_str()
196        .ok_or(NormalizationError::InvalidFieldType(field))?;
197    Ok(text.to_string())
198}
199
200fn required_u64(value: Option<&Value>, field: &'static str) -> Result<u64, NormalizationError> {
201    let value = value.ok_or(NormalizationError::MissingField(field))?;
202    value
203        .as_u64()
204        .ok_or(NormalizationError::InvalidFieldType(field))
205}
206
207pub(super) fn insert_event_sorted(
208    events: &mut Vec<EventEnvelope<UnifiedEvent>>,
209    event: EventEnvelope<UnifiedEvent>,
210) {
211    let insertion_index = events
212        .binary_search_by(|existing| {
213            existing
214                .timestamp_ms
215                .cmp(&event.timestamp_ms)
216                .then_with(|| existing.event_id.cmp(&event.event_id))
217                .then_with(|| existing.source.cmp(&event.source))
218        })
219        .unwrap_or_else(|index| index);
220    events.insert(insertion_index, event);
221}