Skip to main content

meerkat_mobkit/unified_runtime/
event_log.rs

1//! Persistent operational event log with buffered ingestion and pluggable storage.
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use serde::{Deserialize, Serialize};
10use tokio::sync::mpsc;
11
12use crate::types::{EventEnvelope, UnifiedEvent};
13
/// Error type produced by [`EventLogStore`] operations: any boxed error that
/// can be sent across threads.
pub type EventLogError = Box<dyn std::error::Error + Send + 'static>;
16
17/// Optional event filter predicate.
18type EventFilter = Box<dyn Fn(&UnifiedEvent) -> bool + Send + Sync>;
19
20// ---------------------------------------------------------------------------
21// Persisted event model
22// ---------------------------------------------------------------------------
23
24/// A persisted operational event with monotonic ordering.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct PersistedEvent {
27    /// Unique event ID (from the original event envelope).
28    pub id: String,
29    /// Monotonic sequence number assigned at ingestion time.
30    /// Deterministic ordering within and across batches.
31    pub seq: u64,
32    /// Millisecond timestamp from the original event.
33    pub timestamp_ms: u64,
34    /// Member/agent ID. `None` for module events.
35    pub member_id: Option<String>,
36    /// The full event payload.
37    pub event: UnifiedEvent,
38}
39
40// ---------------------------------------------------------------------------
41// Query model
42// ---------------------------------------------------------------------------
43
44/// Query parameters for historical event retrieval.
45///
46/// `after_seq` IS the cursor for pagination. Stores that surface a
47/// monotonic sequence (`PersistedEvent::seq`, `MobStructuralEventEnvelope::cursor`)
48/// MUST treat it as an exclusive lower bound and return events with
49/// `seq > after_seq`. Callers paginate by passing the last-seen `seq` as
50/// `after_seq` on the next query.
51#[derive(Debug, Clone, Default, Serialize, Deserialize)]
52pub struct EventQuery {
53    /// Only events after this timestamp (inclusive).
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub since_ms: Option<u64>,
56    /// Only events before this timestamp (exclusive).
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub until_ms: Option<u64>,
59    /// Filter to events from a specific member/agent.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub member_id: Option<String>,
62    /// Filter to events from a specific identity when the store supports identity-native rows.
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub identity: Option<String>,
65    /// Filter to events from a specific mob (structural event surface).
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub mob_id: Option<String>,
68    /// Filter to events scoped to a specific flow run.
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub run_id: Option<String>,
71    /// Filter to events scoped to a specific flow step.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub step_id: Option<String>,
74    /// Filter to specific event types (e.g. "run_completed", "run_failed").
75    #[serde(default, skip_serializing_if = "Vec::is_empty")]
76    pub event_types: Vec<String>,
77    /// Maximum number of events to return.
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub limit: Option<usize>,
80    /// Resume after this sequence number (exclusive; the cursor for pagination).
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub after_seq: Option<u64>,
83}
84
85// ---------------------------------------------------------------------------
86// Storage trait
87// ---------------------------------------------------------------------------
88
89/// Trait for persisting and querying operational events.
90///
91/// MobKit defines the contract; apps provide the implementation for their
92/// storage backend (BigQuery, Postgres, SQLite, in-memory, etc.).
93///
94/// Same pattern as `Discovery`, `EdgeDiscovery`, `SessionAgentBuilder`.
95pub trait EventLogStore: Send + Sync {
96    /// Persist a batch of events. Called periodically by the ingestion engine.
97    ///
98    /// Must be idempotent — duplicate events (same `id`) should be ignored.
99    /// Failures are logged via the error hook but never block agent execution.
100    fn append_batch(
101        &self,
102        events: Vec<PersistedEvent>,
103    ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>>;
104
105    /// Query historical events matching the given criteria.
106    fn query(
107        &self,
108        query: EventQuery,
109    ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>;
110}
111
112// ---------------------------------------------------------------------------
113// Configuration
114// ---------------------------------------------------------------------------
115
116/// Configuration for the event log ingestion engine.
117pub struct EventLogConfig {
118    /// App-provided storage backend.
119    pub store: Box<dyn EventLogStore>,
120    /// Optional filter — return `true` to persist, `false` to skip.
121    /// If `None`, all events are persisted.
122    pub filter: Option<EventFilter>,
123    /// Number of events to buffer before flushing to storage.
124    /// Default: 64.
125    pub batch_size: usize,
126    /// Maximum time between flushes, even if batch is not full.
127    /// Default: 1 second.
128    pub flush_interval: Duration,
129}
130
131impl Default for EventLogConfig {
132    fn default() -> Self {
133        Self {
134            store: Box::new(NullEventLogStore),
135            filter: None,
136            batch_size: 64,
137            flush_interval: Duration::from_secs(1),
138        }
139    }
140}
141
142/// No-op store for when event logging is not configured.
143struct NullEventLogStore;
144
145impl EventLogStore for NullEventLogStore {
146    fn append_batch(
147        &self,
148        _events: Vec<PersistedEvent>,
149    ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
150        Box::pin(async { Ok(()) })
151    }
152
153    fn query(
154        &self,
155        _query: EventQuery,
156    ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>> {
157        Box::pin(async { Ok(Vec::new()) })
158    }
159}
160
161// ---------------------------------------------------------------------------
162// Ingestion engine
163// ---------------------------------------------------------------------------
164
165/// Shared handle to the event log, held by UnifiedRuntime.
166pub(crate) struct EventLogHandle {
167    store: Arc<dyn EventLogStore>,
168    /// Sender for the ingestion buffer. Events are sent here and flushed
169    /// in batches by a background task.
170    ingress_tx: mpsc::Sender<EventEnvelope<UnifiedEvent>>,
171}
172
173impl EventLogHandle {
174    /// Return a cloned reference to the underlying store.
175    pub fn store(&self) -> std::sync::Arc<dyn EventLogStore> {
176        self.store.clone()
177    }
178
179    /// Ingest an event into the log (non-blocking, buffered).
180    pub fn ingest(&self, event: EventEnvelope<UnifiedEvent>) {
181        // Non-blocking: drop if the buffer is full (backpressure protection)
182        let _ = self.ingress_tx.try_send(event);
183    }
184}
185
/// Maximum number of events kept for retry while the store keeps failing.
/// When the backlog exceeds it, the oldest retained events are discarded
/// first: losing a bounded number of events beats exhausting memory.
const EVENT_LOG_RETRY_BUFFER_CAP: usize = 4 * 1024;
190
191/// Start the event log ingestion engine. Returns a handle for the runtime
192/// and spawns a background flush task.
193pub(crate) fn start_event_log(
194    config: EventLogConfig,
195    error_hook: Option<super::ErrorHook>,
196) -> EventLogHandle {
197    let store: Arc<dyn EventLogStore> = Arc::from(config.store);
198    let seq = Arc::new(AtomicU64::new(1));
199    // Clamp batch_size — `mpsc::channel(0)` panics, and `batch_size = 0`
200    // would also defeat batching by triggering an immediate flush per
201    // event.
202    let batch_size = config.batch_size.max(1);
203    // Buffer capacity: 4x batch size to absorb bursts; floor at 4 so a
204    // tiny batch_size doesn't starve.
205    let channel_capacity = (batch_size * 4).max(4);
206    let (ingress_tx, ingress_rx) = mpsc::channel(channel_capacity);
207
208    let handle = EventLogHandle {
209        store: store.clone(),
210        ingress_tx,
211    };
212
213    tokio::spawn(run_flush_loop(
214        ingress_rx,
215        store,
216        seq,
217        config.filter,
218        batch_size,
219        config.flush_interval,
220        error_hook,
221    ));
222
223    handle
224}
225
226async fn run_flush_loop(
227    mut rx: mpsc::Receiver<EventEnvelope<UnifiedEvent>>,
228    store: Arc<dyn EventLogStore>,
229    seq: Arc<AtomicU64>,
230    filter: Option<EventFilter>,
231    batch_size: usize,
232    flush_interval: Duration,
233    error_hook: Option<super::ErrorHook>,
234) {
235    let mut batch: Vec<PersistedEvent> = Vec::with_capacity(batch_size);
236    let mut interval = tokio::time::interval(flush_interval);
237    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
238
239    loop {
240        tokio::select! {
241            maybe_event = rx.recv() => {
242                match maybe_event {
243                    Some(envelope) => {
244                        if let Some(ref f) = filter
245                            && !f(&envelope.event)
246                        {
247                            continue;
248                        }
249                        let persisted = to_persisted(&seq, &envelope);
250                        batch.push(persisted);
251                        if batch.len() >= batch_size {
252                            flush_batch(&store, &mut batch, &error_hook).await;
253                        }
254                    }
255                    None => {
256                        // Channel closed — best-effort final flush and exit.
257                        // If this final flush fails, events on the floor
258                        // are unrecoverable; the error hook fires so
259                        // operators see it.
260                        if !batch.is_empty() {
261                            flush_batch(&store, &mut batch, &error_hook).await;
262                        }
263                        break;
264                    }
265                }
266            }
267            _ = interval.tick() => {
268                if !batch.is_empty() {
269                    flush_batch(&store, &mut batch, &error_hook).await;
270                }
271            }
272        }
273    }
274}
275
276/// Drop the oldest events from a retry buffer when it exceeds
277/// [`EVENT_LOG_RETRY_BUFFER_CAP`]. Returns the count dropped. Bounded
278/// loss is preferable to OOM under sustained store failure.
279fn enforce_retry_cap(batch: &mut Vec<PersistedEvent>) -> usize {
280    if batch.len() <= EVENT_LOG_RETRY_BUFFER_CAP {
281        return 0;
282    }
283    let drop = batch.len() - EVENT_LOG_RETRY_BUFFER_CAP;
284    batch.drain(0..drop);
285    drop
286}
287
288fn to_persisted(seq: &AtomicU64, envelope: &EventEnvelope<UnifiedEvent>) -> PersistedEvent {
289    let member_id = match &envelope.event {
290        UnifiedEvent::Agent { agent_id, .. } => Some(agent_id.clone()),
291        UnifiedEvent::Module(_) => None,
292    };
293    PersistedEvent {
294        id: envelope.event_id.clone(),
295        seq: seq.fetch_add(1, Ordering::Relaxed),
296        timestamp_ms: envelope.timestamp_ms,
297        member_id,
298        event: envelope.event.clone(),
299    }
300}
301
302async fn flush_batch(
303    store: &Arc<dyn EventLogStore>,
304    batch: &mut Vec<PersistedEvent>,
305    error_hook: &Option<super::ErrorHook>,
306) {
307    let events = std::mem::take(batch);
308    if let Err(err) = store.append_batch(events.clone()).await {
309        // Restore the failed batch so the next tick / arrival retries
310        // — silent loss on transient store errors was the prior bug.
311        // Bound the retry buffer so a persistently-failing store can't
312        // drive mobkit OOM.
313        let mut restored = events;
314        restored.append(batch); // events that arrived after the failure
315        let dropped = enforce_retry_cap(&mut restored);
316        *batch = restored;
317
318        if let Some(hook) = error_hook {
319            let hook = hook.clone();
320            let msg = if dropped > 0 {
321                format!(
322                    "event log flush failed: {err}; dropped {dropped} oldest events to bound the retry buffer at {EVENT_LOG_RETRY_BUFFER_CAP}"
323                )
324            } else {
325                format!("event log flush failed: {err}; will retry")
326            };
327            tokio::spawn(async move {
328                let () = hook(super::types::ErrorEvent::EventLogFlushFailure { error: msg }).await;
329            });
330        }
331    }
332}
333
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Stub store that fails its first N `append_batch` calls and
    /// succeeds thereafter, recording every event it eventually accepts.
    struct FlakyStore {
        failures_remaining: Mutex<usize>,
        persisted: Mutex<Vec<PersistedEvent>>,
        attempts: Mutex<usize>,
    }

    impl EventLogStore for FlakyStore {
        fn append_batch(
            &self,
            events: Vec<PersistedEvent>,
        ) -> Pin<Box<dyn Future<Output = Result<(), EventLogError>> + Send + '_>> {
            Box::pin(async move {
                *self.attempts.lock().expect("attempts") += 1;
                let mut left = self.failures_remaining.lock().expect("failures");
                if *left > 0 {
                    *left -= 1;
                    #[derive(Debug)]
                    struct Transient;
                    impl std::fmt::Display for Transient {
                        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                            write!(f, "transient")
                        }
                    }
                    impl std::error::Error for Transient {}
                    return Err(Box::new(Transient) as Box<dyn std::error::Error + Send>);
                }
                self.persisted.lock().expect("persisted").extend(events);
                Ok(())
            })
        }

        fn query(
            &self,
            _query: EventQuery,
        ) -> Pin<Box<dyn Future<Output = Result<Vec<PersistedEvent>, EventLogError>> + Send + '_>>
        {
            Box::pin(async { Ok(Vec::new()) })
        }
    }

    fn sample_event(id: &str) -> PersistedEvent {
        PersistedEvent {
            id: id.to_string(),
            seq: 0,
            timestamp_ms: 0,
            member_id: None,
            event: UnifiedEvent::Module(crate::types::ModuleEvent {
                module: "test-module".into(),
                event_type: "x".into(),
                payload: serde_json::Value::Null,
            }),
        }
    }

    /// Regression: pre-fix, `flush_batch` did `mem::take(batch)` and
    /// dropped the events on store error. After the fix, the events
    /// stay in the batch until a subsequent flush succeeds.
    #[tokio::test]
    async fn flush_failure_retries_instead_of_dropping_events() {
        let flaky = Arc::new(FlakyStore {
            failures_remaining: Mutex::new(2),
            persisted: Mutex::new(Vec::new()),
            attempts: Mutex::new(0),
        });
        let store: Arc<dyn EventLogStore> = flaky.clone();
        let mut batch = vec![sample_event("a"), sample_event("b")];

        // First flush — fails. Batch must remain non-empty (events
        // pushed back into the retry buffer).
        flush_batch(&store, &mut batch, &None).await;
        assert_eq!(batch.len(), 2, "events must be retained on flush failure");

        // Second flush — fails again. Still retained.
        flush_batch(&store, &mut batch, &None).await;
        assert_eq!(batch.len(), 2);

        // Third flush — succeeds. Batch drained, events persisted.
        flush_batch(&store, &mut batch, &None).await;
        assert!(batch.is_empty(), "batch must drain on successful flush");

        assert_eq!(*flaky.attempts.lock().expect("attempts"), 3);
        assert_eq!(flaky.persisted.lock().expect("persisted").len(), 2);
    }

    #[test]
    fn enforce_retry_cap_drops_oldest() {
        let mut batch: Vec<PersistedEvent> = (0..(EVENT_LOG_RETRY_BUFFER_CAP + 100))
            .map(|i| sample_event(&format!("evt-{i}")))
            .collect();
        let dropped = enforce_retry_cap(&mut batch);
        assert_eq!(dropped, 100);
        assert_eq!(batch.len(), EVENT_LOG_RETRY_BUFFER_CAP);
        // Newest EVENT_LOG_RETRY_BUFFER_CAP events retained — first
        // remaining id is evt-100.
        assert_eq!(batch.first().expect("first").id, "evt-100");
    }

    /// Boundary cases: an empty backlog and one exactly at the cap must be
    /// left untouched and report zero drops.
    #[test]
    fn enforce_retry_cap_is_noop_at_or_below_cap() {
        let mut empty: Vec<PersistedEvent> = Vec::new();
        assert_eq!(enforce_retry_cap(&mut empty), 0);
        assert!(empty.is_empty());

        let mut at_cap: Vec<PersistedEvent> = (0..EVENT_LOG_RETRY_BUFFER_CAP)
            .map(|i| sample_event(&format!("evt-{i}")))
            .collect();
        assert_eq!(enforce_retry_cap(&mut at_cap), 0);
        assert_eq!(at_cap.len(), EVENT_LOG_RETRY_BUFFER_CAP);
        assert_eq!(at_cap.first().expect("first").id, "evt-0");
    }
}