Skip to main content

hirn_engine/observability/
event_log.rs

//! Append-only event log backed by LanceDB.
//!
//! The [`EventLog`] is the foundation for event sourcing in hirn. Every
//! mutation is appended to the `events` dataset before being materialized,
//! enabling replay, streaming, audit, and time-travel queries.
//!
//! # Architecture
//!
//! Three tiers:
//! 1. `events.lance` — durable, queryable event history (this module)
//! 2. `tokio::sync::broadcast` — real-time in-memory WATCH subscriptions
//! 3. LanceDB table versions/tags — coarse checkpoints (snapshots)
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use futures::TryStreamExt;
19
20use hirn_core::HirnResult;
21
22use hirn_storage::PhysicalStore;
23use hirn_storage::datasets::events::{self, DATASET_NAME, EventRow};
24use hirn_storage::store::{ScanOptions, ScanOrdering};
25
26use crate::event::{EventEnvelope, MemoryEvent};
27
/// Criteria used to select events, both when querying the stored log and when
/// filtering live subscriptions.
///
/// Every field is optional: `None` places no constraint on that attribute, and
/// an event must satisfy all fields that are `Some`. The default value has
/// every field unset and therefore matches every event.
#[derive(Clone, Debug, Default)]
pub struct EventFilter {
    /// Only events whose realm equals this value.
    pub realm: Option<String>,
    /// Only events whose namespace equals this value.
    pub namespace: Option<String>,
    /// Only events whose event-type string equals this value.
    pub event_type: Option<String>,
    /// Only events recorded for this agent ID.
    pub agent_id: Option<String>,
    /// Inclusive lower bound on the event timestamp, in microseconds.
    pub after_us: Option<i64>,
    /// Inclusive upper bound on the event timestamp, in microseconds.
    pub before_us: Option<i64>,
}
44
45/// Snapshot metadata stored alongside LanceDB tags.
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47pub struct SnapshotMeta {
48    /// Sequence number at which the snapshot was taken.
49    pub seq: u64,
50    /// Wall-clock time of snapshot (microseconds).
51    pub timestamp_us: i64,
52    /// Number of events in the log at snapshot time.
53    pub event_count: u64,
54}
55
/// Rule that decides which old events are eligible for removal when the event
/// log is compacted.
#[derive(Clone, Debug)]
pub enum RetentionPolicy {
    /// Retain only the events recorded since the most recent snapshot.
    SnapshotBased,
    /// Retain at most this many events, removing the oldest ones first.
    MaxEvents(u64),
    /// Retain only events from the last N seconds (the payload is N).
    TimeBased(u64),
}
66
/// Summary returned after a compaction pass over the event log.
#[derive(Clone, Debug)]
pub struct CompactionResult {
    /// How many events the pass deleted.
    pub events_removed: u64,
    /// Exclusive upper bound of the compacted range: events with a smaller
    /// sequence number were candidates for removal.
    pub compacted_before_seq: u64,
}
75
76/// Append-only event log backed by a LanceDB dataset.
77///
78/// Thread-safe: the atomic seq counter ensures gap-free sequence numbers
79/// from a single writer. For multi-writer (Raft), the leader assigns seqs.
80pub struct EventLog {
81    storage: Arc<dyn PhysicalStore>,
82    /// Next sequence number to assign.
83    next_seq: AtomicU64,
84    /// Broadcast channel for real-time push to WATCH subscribers.
85    tx: tokio::sync::broadcast::Sender<EventEnvelope>,
86}
87
88impl EventLog {
89    /// Create a new event log on the given storage backend.
90    ///
91    /// Scans the existing `events` dataset (if any) to recover the next
92    /// sequence number, ensuring gap-free continuation after restart.
93    pub async fn open(storage: Arc<dyn PhysicalStore>) -> HirnResult<Self> {
94        let (tx, _) = tokio::sync::broadcast::channel(4096);
95
96        // Recover next seq from existing events.
97        let next_seq = Self::recover_next_seq(&*storage).await?;
98
99        Ok(Self {
100            storage,
101            next_seq: AtomicU64::new(next_seq),
102            tx,
103        })
104    }
105
106    /// Recover the next sequence number by finding the max seq in the dataset.
107    async fn recover_next_seq(storage: &dyn PhysicalStore) -> HirnResult<u64> {
108        let exists = storage.exists(DATASET_NAME).await?;
109        if !exists {
110            return Ok(0);
111        }
112
113        let count = storage.count(DATASET_NAME, None).await?;
114        if count == 0 {
115            return Ok(0);
116        }
117
118        // Scan for the maximum seq value. We scan just the seq column,
119        // sorted by seq descending, limit 1.
120        let mut batches = storage
121            .scan_stream(
122                DATASET_NAME,
123                ScanOptions {
124                    columns: Some(vec!["seq".into()]),
125                    filter: None,
126                    exact_filter: None,
127                    order_by: Some(vec![ScanOrdering::desc("seq")]),
128                    limit: Some(1),
129                    offset: None,
130                },
131            )
132            .await?;
133
134        let mut max_seq: u64 = 0;
135        while let Some(batch) = batches.try_next().await? {
136            if let Some(col) = batch.column_by_name("seq") {
137                let arr = col
138                    .as_any()
139                    .downcast_ref::<arrow_array::UInt64Array>()
140                    .ok_or_else(|| {
141                        hirn_core::HirnError::storage("event_log seq column is not UInt64")
142                    })?;
143                for i in 0..arr.len() {
144                    if arr.value(i) > max_seq {
145                        max_seq = arr.value(i);
146                    }
147                }
148            }
149        }
150
151        Ok(max_seq + 1)
152    }
153
154    /// Get a broadcast receiver for real-time event subscriptions.
155    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EventEnvelope> {
156        self.tx.subscribe()
157    }
158
159    /// Get a filtered receiver that only delivers events matching the filter.
160    ///
161    /// Spawns a background task that reads from the broadcast channel and
162    /// forwards matching events to the returned `mpsc::Receiver`. The task
163    /// terminates when the receiver is dropped or the broadcast sender is
164    /// closed.
165    pub fn subscribe_filtered(
166        &self,
167        filter: EventFilter,
168    ) -> tokio::sync::mpsc::Receiver<EventEnvelope> {
169        let mut rx = self.tx.subscribe();
170        let (tx, filtered_rx) = tokio::sync::mpsc::channel(256);
171
172        tokio::spawn(async move {
173            loop {
174                match rx.recv().await {
175                    Ok(env) => {
176                        if filter_matches(&filter, &env) {
177                            if tx.send(env).await.is_err() {
178                                break; // receiver dropped
179                            }
180                        }
181                    }
182                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
183                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
184                        tracing::warn!(skipped = n, "event subscriber lagged, lost events");
185                        metrics::counter!("hirn_event_subscriber_lagged_total").increment(n);
186                        continue;
187                    }
188                }
189            }
190        });
191
192        filtered_rx
193    }
194
195    /// Current next sequence number (the number of events appended so far,
196    /// if no compaction has occurred).
197    pub fn next_seq(&self) -> u64 {
198        self.next_seq.load(Ordering::Acquire)
199    }
200
201    // ── Event Log Writer ─────────────────────────────────────
202
203    /// Append a single event to the log.
204    ///
205    /// Assigns a monotonic seq number, writes to LanceDB, and broadcasts
206    /// to real-time subscribers.
207    pub async fn append(
208        &self,
209        realm: impl Into<String>,
210        namespace: impl Into<String>,
211        agent_id: impl Into<String>,
212        event: MemoryEvent,
213    ) -> HirnResult<EventEnvelope> {
214        let seq = self.next_seq.fetch_add(1, Ordering::AcqRel);
215        let envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
216
217        let payload = bincode::serialize(&envelope.event)
218            .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
219
220        let row = EventRow {
221            seq: envelope.seq,
222            timestamp_us: envelope.timestamp_us,
223            realm: envelope.realm.clone(),
224            namespace: envelope.namespace.clone(),
225            agent_id: envelope.agent_id.clone(),
226            event_type: envelope.event_type().to_string(),
227            payload,
228            hmac: envelope.hmac.clone(),
229        };
230
231        let batch = events::to_batch(std::slice::from_ref(&row))?;
232        self.storage.append(DATASET_NAME, batch).await?;
233
234        // Best-effort broadcast (receivers may be lagging — that's OK).
235        let _ = self.tx.send(envelope.clone());
236
237        Ok(envelope)
238    }
239
240    /// Append a single event with HMAC signing.
241    ///
242    /// Same as [`Self::append`] but signs the event envelope with the provided secret
243    /// before persisting it. Auditors can later call [`Self::verify_integrity`] to
244    /// confirm no events have been tampered with.
245    pub async fn append_signed(
246        &self,
247        event: MemoryEvent,
248        realm: impl Into<String>,
249        namespace: impl Into<String>,
250        agent_id: impl Into<String>,
251        secret: &[u8],
252    ) -> HirnResult<EventEnvelope> {
253        let seq = self.next_seq.fetch_add(1, Ordering::AcqRel);
254        let mut envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
255        envelope.sign(secret);
256
257        let payload = bincode::serialize(&envelope.event)
258            .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
259
260        let row = EventRow {
261            seq: envelope.seq,
262            timestamp_us: envelope.timestamp_us,
263            realm: envelope.realm.clone(),
264            namespace: envelope.namespace.clone(),
265            agent_id: envelope.agent_id.clone(),
266            event_type: envelope.event_type().to_string(),
267            payload,
268            hmac: envelope.hmac.clone(),
269        };
270
271        let batch = events::to_batch(std::slice::from_ref(&row))?;
272        self.storage.append(DATASET_NAME, batch).await?;
273
274        let _ = self.tx.send(envelope.clone());
275
276        Ok(envelope)
277    }
278
279    /// Append a batch of events atomically.
280    ///
281    /// All events get consecutive seq numbers.
282    pub async fn append_batch(
283        &self,
284        realm: &str,
285        namespace: &str,
286        agent_id: &str,
287        events_in: Vec<MemoryEvent>,
288    ) -> HirnResult<Vec<EventEnvelope>> {
289        if events_in.is_empty() {
290            return Ok(vec![]);
291        }
292
293        let base_seq = self
294            .next_seq
295            .fetch_add(events_in.len() as u64, Ordering::AcqRel);
296
297        let mut envelopes = Vec::with_capacity(events_in.len());
298        let mut rows = Vec::with_capacity(events_in.len());
299
300        for (i, event) in events_in.into_iter().enumerate() {
301            let seq = base_seq + i as u64;
302            let envelope = EventEnvelope::new(seq, realm, namespace, agent_id, event);
303
304            let payload = bincode::serialize(&envelope.event)
305                .map_err(|e| hirn_core::HirnError::storage(format!("event serialize: {e}")))?;
306
307            rows.push(EventRow {
308                seq: envelope.seq,
309                timestamp_us: envelope.timestamp_us,
310                realm: envelope.realm.clone(),
311                namespace: envelope.namespace.clone(),
312                agent_id: envelope.agent_id.clone(),
313                event_type: envelope.event_type().to_string(),
314                payload,
315                hmac: envelope.hmac.clone(),
316            });
317
318            envelopes.push(envelope);
319        }
320
321        let batch = events::to_batch(&rows)?;
322        self.storage.append(DATASET_NAME, batch).await?;
323
324        // Broadcast all envelopes.
325        for env in &envelopes {
326            let _ = self.tx.send(env.clone());
327        }
328
329        Ok(envelopes)
330    }
331
332    // ── Event Log Reader & Replay ────────────────────────────
333
334    /// Read events in a sequence range [from_seq, to_seq] inclusive.
335    pub async fn read(&self, from_seq: u64, to_seq: u64) -> HirnResult<Vec<EventEnvelope>> {
336        let filter = format!("seq >= {from_seq} AND seq <= {to_seq}");
337        self.read_filtered(Some(&filter)).await
338    }
339
340    /// Read all events from a sequence number onward.
341    pub async fn tail(&self, from_seq: u64) -> HirnResult<Vec<EventEnvelope>> {
342        let filter = format!("seq >= {from_seq}");
343        self.read_filtered(Some(&filter)).await
344    }
345
346    /// Read all events matching an optional filter.
347    pub async fn read_all(&self) -> HirnResult<Vec<EventEnvelope>> {
348        self.read_filtered(None).await
349    }
350
351    /// Read events with an advanced filter.
352    pub async fn read_with_filter(&self, filter: &EventFilter) -> HirnResult<Vec<EventEnvelope>> {
353        let mut predicates = Vec::new();
354
355        if let Some(ref realm) = filter.realm {
356            let escaped = realm.replace('\'', "''");
357            predicates.push(format!("realm = '{escaped}'"));
358        }
359        if let Some(ref ns) = filter.namespace {
360            let escaped = ns.replace('\'', "''");
361            predicates.push(format!("namespace = '{escaped}'"));
362        }
363        if let Some(ref et) = filter.event_type {
364            let escaped = et.replace('\'', "''");
365            predicates.push(format!("event_type = '{escaped}'"));
366        }
367        if let Some(ref aid) = filter.agent_id {
368            let escaped = aid.replace('\'', "''");
369            predicates.push(format!("agent_id = '{escaped}'"));
370        }
371        if let Some(after) = filter.after_us {
372            predicates.push(format!("timestamp_us >= {after}"));
373        }
374        if let Some(before) = filter.before_us {
375            predicates.push(format!("timestamp_us <= {before}"));
376        }
377
378        let combined = if predicates.is_empty() {
379            None
380        } else {
381            Some(predicates.join(" AND "))
382        };
383
384        self.read_filtered(combined.as_deref()).await
385    }
386
387    /// Replay all events through a handler function to reconstruct state.
388    ///
389    /// Events are read in seq order and passed one-by-one to `handler`.
390    pub async fn replay<F>(&self, mut handler: F) -> HirnResult<u64>
391    where
392        F: FnMut(&EventEnvelope) -> HirnResult<()>,
393    {
394        let envelopes = self.read_all().await?;
395        let count = envelopes.len() as u64;
396        for env in &envelopes {
397            handler(env)?;
398        }
399        Ok(count)
400    }
401
402    /// Verify HMAC integrity of all events in the log.
403    ///
404    /// Returns the sequence numbers of events whose HMAC validation failed
405    /// (missing HMAC or tampered data). An empty vec means all events are valid.
406    /// Intended for use by external auditors.
407    pub async fn verify_integrity(&self, secret: &[u8]) -> HirnResult<Vec<u64>> {
408        let events = self.read_all().await?;
409        let failures: Vec<u64> = events
410            .iter()
411            .filter(|env| !env.verify_hmac(secret))
412            .map(|env| env.seq)
413            .collect();
414        Ok(failures)
415    }
416
417    /// Replay events from a specific seq onward.
418    pub async fn replay_from<F>(&self, from_seq: u64, mut handler: F) -> HirnResult<u64>
419    where
420        F: FnMut(&EventEnvelope) -> HirnResult<()>,
421    {
422        let envelopes = self.tail(from_seq).await?;
423        let count = envelopes.len() as u64;
424        for env in &envelopes {
425            handler(env)?;
426        }
427        Ok(count)
428    }
429
430    /// Internal: read events with an optional SQL filter predicate.
431    async fn read_filtered(&self, filter: Option<&str>) -> HirnResult<Vec<EventEnvelope>> {
432        self.read_filtered_limited(filter, None).await
433    }
434
435    /// Internal: read events with optional filter and limit.
436    async fn read_filtered_limited(
437        &self,
438        filter: Option<&str>,
439        limit: Option<usize>,
440    ) -> HirnResult<Vec<EventEnvelope>> {
441        self.read_filtered_limited_ordered(filter, limit, vec![ScanOrdering::asc("seq")])
442            .await
443    }
444
445    async fn read_filtered_limited_ordered(
446        &self,
447        filter: Option<&str>,
448        limit: Option<usize>,
449        order_by: Vec<ScanOrdering>,
450    ) -> HirnResult<Vec<EventEnvelope>> {
451        let exists = self.storage.exists(DATASET_NAME).await?;
452        if !exists {
453            return Ok(vec![]);
454        }
455
456        let mut batches = self
457            .storage
458            .scan_stream(
459                DATASET_NAME,
460                ScanOptions {
461                    columns: None,
462                    filter: filter.map(String::from),
463                    exact_filter: None,
464                    order_by: Some(order_by),
465                    limit,
466                    offset: None,
467                },
468            )
469            .await?;
470
471        let mut envelopes = Vec::new();
472        while let Some(batch) = batches.try_next().await? {
473            let rows = events::from_batch(&batch)?;
474            for row in rows {
475                let event: MemoryEvent = bincode::deserialize(&row.payload).map_err(|e| {
476                    hirn_core::HirnError::storage(format!(
477                        "event deserialize at seq {}: {e}",
478                        row.seq
479                    ))
480                })?;
481
482                envelopes.push(EventEnvelope {
483                    seq: row.seq,
484                    timestamp_us: row.timestamp_us,
485                    realm: row.realm,
486                    namespace: row.namespace,
487                    agent_id: row.agent_id,
488                    event: event,
489                    hmac: row.hmac,
490                });
491            }
492        }
493        Ok(envelopes)
494    }
495
496    // ── Snapshots & Compaction ───────────────────────────────
497
498    /// Take a snapshot at the current seq, creating LanceDB tags on
499    /// materialized tables.
500    ///
501    /// Returns the snapshot metadata including the seq at which it was taken.
502    pub async fn snapshot(&self, materialized_tables: &[&str]) -> HirnResult<SnapshotMeta> {
503        let current_seq = self.next_seq.load(Ordering::Acquire).saturating_sub(1);
504        let tag = format!("snapshot-{current_seq}");
505
506        // Tag each materialized table at its current version.
507        for table_name in materialized_tables {
508            if self.storage.exists(table_name).await? {
509                self.storage.tag(table_name, &tag).await?;
510            }
511        }
512
513        // Log the snapshot event.
514        let _ = self
515            .append(
516                "system",
517                "system",
518                "system",
519                MemoryEvent::SnapshotTaken {
520                    seq: current_seq,
521                    tag: tag.clone(),
522                },
523            )
524            .await?;
525
526        let event_count = self.storage.count(DATASET_NAME, None).await.unwrap_or(0);
527
528        let meta = SnapshotMeta {
529            seq: current_seq,
530            timestamp_us: chrono::Utc::now().timestamp_micros(),
531            event_count,
532        };
533
534        Ok(meta)
535    }
536
537    /// Compact (prune) events before the given sequence number.
538    ///
539    /// Events with `seq < before_seq` are deleted from the events dataset.
540    /// Call `optimize` afterward to reclaim storage.
541    pub async fn compact(&self, before_seq: u64) -> HirnResult<CompactionResult> {
542        let exists = self.storage.exists(DATASET_NAME).await?;
543        if !exists {
544            return Ok(CompactionResult {
545                events_removed: 0,
546                compacted_before_seq: before_seq,
547            });
548        }
549
550        let predicate = format!(
551            "seq < {before_seq} AND event_type NOT IN ('access_granted', 'access_denied', 'policy_changed')"
552        );
553        let deleted = self.storage.delete(DATASET_NAME, &predicate).await?;
554
555        // Optimize: compact + optimize indices.
556        self.storage
557            .compact(DATASET_NAME, Default::default())
558            .await?;
559        self.storage.optimize_indices(DATASET_NAME).await?;
560
561        // Log the compaction event.
562        let _ = self
563            .append(
564                "system",
565                "system",
566                "system",
567                MemoryEvent::CompactionCompleted {
568                    before_seq,
569                    events_removed: deleted,
570                },
571            )
572            .await?;
573
574        Ok(CompactionResult {
575            events_removed: deleted,
576            compacted_before_seq: before_seq,
577        })
578    }
579
580    /// Apply a retention policy to compact old events.
581    pub async fn apply_retention(&self, policy: &RetentionPolicy) -> HirnResult<CompactionResult> {
582        match policy {
583            RetentionPolicy::SnapshotBased => {
584                let snapshots = self
585                    .read_filtered_limited_ordered(
586                        Some("event_type = 'snapshot_taken'"),
587                        Some(1),
588                        vec![ScanOrdering::desc("seq")],
589                    )
590                    .await?;
591                let last_snapshot_seq = snapshots.iter().find_map(|e| {
592                    if let MemoryEvent::SnapshotTaken { seq, .. } = &e.event {
593                        Some(*seq)
594                    } else {
595                        None
596                    }
597                });
598
599                match last_snapshot_seq {
600                    Some(seq) => self.compact(seq).await,
601                    None => Ok(CompactionResult {
602                        events_removed: 0,
603                        compacted_before_seq: 0,
604                    }),
605                }
606            }
607            RetentionPolicy::MaxEvents(max) => {
608                let count = self.storage.count(DATASET_NAME, None).await.unwrap_or(0);
609                if count <= *max {
610                    return Ok(CompactionResult {
611                        events_removed: 0,
612                        compacted_before_seq: 0,
613                    });
614                }
615                let to_remove = count - max;
616                // Read only the oldest events up to the cutoff point + 1 to find
617                // the seq boundary, instead of loading the entire log.
618                let cutoff_events = self
619                    .read_filtered_limited(None, Some((to_remove + 1) as usize))
620                    .await?;
621                if let Some(env) = cutoff_events.get(to_remove as usize) {
622                    self.compact(env.seq).await
623                } else {
624                    Ok(CompactionResult {
625                        events_removed: 0,
626                        compacted_before_seq: 0,
627                    })
628                }
629            }
630            RetentionPolicy::TimeBased(max_age_secs) => {
631                let cutoff_us =
632                    chrono::Utc::now().timestamp_micros() - (*max_age_secs as i64 * 1_000_000);
633                // Scan only events at/after the cutoff to find the compact boundary,
634                // instead of loading all events into memory.
635                let filter = format!("timestamp_us >= {cutoff_us}");
636                let after_cutoff = self.read_filtered_limited(Some(&filter), Some(1)).await?;
637                let compact_seq = after_cutoff.first().map(|e| e.seq);
638                match compact_seq {
639                    Some(seq) => self.compact(seq).await,
640                    None => Ok(CompactionResult {
641                        events_removed: 0,
642                        compacted_before_seq: 0,
643                    }),
644                }
645            }
646        }
647    }
648}
649
650/// Check whether an event envelope matches the given filter criteria.
651fn filter_matches(filter: &EventFilter, env: &EventEnvelope) -> bool {
652    if let Some(ref realm) = filter.realm {
653        if env.realm != *realm {
654            return false;
655        }
656    }
657    if let Some(ref ns) = filter.namespace {
658        if env.namespace != *ns {
659            return false;
660        }
661    }
662    if let Some(ref et) = filter.event_type {
663        if env.event_type() != et.as_str() {
664            return false;
665        }
666    }
667    if let Some(ref aid) = filter.agent_id {
668        if env.agent_id != *aid {
669            return false;
670        }
671    }
672    if let Some(after) = filter.after_us {
673        if env.timestamp_us < after {
674            return false;
675        }
676    }
677    if let Some(before) = filter.before_us {
678        if env.timestamp_us > before {
679            return false;
680        }
681    }
682    true
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use hirn_core::id::MemoryId;
    use hirn_storage::memory_store::MemoryStore;

    /// Fresh in-memory storage backend, so tests never touch disk.
    fn null_storage() -> Arc<dyn PhysicalStore> {
        Arc::new(MemoryStore::new())
    }

    /// Event log opened over a brand-new in-memory store.
    async fn fresh_log() -> EventLog {
        EventLog::open(null_storage()).await.unwrap()
    }

    /// A log over empty storage starts numbering at zero.
    #[tokio::test]
    async fn open_on_empty_storage() {
        let log = fresh_log().await;
        assert_eq!(log.next_seq(), 0);
    }

    /// Successive single appends receive seqs 0, 1, ... and advance the counter.
    #[tokio::test]
    async fn append_assigns_sequential_seqs() {
        let log = fresh_log().await;

        let pushed = MemoryEvent::WorkingPushed {
            id: MemoryId::new(),
        };
        let first = log.append("r", "ns", "a", pushed).await.unwrap();
        assert_eq!(first.seq, 0);

        let archived = MemoryEvent::Archived {
            id: MemoryId::new(),
        };
        let second = log.append("r", "ns", "a", archived).await.unwrap();
        assert_eq!(second.seq, 1);

        assert_eq!(log.next_seq(), 2);
    }

    /// A batch receives consecutive seqs and advances the counter by its size.
    #[tokio::test]
    async fn append_batch_consecutive_seqs() {
        let log = fresh_log().await;

        let batch = vec![
            MemoryEvent::WorkingPushed {
                id: MemoryId::new(),
            },
            MemoryEvent::Archived {
                id: MemoryId::new(),
            },
            MemoryEvent::Consolidated {
                records_processed: 5,
            },
        ];

        let envelopes = log.append_batch("r", "ns", "a", batch).await.unwrap();
        assert_eq!(envelopes.len(), 3);
        for (expected_seq, envelope) in envelopes.iter().enumerate() {
            assert_eq!(envelope.seq, expected_seq as u64);
        }
        assert_eq!(log.next_seq(), 3);
    }

    /// A subscriber attached before the append sees the appended event.
    #[tokio::test]
    async fn broadcast_subscriber_receives_events() {
        let log = fresh_log().await;
        let mut rx = log.subscribe();

        let event = MemoryEvent::WorkingPushed {
            id: MemoryId::new(),
        };
        log.append("r", "ns", "a", event).await.unwrap();

        let received = rx.try_recv().unwrap();
        assert_eq!(received.seq, 0);
        assert_eq!(received.event_type(), "working_pushed");
    }
}