meerkat_mobkit/unified_runtime/
event_log.rs1use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use serde::{Deserialize, Serialize};
10use tokio::sync::mpsc;
11
12use crate::types::{EventEnvelope, UnifiedEvent};
13
/// Error type produced by [`EventLogStore`] operations.
///
/// Any error that is `Send` can be boxed into it. Note that the bound is
/// `Send` only; `Sync` is not required.
pub type EventLogError = Box<dyn std::error::Error + Send>;
16
17type EventFilter = Box<dyn Fn(&UnifiedEvent) -> bool + Send + Sync>;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct PersistedEvent {
27 pub id: String,
29 pub seq: u64,
32 pub timestamp_ms: u64,
34 pub member_id: Option<String>,
36 pub event: UnifiedEvent,
38}
39
40#[derive(Debug, Clone, Default, Serialize, Deserialize)]
52pub struct EventQuery {
53 #[serde(default, skip_serializing_if = "Option::is_none")]
55 pub since_ms: Option<u64>,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub until_ms: Option<u64>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub member_id: Option<String>,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub identity: Option<String>,
65 #[serde(default, skip_serializing_if = "Option::is_none")]
67 pub mob_id: Option<String>,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub run_id: Option<String>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub step_id: Option<String>,
74 #[serde(default, skip_serializing_if = "Vec::is_empty")]
76 pub event_types: Vec<String>,
77 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub limit: Option<usize>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub after_seq: Option<u64>,
83}
84
85pub trait EventLogStore: Send + Sync {
96 fn append_batch(
101 &self,
102 events: Vec<PersistedEvent>,
103 ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>>;
104
105 fn query(
107 &self,
108 query: EventQuery,
109 ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>;
110}
111
112pub struct EventLogConfig {
118 pub store: Box<dyn EventLogStore>,
120 pub filter: Option<EventFilter>,
123 pub batch_size: usize,
126 pub flush_interval: Duration,
129}
130
131impl Default for EventLogConfig {
132 fn default() -> Self {
133 Self {
134 store: Box::new(NullEventLogStore),
135 filter: None,
136 batch_size: 64,
137 flush_interval: Duration::from_secs(1),
138 }
139 }
140}
141
142struct NullEventLogStore;
144
145impl EventLogStore for NullEventLogStore {
146 fn append_batch(
147 &self,
148 _events: Vec<PersistedEvent>,
149 ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
150 Box::pin(async { Ok(()) })
151 }
152
153 fn query(
154 &self,
155 _query: EventQuery,
156 ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>> {
157 Box::pin(async { Ok(Vec::new()) })
158 }
159}
160
161pub(crate) struct EventLogHandle {
167 store: Arc<dyn EventLogStore>,
168 ingress_tx: mpsc::Sender<EventEnvelope<UnifiedEvent>>,
171}
172
173impl EventLogHandle {
174 pub fn store(&self) -> std::sync::Arc<dyn EventLogStore> {
176 self.store.clone()
177 }
178
179 pub fn ingest(&self, event: EventEnvelope<UnifiedEvent>) {
181 let _ = self.ingress_tx.try_send(event);
183 }
184}
185
/// Upper bound on the number of events kept for retry after a failed flush.
/// When the retained buffer grows beyond this, the oldest events are dropped.
const EVENT_LOG_RETRY_BUFFER_CAP: usize = 4096;
190
191pub(crate) fn start_event_log(
194 config: EventLogConfig,
195 error_hook: Option<super::ErrorHook>,
196) -> EventLogHandle {
197 let store: Arc<dyn EventLogStore> = Arc::from(config.store);
198 let seq = Arc::new(AtomicU64::new(1));
199 let batch_size = config.batch_size.max(1);
203 let channel_capacity = (batch_size * 4).max(4);
206 let (ingress_tx, ingress_rx) = mpsc::channel(channel_capacity);
207
208 let handle = EventLogHandle {
209 store: store.clone(),
210 ingress_tx,
211 };
212
213 tokio::spawn(run_flush_loop(
214 ingress_rx,
215 store,
216 seq,
217 config.filter,
218 batch_size,
219 config.flush_interval,
220 error_hook,
221 ));
222
223 handle
224}
225
226async fn run_flush_loop(
227 mut rx: mpsc::Receiver<EventEnvelope<UnifiedEvent>>,
228 store: Arc<dyn EventLogStore>,
229 seq: Arc<AtomicU64>,
230 filter: Option<EventFilter>,
231 batch_size: usize,
232 flush_interval: Duration,
233 error_hook: Option<super::ErrorHook>,
234) {
235 let mut batch: Vec<PersistedEvent> = Vec::with_capacity(batch_size);
236 let mut interval = tokio::time::interval(flush_interval);
237 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
238
239 loop {
240 tokio::select! {
241 maybe_event = rx.recv() => {
242 match maybe_event {
243 Some(envelope) => {
244 if let Some(ref f) = filter
245 && !f(&envelope.event)
246 {
247 continue;
248 }
249 let persisted = to_persisted(&seq, &envelope);
250 batch.push(persisted);
251 if batch.len() >= batch_size {
252 flush_batch(&store, &mut batch, &error_hook).await;
253 }
254 }
255 None => {
256 if !batch.is_empty() {
261 flush_batch(&store, &mut batch, &error_hook).await;
262 }
263 break;
264 }
265 }
266 }
267 _ = interval.tick() => {
268 if !batch.is_empty() {
269 flush_batch(&store, &mut batch, &error_hook).await;
270 }
271 }
272 }
273 }
274}
275
276fn enforce_retry_cap(batch: &mut Vec<PersistedEvent>) -> usize {
280 if batch.len() <= EVENT_LOG_RETRY_BUFFER_CAP {
281 return 0;
282 }
283 let drop = batch.len() - EVENT_LOG_RETRY_BUFFER_CAP;
284 batch.drain(0..drop);
285 drop
286}
287
288fn to_persisted(seq: &AtomicU64, envelope: &EventEnvelope<UnifiedEvent>) -> PersistedEvent {
289 let member_id = match &envelope.event {
290 UnifiedEvent::Agent { agent_id, .. } => Some(agent_id.clone()),
291 UnifiedEvent::Module(_) => None,
292 };
293 PersistedEvent {
294 id: envelope.event_id.clone(),
295 seq: seq.fetch_add(1, Ordering::Relaxed),
296 timestamp_ms: envelope.timestamp_ms,
297 member_id,
298 event: envelope.event.clone(),
299 }
300}
301
302async fn flush_batch(
303 store: &Arc<dyn EventLogStore>,
304 batch: &mut Vec<PersistedEvent>,
305 error_hook: &Option<super::ErrorHook>,
306) {
307 let events = std::mem::take(batch);
308 if let Err(err) = store.append_batch(events.clone()).await {
309 let mut restored = events;
314 restored.append(batch); let dropped = enforce_retry_cap(&mut restored);
316 *batch = restored;
317
318 if let Some(hook) = error_hook {
319 let hook = hook.clone();
320 let msg = if dropped > 0 {
321 format!(
322 "event log flush failed: {err}; dropped {dropped} oldest events to bound the retry buffer at {EVENT_LOG_RETRY_BUFFER_CAP}"
323 )
324 } else {
325 format!("event log flush failed: {err}; will retry")
326 };
327 tokio::spawn(async move {
328 let () = hook(super::types::ErrorEvent::EventLogFlushFailure { error: msg }).await;
329 });
330 }
331 }
332}
333
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Error returned by [`FlakyStore`] while it is still set up to fail.
    #[derive(Debug)]
    struct Transient;

    impl std::fmt::Display for Transient {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("transient")
        }
    }

    impl std::error::Error for Transient {}

    /// Store that rejects a configured number of `append_batch` calls and
    /// accepts every call after that, recording attempts and accepted events.
    struct FlakyStore {
        failures_remaining: Mutex<usize>,
        persisted: Mutex<Vec<PersistedEvent>>,
        attempts: Mutex<usize>,
    }

    impl FlakyStore {
        /// Creates a store whose first `failures` appends fail.
        fn failing(failures: usize) -> Self {
            Self {
                failures_remaining: Mutex::new(failures),
                persisted: Mutex::new(Vec::new()),
                attempts: Mutex::new(0),
            }
        }
    }

    impl EventLogStore for FlakyStore {
        fn append_batch(
            &self,
            events: Vec<PersistedEvent>,
        ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
            Box::pin(async move {
                *self.attempts.lock().expect("attempts") += 1;

                let mut left = self.failures_remaining.lock().expect("failures");
                if *left == 0 {
                    self.persisted.lock().expect("persisted").extend(events);
                    return Ok(());
                }
                *left -= 1;
                Err(Box::new(Transient) as Box<dyn std::error::Error + Send>)
            })
        }

        fn query(
            &self,
            _query: EventQuery,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>
        {
            Box::pin(async { Ok(Vec::new()) })
        }
    }

    /// Builds a minimal module event carrying the given id.
    fn sample_event(id: &str) -> PersistedEvent {
        let event = UnifiedEvent::Module(crate::types::ModuleEvent {
            module: "test-module".into(),
            event_type: "x".into(),
            payload: serde_json::Value::Null,
        });
        PersistedEvent {
            id: id.to_string(),
            seq: 0,
            timestamp_ms: 0,
            member_id: None,
            event,
        }
    }

    /// Two failed flushes must keep the events; the third (successful) flush
    /// must drain them into the store.
    #[tokio::test]
    async fn flush_failure_retries_instead_of_dropping_events() {
        let flaky = Arc::new(FlakyStore::failing(2));
        let store: Arc<dyn EventLogStore> = flaky.clone();
        let mut batch = vec![sample_event("a"), sample_event("b")];

        flush_batch(&store, &mut batch, &None).await;
        assert_eq!(batch.len(), 2, "events must be retained on flush failure");

        flush_batch(&store, &mut batch, &None).await;
        assert_eq!(batch.len(), 2);

        flush_batch(&store, &mut batch, &None).await;
        assert!(batch.is_empty(), "batch must drain on successful flush");

        assert_eq!(*flaky.attempts.lock().expect("attempts"), 3);
        assert_eq!(flaky.persisted.lock().expect("persisted").len(), 2);
    }

    /// A batch 100 events over the cap loses exactly its 100 oldest entries.
    #[test]
    fn enforce_retry_cap_drops_oldest() {
        let total = EVENT_LOG_RETRY_BUFFER_CAP + 100;
        let mut batch: Vec<PersistedEvent> = (0..total)
            .map(|i| sample_event(&format!("evt-{i}")))
            .collect();

        let dropped = enforce_retry_cap(&mut batch);

        assert_eq!(dropped, 100);
        assert_eq!(batch.len(), EVENT_LOG_RETRY_BUFFER_CAP);
        assert_eq!(batch.first().expect("first").id, "evt-100");
    }
}