// inferadb_ledger_state/events.rs
1//! Events storage layer for the audit event system.
2//!
3//! Provides [`EventStore`] for low-level CRUD on the `Events` B+ tree table,
4//! and [`EventsDatabase`] as a managed wrapper around the dedicated `events.db`
5//! database file. The Events table lives in a separate database from `state.db`
6//! to avoid write lock contention between handler-phase event writes and
7//! Raft apply-phase state mutations.
8
9use std::{path::Path, sync::Arc};
10
11use inferadb_ledger_store::{
12    Database, FileBackend, InMemoryBackend, ReadTransaction, StorageBackend, Table, TableId,
13    WriteTransaction,
14};
15use inferadb_ledger_types::{
16    CodecError, OrganizationId,
17    events::{EventEntry, EventMeta},
18};
19use snafu::{ResultExt, Snafu};
20
21use crate::events_keys::{
22    encode_event_index_key, encode_event_index_value, encode_event_key, org_prefix,
23    primary_key_from_index_value,
24};
25
/// Events table: stores audit event entries in a dedicated `events.db` database.
///
/// Uses `TableId::Entities` (value 0) as the table slot — safe because
/// the Events table is the sole occupant of its own [`Database`] instance.
pub struct Events;

impl Table for Events {
    const ID: TableId = TableId::Entities;
    // Keys are the composite (org_id, timestamp_ns, event_id) bytes produced by
    // `encode_event_key`; values are encoded `EventEntry` payloads.
    type KeyType = Vec<u8>;
    type ValueType = Vec<u8>;
}
37
/// Secondary index for O(log n) event lookups by ID.
///
/// Maps `(org_id, event_id)` → `(timestamp_ns, event_hash)`. Given these
/// components, the full 24-byte primary key can be reconstructed for a
/// direct `Events` table lookup. Two B+ tree lookups total: index + primary.
///
/// Uses `TableId::Relationships` (value 1) — unused in `events.db`.
pub struct EventIndex;

impl Table for EventIndex {
    const ID: TableId = TableId::Relationships;
    // Keys/values are the raw encodings from `encode_event_index_key` /
    // `encode_event_index_value` (see `events_keys`).
    type KeyType = Vec<u8>;
    type ValueType = Vec<u8>;
}
52
/// Errors returned by [`EventStore`] operations.
//
// NOTE: `#[derive(Snafu)]` generates the `StorageSnafu` / `CodecSnafu` context
// selectors used with `ResultExt::context` throughout this module.
#[derive(Debug, Snafu)]
pub enum EventStoreError {
    /// Underlying storage operation failed.
    #[snafu(display("Storage error: {source}"))]
    Storage {
        /// The underlying store error.
        source: inferadb_ledger_store::Error,
        /// Source code location for debugging.
        #[snafu(implicit)]
        location: snafu::Location,
    },

    /// Serialization or deserialization failed.
    #[snafu(display("Codec error: {source}"))]
    Codec {
        /// The codec error.
        source: CodecError,
        /// Source code location for debugging.
        #[snafu(implicit)]
        location: snafu::Location,
    },
}
76
/// Result type for event store operations.
pub type Result<T> = std::result::Result<T, EventStoreError>;

/// Low-level event storage operations on raw transactions.
///
/// Stateless — all operations take a transaction reference. Follows the
/// [`EntityStore`](crate::EntityStore) pattern. All methods are associated
/// functions; this unit struct is never instantiated.
pub struct EventStore;
85
impl EventStore {
    /// Writes an event entry to the Events table.
    ///
    /// The key is derived from the entry's `organization_id`, `timestamp`,
    /// and `event_id` fields.
    ///
    /// # Errors
    ///
    /// Returns `EventStoreError::Codec` if serialization fails.
    /// Returns `EventStoreError::Storage` if the write transaction fails.
    pub fn write<B: StorageBackend>(
        txn: &mut WriteTransaction<'_, B>,
        entry: &EventEntry,
    ) -> Result<()> {
        let ts_ns = entry.timestamp.timestamp_nanos_opt().unwrap_or(0) as u64;

        // Primary record: the full serialized entry under the composite time key.
        let primary_key = encode_event_key(entry.organization_id, ts_ns, &entry.event_id);
        let encoded = inferadb_ledger_types::encode(entry).context(CodecSnafu)?;
        txn.insert::<Events>(&primary_key, &encoded).context(StorageSnafu)?;

        // Secondary index: (org_id, event_id) → (timestamp_ns, event_hash) so
        // `get_by_id` can reconstruct the primary key without a range scan.
        let index_key = encode_event_index_key(entry.organization_id, &entry.event_id);
        let index_value = encode_event_index_value(ts_ns, &entry.event_id);
        txn.insert::<EventIndex>(&index_key, &index_value).context(StorageSnafu)?;

        Ok(())
    }
112
113    /// Retrieves a single event by its components.
114    ///
115    /// # Errors
116    ///
117    /// Returns `EventStoreError::Storage` if the read transaction fails.
118    /// Returns `EventStoreError::Codec` if deserialization fails.
119    pub fn get<B: StorageBackend>(
120        txn: &ReadTransaction<'_, B>,
121        org_id: OrganizationId,
122        timestamp_ns: u64,
123        event_id: &[u8; 16],
124    ) -> Result<Option<EventEntry>> {
125        let key = encode_event_key(org_id, timestamp_ns, event_id);
126        match txn.get::<Events>(&key).context(StorageSnafu)? {
127            Some(data) => {
128                let entry: EventEntry = inferadb_ledger_types::decode(&data).context(CodecSnafu)?;
129                Ok(Some(entry))
130            },
131            None => Ok(None),
132        }
133    }
134
135    /// Retrieves a single event by organization ID and event ID using the
136    /// secondary index for O(log n) lookup.
137    ///
138    /// Performs two B+ tree lookups:
139    /// 1. `EventIndex`: `(org_id, event_id)` → `(timestamp_ns, event_hash)`
140    /// 2. `Events`: reconstructed primary key → `EventEntry`
141    ///
142    /// Returns `None` if the index entry is missing or if the primary entry
143    /// has been GC'd (orphaned index entry).
144    ///
145    /// # Errors
146    ///
147    /// Returns `EventStoreError::Storage` if either read fails.
148    /// Returns `EventStoreError::Codec` if deserialization fails.
149    pub fn get_by_id<B: StorageBackend>(
150        txn: &ReadTransaction<'_, B>,
151        org_id: OrganizationId,
152        event_id: &[u8; 16],
153    ) -> Result<Option<EventEntry>> {
154        // Step 1: Index lookup
155        let idx_key = encode_event_index_key(org_id, event_id);
156        let idx_val = match txn.get::<EventIndex>(&idx_key).context(StorageSnafu)? {
157            Some(v) => v,
158            None => return Ok(None),
159        };
160
161        // Step 2: Reconstruct primary key and fetch
162        let primary_key = primary_key_from_index_value(org_id, &idx_val);
163        match txn.get::<Events>(&primary_key).context(StorageSnafu)? {
164            Some(data) => {
165                let entry: EventEntry = inferadb_ledger_types::decode(&data).context(CodecSnafu)?;
166                Ok(Some(entry))
167            },
168            // Orphaned index entry — primary was GC'd
169            None => Ok(None),
170        }
171    }
172
173    /// Lists events for an organization within a time range, with cursor-based pagination.
174    ///
175    /// Returns entries in chronological order (ascending timestamp). The optional
176    /// `after_key` parameter is an opaque 24-byte resume cursor from a previous call.
177    ///
178    /// Returns `(entries, next_cursor)` where `next_cursor` is `Some` if more
179    /// results are available.
180    ///
181    /// # Errors
182    ///
183    /// Returns `EventStoreError::Storage` if the read transaction or iterator fails.
184    /// Returns `EventStoreError::Codec` if deserialization of any entry fails.
185    pub fn list<B: StorageBackend>(
186        txn: &ReadTransaction<'_, B>,
187        org_id: OrganizationId,
188        start_ns: u64,
189        end_ns: u64,
190        limit: usize,
191        after_key: Option<&[u8]>,
192    ) -> Result<(Vec<EventEntry>, Option<Vec<u8>>)> {
193        let start_key = match after_key {
194            Some(cursor) => {
195                // Resume after the cursor key — increment last byte to get exclusive start
196                let mut resume = cursor.to_vec();
197                // Increment the key to skip the cursor entry itself
198                increment_key(&mut resume);
199                resume
200            },
201            None => {
202                let prefix = crate::events_keys::org_time_prefix(org_id, start_ns);
203                prefix.to_vec()
204            },
205        };
206
207        let end_key = crate::events_keys::org_time_prefix(org_id, end_ns);
208        let org_bytes = org_prefix(org_id);
209
210        // Fetch limit + 1 to detect if more results exist
211        let fetch_limit = limit.saturating_add(1);
212
213        let iter =
214            txn.range::<Events>(Some(&start_key), Some(&end_key.to_vec())).context(StorageSnafu)?;
215
216        let mut entries = Vec::with_capacity(limit);
217        let mut last_key: Option<Vec<u8>> = None;
218
219        for (key_bytes, value_bytes) in iter {
220            // Ensure we're still within the target org
221            if key_bytes.len() < 8 || key_bytes[..8] != org_bytes[..] {
222                break;
223            }
224
225            if entries.len() >= fetch_limit {
226                break;
227            }
228
229            let entry: EventEntry =
230                inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
231            last_key = Some(key_bytes);
232            entries.push(entry);
233        }
234
235        if entries.len() > limit {
236            // More results available — return the cursor for the last included entry
237            entries.truncate(limit);
238            let cursor = last_key.map(|_| {
239                // The cursor is the key of the last entry we're returning
240                let last_entry = &entries[limit - 1];
241                let ts = last_entry.timestamp.timestamp_nanos_opt().unwrap_or(0) as u64;
242                encode_event_key(last_entry.organization_id, ts, &last_entry.event_id)
243            });
244            Ok((entries, cursor))
245        } else {
246            Ok((entries, None))
247        }
248    }
249
250    /// Deletes expired event entries using the collect-then-delete pattern.
251    ///
252    /// Scans all entries using thin deserialization ([`EventMeta`]) to check
253    /// `expires_at`. Expired entries are fully deserialized to extract
254    /// `event_id` and `organization_id` for secondary index cleanup.
255    ///
256    /// Returns the number of entries deleted.
257    ///
258    /// # Errors
259    ///
260    /// Returns `EventStoreError::Storage` if read/write operations fail.
261    /// Returns `EventStoreError::Codec` if deserialization fails.
262    pub fn delete_expired<B: StorageBackend>(
263        read_txn: &ReadTransaction<'_, B>,
264        write_txn: &mut WriteTransaction<'_, B>,
265        now_unix: u64,
266        max_batch: usize,
267    ) -> Result<usize> {
268        let mut expired: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
269
270        let iter = read_txn.iter::<Events>().context(StorageSnafu)?;
271
272        for (key_bytes, value_bytes) in iter {
273            if expired.len() >= max_batch {
274                break;
275            }
276
277            // Thin deserialization — reads emission (field 1) + expires_at (field 2)
278            let meta: EventMeta =
279                inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
280
281            if meta.expires_at > 0 && meta.expires_at < now_unix {
282                expired.push((key_bytes, value_bytes));
283            }
284        }
285
286        let deleted = expired.len();
287        for (key, value_bytes) in expired {
288            // Full deserialization to get event_id + organization_id for index cleanup
289            let entry: EventEntry =
290                inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
291
292            write_txn.delete::<Events>(&key).context(StorageSnafu)?;
293
294            // Remove secondary index entry
295            let idx_key = encode_event_index_key(entry.organization_id, &entry.event_id);
296            write_txn.delete::<EventIndex>(&idx_key).context(StorageSnafu)?;
297        }
298
299        Ok(deleted)
300    }
301
302    /// Counts entries for a specific organization.
303    ///
304    /// # Errors
305    ///
306    /// Returns `EventStoreError::Storage` if the iterator or read transaction fails.
307    pub fn count<B: StorageBackend>(
308        txn: &ReadTransaction<'_, B>,
309        org_id: OrganizationId,
310    ) -> Result<u64> {
311        let prefix = org_prefix(org_id);
312        let mut count = 0u64;
313
314        let iter = txn.iter::<Events>().context(StorageSnafu)?;
315
316        for (key_bytes, _) in iter {
317            if key_bytes.len() < 8 {
318                continue;
319            }
320            if key_bytes[..8] < prefix[..] {
321                continue;
322            }
323            if key_bytes[..8] > prefix[..] {
324                break;
325            }
326            count += 1;
327        }
328
329        Ok(count)
330    }
331
332    /// Scans all apply-phase events for inclusion in Raft snapshots.
333    ///
334    /// Uses thin deserialization ([`EmissionMeta`](inferadb_ledger_types::events::EmissionMeta)) to
335    /// read only the `emission` discriminant (~1–2 bytes) for each event, skipping full
336    /// deserialization of handler-phase events (~55% of total). Apply-phase events are then
337    /// fully deserialized, sorted newest-first, and truncated to `max_entries`.
338    ///
339    /// # Errors
340    ///
341    /// Returns `EventStoreError::Storage` if the iterator fails.
342    /// Returns `EventStoreError::Codec` if deserialization fails.
343    pub fn scan_apply_phase<B: StorageBackend>(
344        txn: &ReadTransaction<'_, B>,
345        max_entries: usize,
346    ) -> Result<Vec<inferadb_ledger_types::events::EventEntry>> {
347        use inferadb_ledger_types::events::{EmissionMeta, EventEmission};
348
349        let iter = txn.iter::<Events>().context(StorageSnafu)?;
350
351        let mut entries: Vec<inferadb_ledger_types::events::EventEntry> = Vec::new();
352
353        for (_key_bytes, value_bytes) in iter {
354            // Thin deserialization — reads only the emission discriminant (~1–2 bytes)
355            let meta: EmissionMeta =
356                inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
357
358            if matches!(meta.emission, EventEmission::ApplyPhase) {
359                // Full deserialization only for apply-phase events
360                let entry: inferadb_ledger_types::events::EventEntry =
361                    inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
362                entries.push(entry);
363            }
364        }
365
366        // Sort by timestamp descending (most recent first) to keep newest entries
367        entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
368
369        // Keep only the most recent `max_entries`
370        entries.truncate(max_entries);
371
372        Ok(entries)
373    }
374
375    /// Scans apply-phase events using per-organization range scans with a
376    /// timestamp cutoff.
377    ///
378    /// For each organization, performs a B-tree range scan starting at the
379    /// cutoff timestamp (inclusive) through the organization's maximum key.
380    /// This avoids loading all events into memory — only events newer than
381    /// the cutoff are read.
382    ///
383    /// Returns serialized event entries (postcard-encoded `EventEntry`).
384    /// Total results are capped at `max_entries`.
385    ///
386    /// # Arguments
387    ///
388    /// * `txn` — Read transaction on the events database
389    /// * `org_ids` — Organization IDs to scan (determines per-org range bounds)
390    /// * `cutoff_timestamp_ns` — Minimum timestamp (nanoseconds since epoch); events older than
391    ///   this are excluded. Use `0` to include all events.
392    /// * `max_entries` — Global cap on total returned events
393    ///
394    /// # Errors
395    ///
396    /// Returns `EventStoreError::Storage` if any iterator fails.
397    /// Returns `EventStoreError::Codec` if deserialization fails.
398    pub fn scan_apply_phase_ranged<B: StorageBackend>(
399        txn: &ReadTransaction<'_, B>,
400        org_ids: &[OrganizationId],
401        cutoff_timestamp_ns: u64,
402        max_entries: usize,
403    ) -> Result<Vec<Vec<u8>>> {
404        use inferadb_ledger_types::events::{EmissionMeta, EventEmission};
405
406        let mut result = Vec::new();
407
408        for &org_id in org_ids {
409            if result.len() >= max_entries {
410                break;
411            }
412
413            // Range: [org_id ++ cutoff_ts ++ 0x00..00, org_id ++ u64::MAX ++ u64::MAX]
414            let start = crate::events_keys::org_time_prefix(org_id, cutoff_timestamp_ns).to_vec();
415            let end = {
416                let mut e = crate::events_keys::org_time_prefix(org_id, u64::MAX).to_vec();
417                // Extend to full 24-byte key with max hash for inclusive upper bound
418                e.extend_from_slice(&u64::MAX.to_be_bytes());
419                e
420            };
421
422            let iter = txn.range::<Events>(Some(&start), Some(&end)).context(StorageSnafu)?;
423
424            let org_bytes = org_prefix(org_id);
425
426            for (key_bytes, value_bytes) in iter {
427                if result.len() >= max_entries {
428                    break;
429                }
430
431                // Ensure we're still within the target org
432                if key_bytes.len() < 8 || key_bytes[..8] != org_bytes[..] {
433                    break;
434                }
435
436                // Thin deserialization — reads only the emission discriminant
437                let meta: EmissionMeta =
438                    inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
439
440                if matches!(meta.emission, EventEmission::ApplyPhase) {
441                    result.push(value_bytes);
442                }
443            }
444        }
445
446        Ok(result)
447    }
448}
449
/// Increments a byte slice as a big-endian integer (for exclusive cursor starts).
fn increment_key(key: &mut Vec<u8>) {
    // Walk from the least-significant (last) byte, propagating the carry.
    let mut idx = key.len();
    while idx > 0 {
        idx -= 1;
        if key[idx] == 0xFF {
            // Carry: this byte wraps to zero; the next-higher byte absorbs it.
            key[idx] = 0;
        } else {
            key[idx] += 1;
            return;
        }
    }
    // Overflow — every byte was 0xFF. Grow the key with a leading 0x01.
    key.insert(0, 1);
}
462
463// ============================================================================
464// EventsDatabase wrapper
465// ============================================================================
466
/// Errors returned by [`EventsDatabase`] operations.
#[derive(Debug, Snafu)]
pub enum EventsDatabaseError {
    /// Failed to open or create the events database file.
    #[snafu(display("Failed to open events database at {path}: {source}"))]
    Open {
        /// Filesystem path that failed to open (`:memory:` for the in-memory backend).
        path: String,
        /// The underlying store error.
        source: inferadb_ledger_store::Error,
    },
}
479
/// Managed wrapper around the dedicated `events.db` database.
///
/// Wraps a [`Database`] instance with convenience methods for read/write
/// transactions. The events database is separate from `state.db` — no write
/// lock contention with [`StateLayer`](crate::StateLayer).
///
/// Generic over [`StorageBackend`] to support both file-backed (production)
/// and in-memory (testing) backends.
pub struct EventsDatabase<B: StorageBackend> {
    // Shared handle to the underlying database; `Clone` only bumps this Arc.
    db: Arc<Database<B>>,
}
491
492impl EventsDatabase<FileBackend> {
493    /// Opens or creates the events database at `{data_dir}/events.db`.
494    ///
495    /// # Errors
496    ///
497    /// Returns `EventsDatabaseError::Open` if the database cannot be opened
498    /// or created.
499    pub fn open(data_dir: &Path) -> std::result::Result<Self, EventsDatabaseError> {
500        let path = data_dir.join("events.db");
501        let db =
502            if path.exists() { Database::open(&path) } else { Database::create(&path) }.map_err(
503                |e| EventsDatabaseError::Open { path: path.display().to_string(), source: e },
504            )?;
505
506        Ok(Self { db: Arc::new(db) })
507    }
508}
509
510impl EventsDatabase<InMemoryBackend> {
511    /// Creates an in-memory events database for testing.
512    ///
513    /// # Errors
514    ///
515    /// Returns `EventsDatabaseError::Open` if the in-memory database cannot
516    /// be created.
517    pub fn open_in_memory() -> std::result::Result<Self, EventsDatabaseError> {
518        let db = Database::open_in_memory()
519            .map_err(|e| EventsDatabaseError::Open { path: ":memory:".to_string(), source: e })?;
520
521        Ok(Self { db: Arc::new(db) })
522    }
523}
524
525impl<B: StorageBackend> EventsDatabase<B> {
526    /// Begins a read transaction on the events database.
527    ///
528    /// # Errors
529    ///
530    /// Returns the store error if the read transaction cannot be started.
531    pub fn read(&self) -> inferadb_ledger_store::Result<ReadTransaction<'_, B>> {
532        self.db.read()
533    }
534
535    /// Begins a write transaction on the events database.
536    ///
537    /// # Errors
538    ///
539    /// Returns the store error if the write transaction cannot be started.
540    pub fn write(&self) -> inferadb_ledger_store::Result<WriteTransaction<'_, B>> {
541        self.db.write()
542    }
543
544    /// Returns a shared reference to the underlying database.
545    pub fn db(&self) -> &Arc<Database<B>> {
546        &self.db
547    }
548}
549
550impl<B: StorageBackend> Clone for EventsDatabase<B> {
551    fn clone(&self) -> Self {
552        Self { db: Arc::clone(&self.db) }
553    }
554}
555
556#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used, clippy::disallowed_methods, unused_mut)]
558mod tests {
559    use std::collections::BTreeMap;
560
561    use chrono::{TimeZone, Utc};
562    use inferadb_ledger_types::events::{EventAction, EventEmission, EventOutcome, EventScope};
563
564    use super::*;
565
    /// Builds a minimal `EventEntry` test fixture for `org_id` with the given
    /// `event_id`, second-resolution timestamp, and `expires_at` (0 = never
    /// expires, per `delete_expired`'s `expires_at > 0` guard).
    fn make_entry(
        org_id: i64,
        event_id: [u8; 16],
        timestamp_secs: i64,
        expires_at: u64,
    ) -> EventEntry {
        EventEntry {
            expires_at,
            event_id,
            source_service: "ledger".to_string(),
            event_type: "ledger.test.event".to_string(),
            timestamp: Utc.timestamp_opt(timestamp_secs, 0).unwrap(),
            scope: EventScope::Organization,
            action: EventAction::WriteCommitted,
            emission: EventEmission::ApplyPhase,
            principal: "test-user".to_string(),
            organization_id: OrganizationId::new(org_id),
            organization: None,
            vault: None,
            outcome: EventOutcome::Success,
            details: BTreeMap::new(),
            block_height: None,
            trace_id: None,
            correlation_id: None,
            operations_count: None,
        }
    }
593
    // A written entry is retrievable via its (org, timestamp_ns, event_id)
    // key with its serialized fields intact.
    #[test]
    fn write_and_get_roundtrip() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
        let ts_ns = entry.timestamp.timestamp_nanos_opt().unwrap() as u64;

        // Write and commit in one transaction scope.
        {
            let mut txn = events_db.write().expect("write txn");
            EventStore::write(&mut txn, &entry).expect("write event");
            txn.commit().expect("commit");
        }

        // Read back through a fresh read transaction.
        {
            let txn = events_db.read().expect("read txn");
            let got = EventStore::get(&txn, OrganizationId::new(1), ts_ns, &[1u8; 16])
                .expect("get")
                .expect("should exist");
            assert_eq!(got.event_id, entry.event_id);
            assert_eq!(got.source_service, "ledger");
            assert_eq!(got.principal, "test-user");
        }
    }
616
    // A lookup of a never-written key yields Ok(None), not an error.
    #[test]
    fn get_nonexistent_returns_none() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let txn = events_db.read().expect("read txn");
        let result = EventStore::get(&txn, OrganizationId::new(1), 0, &[0u8; 16]).expect("get");
        assert!(result.is_none());
    }
624
    // Entries written out of timestamp order are listed back in ascending
    // timestamp order (the key encodes the timestamp, so B+ tree order is
    // chronological).
    #[test]
    fn list_chronological_order() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let org_id = OrganizationId::new(1);

        // Write entries with deliberately out-of-order timestamps
        {
            let mut txn = events_db.write().expect("write txn");
            let timestamps = [1_700_000_003i64, 1_700_000_001, 1_700_000_002];
            for (i, &ts) in timestamps.iter().enumerate() {
                let mut entry = make_entry(1, [0u8; 16], ts, 0);
                entry.event_id = [i as u8; 16];
                EventStore::write(&mut txn, &entry).expect("write");
            }
            txn.commit().expect("commit");
        }

        {
            let txn = events_db.read().expect("read txn");
            let (entries, cursor) =
                EventStore::list(&txn, org_id, 0, u64::MAX, 10, None).expect("list");

            assert_eq!(entries.len(), 3);
            assert!(cursor.is_none(), "no more results");

            // Verify ascending timestamp order
            assert!(entries[0].timestamp <= entries[1].timestamp);
            assert!(entries[1].timestamp <= entries[2].timestamp);
        }
    }
655
    // Paging through 5 entries with limit 2 yields pages of 2/2/1, a Some
    // cursor whenever more results remain, and no duplicate IDs across pages.
    #[test]
    fn list_with_pagination() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let org_id = OrganizationId::new(1);

        {
            let mut txn = events_db.write().expect("write txn");
            for i in 0..5u8 {
                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
                EventStore::write(&mut txn, &entry).expect("write");
            }
            txn.commit().expect("commit");
        }

        // Page 1: first 2 entries
        let txn = events_db.read().expect("read txn");
        let (page1, cursor1) =
            EventStore::list(&txn, org_id, 0, u64::MAX, 2, None).expect("list page 1");
        assert_eq!(page1.len(), 2);
        assert!(cursor1.is_some(), "should have more");

        // Page 2: next 2 entries
        let (page2, cursor2) = EventStore::list(&txn, org_id, 0, u64::MAX, 2, cursor1.as_deref())
            .expect("list page 2");
        assert_eq!(page2.len(), 2);
        assert!(cursor2.is_some(), "should have more");

        // Page 3: last entry
        let (page3, cursor3) = EventStore::list(&txn, org_id, 0, u64::MAX, 2, cursor2.as_deref())
            .expect("list page 3");
        assert_eq!(page3.len(), 1);
        assert!(cursor3.is_none(), "no more");

        // Verify no duplicates across pages
        let all_ids: Vec<[u8; 16]> =
            page1.iter().chain(page2.iter()).chain(page3.iter()).map(|e| e.event_id).collect();
        assert_eq!(all_ids.len(), 5);
        for (i, id) in all_ids.iter().enumerate() {
            for (j, other) in all_ids.iter().enumerate() {
                if i != j {
                    assert_ne!(id, other, "duplicate event ID at positions {i} and {j}");
                }
            }
        }
    }
701
    // Listing one organization never returns another organization's events
    // (keys are org-prefixed, so scans are naturally partitioned).
    #[test]
    fn organization_isolation() {
        let events_db = EventsDatabase::open_in_memory().expect("open");

        {
            let mut txn = events_db.write().expect("write txn");
            // 10 events for org A
            for i in 0..10u8 {
                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
                EventStore::write(&mut txn, &entry).expect("write org A");
            }
            // 10 events for org B
            for i in 0..10u8 {
                let entry = make_entry(2, [100 + i; 16], 1_700_000_000 + i64::from(i), 0);
                EventStore::write(&mut txn, &entry).expect("write org B");
            }
            txn.commit().expect("commit");
        }

        let txn = events_db.read().expect("read txn");

        // Org A query should return zero org B events
        let (org_a_events, _) =
            EventStore::list(&txn, OrganizationId::new(1), 0, u64::MAX, 100, None)
                .expect("list org A");
        assert_eq!(org_a_events.len(), 10);
        for e in &org_a_events {
            assert_eq!(e.organization_id, OrganizationId::new(1));
        }

        // Org B query should return zero org A events
        let (org_b_events, _) =
            EventStore::list(&txn, OrganizationId::new(2), 0, u64::MAX, 100, None)
                .expect("list org B");
        assert_eq!(org_b_events.len(), 10);
        for e in &org_b_events {
            assert_eq!(e.organization_id, OrganizationId::new(2));
        }
    }
741
    // System-scope events (org 0) and regular org events do not leak into
    // each other's listings.
    #[test]
    fn system_events_isolated() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let system_org = OrganizationId::new(0);
        let regular_org = OrganizationId::new(42);

        {
            let mut txn = events_db.write().expect("write txn");
            // System event
            let mut sys_entry = make_entry(0, [1u8; 16], 1_700_000_000, 0);
            sys_entry.scope = EventScope::System;
            sys_entry.action = EventAction::OrganizationCreated;
            EventStore::write(&mut txn, &sys_entry).expect("write system");

            // Regular org event
            let org_entry = make_entry(42, [2u8; 16], 1_700_000_000, 0);
            EventStore::write(&mut txn, &org_entry).expect("write org");
            txn.commit().expect("commit");
        }

        let txn = events_db.read().expect("read txn");

        let (system_events, _) =
            EventStore::list(&txn, system_org, 0, u64::MAX, 100, None).expect("list system");
        assert_eq!(system_events.len(), 1);
        assert_eq!(system_events[0].organization_id, system_org);

        let (org_events, _) =
            EventStore::list(&txn, regular_org, 0, u64::MAX, 100, None).expect("list org");
        assert_eq!(org_events.len(), 1);
        assert_eq!(org_events[0].organization_id, regular_org);
    }
774
    // Listing an organization with no events yields an empty page and no cursor.
    #[test]
    fn empty_org_scan_returns_empty() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let txn = events_db.read().expect("read txn");
        let (entries, cursor) =
            EventStore::list(&txn, OrganizationId::new(999), 0, u64::MAX, 100, None).expect("list");
        assert!(entries.is_empty());
        assert!(cursor.is_none());
    }
784
    // `count` agrees with the number of entries `list` returns for the same org.
    #[test]
    fn count_consistent_with_list() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let org_id = OrganizationId::new(5);

        {
            let mut txn = events_db.write().expect("write txn");
            for i in 0..7u8 {
                let entry = make_entry(5, [i; 16], 1_700_000_000 + i64::from(i), 0);
                EventStore::write(&mut txn, &entry).expect("write");
            }
            txn.commit().expect("commit");
        }

        let txn = events_db.read().expect("read txn");
        let count = EventStore::count(&txn, org_id).expect("count");
        let (entries, _) = EventStore::list(&txn, org_id, 0, u64::MAX, 100, None).expect("list");
        assert_eq!(count, entries.len() as u64);
    }
804
    // GC of expired entries is reflected in `count`: 2 of 3 entries expire
    // before now=300 and are removed, leaving one.
    #[test]
    fn count_after_deletion() {
        let events_db = EventsDatabase::open_in_memory().expect("open");
        let org_id = OrganizationId::new(1);

        {
            let mut txn = events_db.write().expect("write txn");
            // Write 3 entries, 2 expired
            let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 100); // expired
            let e2 = make_entry(1, [2u8; 16], 1_700_000_001, 200); // expired
            let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0); // no expiry
            EventStore::write(&mut txn, &e1).expect("write");
            EventStore::write(&mut txn, &e2).expect("write");
            EventStore::write(&mut txn, &e3).expect("write");
            txn.commit().expect("commit");
        }

        // Delete expired (now_unix > both expires_at values)
        {
            let read_txn = events_db.read().expect("read txn");
            let mut write_txn = events_db.write().expect("write txn");
            let deleted =
                EventStore::delete_expired(&read_txn, &mut write_txn, 300, 100).expect("gc");
            assert_eq!(deleted, 2);
            write_txn.commit().expect("commit");
        }

        let txn = events_db.read().expect("read txn");
        let count = EventStore::count(&txn, org_id).expect("count");
        assert_eq!(count, 1);
    }
836
837    #[test]
838    fn delete_expired_respects_batch_limit() {
839        let events_db = EventsDatabase::open_in_memory().expect("open");
840
841        {
842            let mut txn = events_db.write().expect("write txn");
843            for i in 0..10u8 {
844                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 50);
845                EventStore::write(&mut txn, &entry).expect("write");
846            }
847            txn.commit().expect("commit");
848        }
849
850        // Delete with batch limit of 3
851        {
852            let read_txn = events_db.read().expect("read txn");
853            let mut write_txn = events_db.write().expect("write txn");
854            let deleted =
855                EventStore::delete_expired(&read_txn, &mut write_txn, 100, 3).expect("gc");
856            assert_eq!(deleted, 3, "should respect batch limit");
857            write_txn.commit().expect("commit");
858        }
859
860        let txn = events_db.read().expect("read txn");
861        let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
862        assert_eq!(count, 7, "7 entries remain after deleting 3");
863    }
864
865    #[test]
866    fn delete_expired_preserves_non_expired() {
867        let events_db = EventsDatabase::open_in_memory().expect("open");
868
869        {
870            let mut txn = events_db.write().expect("write txn");
871            let expired = make_entry(1, [1u8; 16], 1_700_000_000, 50); // expired
872            let future = make_entry(1, [2u8; 16], 1_700_000_001, 999); // not yet expired
873            let no_expiry = make_entry(1, [3u8; 16], 1_700_000_002, 0); // no expiry
874            EventStore::write(&mut txn, &expired).expect("write");
875            EventStore::write(&mut txn, &future).expect("write");
876            EventStore::write(&mut txn, &no_expiry).expect("write");
877            txn.commit().expect("commit");
878        }
879
880        {
881            let read_txn = events_db.read().expect("read txn");
882            let mut write_txn = events_db.write().expect("write txn");
883            let deleted =
884                EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
885            assert_eq!(deleted, 1, "only the expired entry");
886            write_txn.commit().expect("commit");
887        }
888
889        let txn = events_db.read().expect("read txn");
890        let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
891        assert_eq!(count, 2, "future + no_expiry remain");
892    }
893
894    #[test]
895    fn delete_expired_zero_expires_at_never_deleted() {
896        let events_db = EventsDatabase::open_in_memory().expect("open");
897
898        {
899            let mut txn = events_db.write().expect("write txn");
900            for i in 0..5u8 {
901                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
902                EventStore::write(&mut txn, &entry).expect("write");
903            }
904            txn.commit().expect("commit");
905        }
906
907        {
908            let read_txn = events_db.read().expect("read txn");
909            let mut write_txn = events_db.write().expect("write txn");
910            let deleted =
911                EventStore::delete_expired(&read_txn, &mut write_txn, u64::MAX, 100).expect("gc");
912            assert_eq!(deleted, 0);
913            write_txn.commit().expect("commit");
914        }
915
916        let txn = events_db.read().expect("read txn");
917        let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
918        assert_eq!(count, 5);
919    }
920
921    #[test]
922    fn mixed_batch_gc() {
923        let events_db = EventsDatabase::open_in_memory().expect("open");
924
925        {
926            let mut txn = events_db.write().expect("write txn");
927
928            // 3 expired, 2 future, 2 no-expiry
929            let entries = vec![
930                make_entry(1, [1u8; 16], 1_700_000_001, 10),  // expired
931                make_entry(1, [2u8; 16], 1_700_000_002, 20),  // expired
932                make_entry(1, [3u8; 16], 1_700_000_003, 30),  // expired
933                make_entry(1, [4u8; 16], 1_700_000_004, 500), // future
934                make_entry(1, [5u8; 16], 1_700_000_005, 600), // future
935                make_entry(1, [6u8; 16], 1_700_000_006, 0),   // no expiry
936                make_entry(1, [7u8; 16], 1_700_000_007, 0),   // no expiry
937            ];
938            for entry in &entries {
939                EventStore::write(&mut txn, entry).expect("write");
940            }
941            txn.commit().expect("commit");
942        }
943
944        {
945            let read_txn = events_db.read().expect("read txn");
946            let mut write_txn = events_db.write().expect("write txn");
947            let deleted =
948                EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
949            assert_eq!(deleted, 3);
950            write_txn.commit().expect("commit");
951        }
952
953        let txn = events_db.read().expect("read txn");
954        let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
955        assert_eq!(count, 4);
956    }
957
958    #[test]
959    fn scan_apply_phase_filters_handler_events() {
960        let events_db = EventsDatabase::open_in_memory().expect("open");
961
962        {
963            let mut txn = events_db.write().expect("write txn");
964
965            // Apply-phase event
966            let apply_entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
967            EventStore::write(&mut txn, &apply_entry).expect("write apply");
968
969            // Handler-phase event
970            let mut handler_entry = make_entry(1, [2u8; 16], 1_700_000_001, 0);
971            handler_entry.emission = EventEmission::HandlerPhase { node_id: 1 };
972            EventStore::write(&mut txn, &handler_entry).expect("write handler");
973
974            txn.commit().expect("commit");
975        }
976
977        let txn = events_db.read().expect("read txn");
978        let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
979
980        assert_eq!(results.len(), 1, "only apply-phase event should be included");
981        assert_eq!(results[0].event_id, [1u8; 16]);
982        assert!(matches!(results[0].emission, EventEmission::ApplyPhase));
983    }
984
985    #[test]
986    fn scan_apply_phase_respects_max_entries() {
987        let events_db = EventsDatabase::open_in_memory().expect("open");
988
989        {
990            let mut txn = events_db.write().expect("write txn");
991            for i in 0..10u8 {
992                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
993                EventStore::write(&mut txn, &entry).expect("write");
994            }
995            txn.commit().expect("commit");
996        }
997
998        let txn = events_db.read().expect("read txn");
999        let results = EventStore::scan_apply_phase(&txn, 3).expect("scan");
1000
1001        assert_eq!(results.len(), 3, "should truncate to max_entries");
1002        // Should keep the 3 newest (highest timestamps)
1003        assert!(results[0].timestamp >= results[1].timestamp, "sorted newest first");
1004        assert!(results[1].timestamp >= results[2].timestamp, "sorted newest first");
1005    }
1006
1007    #[test]
1008    fn scan_apply_phase_empty_db() {
1009        let events_db = EventsDatabase::open_in_memory().expect("open");
1010        let txn = events_db.read().expect("read txn");
1011        let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
1012        assert!(results.is_empty());
1013    }
1014
1015    #[test]
1016    fn scan_apply_phase_cross_org() {
1017        let events_db = EventsDatabase::open_in_memory().expect("open");
1018
1019        {
1020            let mut txn = events_db.write().expect("write txn");
1021            // Events across different orgs
1022            let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 0);
1023            let e2 = make_entry(2, [2u8; 16], 1_700_000_001, 0);
1024            let e3 = make_entry(3, [3u8; 16], 1_700_000_002, 0);
1025            EventStore::write(&mut txn, &e1).expect("write");
1026            EventStore::write(&mut txn, &e2).expect("write");
1027            EventStore::write(&mut txn, &e3).expect("write");
1028            txn.commit().expect("commit");
1029        }
1030
1031        let txn = events_db.read().expect("read txn");
1032        let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
1033
1034        assert_eq!(results.len(), 3, "should scan across all organizations");
1035        // All three orgs should be represented
1036        let org_ids: Vec<_> = results.iter().map(|e| e.organization_id).collect();
1037        assert!(org_ids.contains(&OrganizationId::new(1)));
1038        assert!(org_ids.contains(&OrganizationId::new(2)));
1039        assert!(org_ids.contains(&OrganizationId::new(3)));
1040    }
1041
1042    #[test]
1043    fn events_database_clone_shares_state() {
1044        let db1 = EventsDatabase::open_in_memory().expect("open");
1045        let db2 = db1.clone();
1046
1047        // Write via db1
1048        {
1049            let mut txn = db1.write().expect("write txn");
1050            let entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
1051            EventStore::write(&mut txn, &entry).expect("write");
1052            txn.commit().expect("commit");
1053        }
1054
1055        // Read via db2
1056        {
1057            let txn = db2.read().expect("read txn");
1058            let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
1059            assert_eq!(count, 1, "clone should share same database");
1060        }
1061    }
1062
1063    #[test]
1064    fn list_time_range_filtering() {
1065        let events_db = EventsDatabase::open_in_memory().expect("open");
1066        let org_id = OrganizationId::new(1);
1067
1068        {
1069            let mut txn = events_db.write().expect("write txn");
1070            // Events at t=1000s, t=2000s, t=3000s
1071            for (i, ts) in [1000i64, 2000, 3000].iter().enumerate() {
1072                let entry = make_entry(1, [i as u8; 16], *ts, 0);
1073                EventStore::write(&mut txn, &entry).expect("write");
1074            }
1075            txn.commit().expect("commit");
1076        }
1077
1078        let txn = events_db.read().expect("read txn");
1079
1080        // Full range
1081        let (all, _) = EventStore::list(&txn, org_id, 0, u64::MAX, 100, None).expect("list all");
1082        assert_eq!(all.len(), 3);
1083
1084        // Only middle entry (start_ns inclusive of 2000s, end_ns exclusive of 3000s)
1085        let start_ns = 2000u64 * 1_000_000_000;
1086        let end_ns = 3000u64 * 1_000_000_000;
1087        let (mid, _) =
1088            EventStore::list(&txn, org_id, start_ns, end_ns, 100, None).expect("list mid");
1089        assert_eq!(mid.len(), 1);
1090        assert_eq!(mid[0].timestamp.timestamp(), 2000);
1091    }
1092
1093    /// Verifies GC (`delete_expired`) works correctly after the `EventEntry` field
1094    /// reorder. `EventMeta` now reads `emission` (field 1) + `expires_at` (field 2),
1095    /// so the GC must still correctly identify expired entries.
1096    #[test]
1097    fn delete_expired_works_after_field_reorder() {
1098        let events_db = EventsDatabase::open_in_memory().expect("open");
1099
1100        {
1101            let mut txn = events_db.write().expect("write txn");
1102
1103            // Expired apply-phase event
1104            let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 50);
1105            EventStore::write(&mut txn, &e1).expect("write");
1106
1107            // Expired handler-phase event
1108            let mut e2 = make_entry(1, [2u8; 16], 1_700_000_001, 50);
1109            e2.emission = EventEmission::HandlerPhase { node_id: 1 };
1110            EventStore::write(&mut txn, &e2).expect("write");
1111
1112            // Non-expired event
1113            let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0);
1114            EventStore::write(&mut txn, &e3).expect("write");
1115
1116            txn.commit().expect("commit");
1117        }
1118
1119        {
1120            let read_txn = events_db.read().expect("read txn");
1121            let mut write_txn = events_db.write().expect("write txn");
1122            let deleted =
1123                EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
1124            assert_eq!(deleted, 2, "both expired entries should be deleted");
1125            write_txn.commit().expect("commit");
1126        }
1127
1128        let txn = events_db.read().expect("read txn");
1129        let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
1130        assert_eq!(count, 1, "only non-expired entry remains");
1131    }
1132
1133    /// 50 apply-phase + 50 handler-phase events: scan returns exactly 50 apply-phase.
1134    #[test]
1135    fn scan_apply_phase_50_apply_50_handler() {
1136        let events_db = EventsDatabase::open_in_memory().expect("open");
1137
1138        {
1139            let mut txn = events_db.write().expect("write txn");
1140
1141            for i in 0..50u8 {
1142                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1143                EventStore::write(&mut txn, &entry).expect("write apply");
1144            }
1145
1146            for i in 50..100u8 {
1147                let mut entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1148                entry.emission = EventEmission::HandlerPhase { node_id: 1 };
1149                EventStore::write(&mut txn, &entry).expect("write handler");
1150            }
1151
1152            txn.commit().expect("commit");
1153        }
1154
1155        let txn = events_db.read().expect("read txn");
1156        let results = EventStore::scan_apply_phase(&txn, 1000).expect("scan");
1157
1158        assert_eq!(results.len(), 50, "exactly 50 apply-phase events");
1159        for entry in &results {
1160            assert!(
1161                matches!(entry.emission, EventEmission::ApplyPhase),
1162                "all entries must be apply-phase"
1163            );
1164        }
1165    }
1166
1167    /// scan_apply_phase with max_entries truncation returns newest N sorted desc.
1168    #[test]
1169    fn scan_apply_phase_truncates_to_newest() {
1170        let events_db = EventsDatabase::open_in_memory().expect("open");
1171
1172        {
1173            let mut txn = events_db.write().expect("write txn");
1174            for i in 0..20u8 {
1175                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1176                EventStore::write(&mut txn, &entry).expect("write");
1177            }
1178            txn.commit().expect("commit");
1179        }
1180
1181        let txn = events_db.read().expect("read txn");
1182        let results = EventStore::scan_apply_phase(&txn, 5).expect("scan");
1183
1184        assert_eq!(results.len(), 5, "truncated to max_entries");
1185
1186        // Newest first
1187        for window in results.windows(2) {
1188            assert!(window[0].timestamp >= window[1].timestamp, "must be sorted newest-first");
1189        }
1190
1191        // The 5 newest timestamps correspond to i=15..19
1192        let min_ts = chrono::TimeZone::timestamp_opt(&Utc, 1_700_000_015, 0).unwrap().timestamp();
1193        for entry in &results {
1194            assert!(
1195                entry.timestamp.timestamp() >= min_ts,
1196                "truncated entries should be the 5 newest"
1197            );
1198        }
1199    }
1200
1201    // ====================================================================
1202    // EventIndex / get_by_id tests
1203    // ====================================================================
1204
1205    #[test]
1206    fn get_by_id_returns_written_event() {
1207        let events_db = EventsDatabase::open_in_memory().expect("open");
1208        let entry = make_entry(1, [10u8; 16], 1_700_000_000, 0);
1209
1210        {
1211            let mut txn = events_db.write().expect("write txn");
1212            EventStore::write(&mut txn, &entry).expect("write");
1213            txn.commit().expect("commit");
1214        }
1215
1216        let txn = events_db.read().expect("read txn");
1217        let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[10u8; 16])
1218            .expect("get_by_id")
1219            .expect("should exist");
1220
1221        assert_eq!(found.event_id, [10u8; 16]);
1222        assert_eq!(found.organization_id, OrganizationId::new(1));
1223        assert_eq!(found.source_service, "ledger");
1224    }
1225
1226    #[test]
1227    fn get_by_id_nonexistent_returns_none() {
1228        let events_db = EventsDatabase::open_in_memory().expect("open");
1229        let txn = events_db.read().expect("read txn");
1230        let result =
1231            EventStore::get_by_id(&txn, OrganizationId::new(1), &[99u8; 16]).expect("get_by_id");
1232        assert!(result.is_none());
1233    }
1234
1235    #[test]
1236    fn get_by_id_wrong_org_returns_none() {
1237        let events_db = EventsDatabase::open_in_memory().expect("open");
1238        let entry = make_entry(1, [10u8; 16], 1_700_000_000, 0);
1239
1240        {
1241            let mut txn = events_db.write().expect("write txn");
1242            EventStore::write(&mut txn, &entry).expect("write");
1243            txn.commit().expect("commit");
1244        }
1245
1246        // Look up with wrong org
1247        let txn = events_db.read().expect("read txn");
1248        let result =
1249            EventStore::get_by_id(&txn, OrganizationId::new(999), &[10u8; 16]).expect("get_by_id");
1250        assert!(result.is_none(), "different org should not find the event");
1251    }
1252
1253    #[test]
1254    fn get_by_id_finds_all_n_events() {
1255        let events_db = EventsDatabase::open_in_memory().expect("open");
1256        let n = 50u8;
1257
1258        {
1259            let mut txn = events_db.write().expect("write txn");
1260            for i in 0..n {
1261                let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1262                EventStore::write(&mut txn, &entry).expect("write");
1263            }
1264            txn.commit().expect("commit");
1265        }
1266
1267        let txn = events_db.read().expect("read txn");
1268        for i in 0..n {
1269            let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[i; 16])
1270                .expect("get_by_id")
1271                .expect("should exist");
1272            assert_eq!(found.event_id, [i; 16]);
1273        }
1274    }
1275
1276    #[test]
1277    fn gc_removes_index_entries() {
1278        let events_db = EventsDatabase::open_in_memory().expect("open");
1279
1280        {
1281            let mut txn = events_db.write().expect("write txn");
1282            let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 100); // expired
1283            let e2 = make_entry(1, [2u8; 16], 1_700_000_001, 200); // expired
1284            let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0); // no expiry
1285            EventStore::write(&mut txn, &e1).expect("write");
1286            EventStore::write(&mut txn, &e2).expect("write");
1287            EventStore::write(&mut txn, &e3).expect("write");
1288            txn.commit().expect("commit");
1289        }
1290
1291        // All three should be findable before GC
1292        {
1293            let txn = events_db.read().expect("read txn");
1294            assert!(
1295                EventStore::get_by_id(&txn, OrganizationId::new(1), &[1u8; 16])
1296                    .expect("get_by_id")
1297                    .is_some()
1298            );
1299            assert!(
1300                EventStore::get_by_id(&txn, OrganizationId::new(1), &[2u8; 16])
1301                    .expect("get_by_id")
1302                    .is_some()
1303            );
1304            assert!(
1305                EventStore::get_by_id(&txn, OrganizationId::new(1), &[3u8; 16])
1306                    .expect("get_by_id")
1307                    .is_some()
1308            );
1309        }
1310
1311        // GC expired entries
1312        {
1313            let read_txn = events_db.read().expect("read txn");
1314            let mut write_txn = events_db.write().expect("write txn");
1315            let deleted =
1316                EventStore::delete_expired(&read_txn, &mut write_txn, 300, 100).expect("gc");
1317            assert_eq!(deleted, 2);
1318            write_txn.commit().expect("commit");
1319        }
1320
1321        // GC'd entries should be gone from index too
1322        {
1323            let txn = events_db.read().expect("read txn");
1324            assert!(
1325                EventStore::get_by_id(&txn, OrganizationId::new(1), &[1u8; 16])
1326                    .expect("get_by_id")
1327                    .is_none(),
1328                "GC'd event should not be in index"
1329            );
1330            assert!(
1331                EventStore::get_by_id(&txn, OrganizationId::new(1), &[2u8; 16])
1332                    .expect("get_by_id")
1333                    .is_none(),
1334                "GC'd event should not be in index"
1335            );
1336            // Non-expired entry survives
1337            assert!(
1338                EventStore::get_by_id(&txn, OrganizationId::new(1), &[3u8; 16])
1339                    .expect("get_by_id")
1340                    .is_some()
1341            );
1342        }
1343    }
1344
1345    #[test]
1346    fn snapshot_restore_rebuilds_index() {
1347        let events_db = EventsDatabase::open_in_memory().expect("open");
1348
1349        // Write events to the "source" database
1350        let entries: Vec<EventEntry> =
1351            (0..10u8).map(|i| make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0)).collect();
1352
1353        {
1354            let mut txn = events_db.write().expect("write txn");
1355            for entry in &entries {
1356                EventStore::write(&mut txn, entry).expect("write");
1357            }
1358            txn.commit().expect("commit");
1359        }
1360
1361        // Simulate snapshot: collect all entries via scan_apply_phase
1362        let snapshot_entries = {
1363            let txn = events_db.read().expect("read txn");
1364            EventStore::scan_apply_phase(&txn, 100).expect("scan")
1365        };
1366
1367        // "Restore" to a fresh database via write()
1368        let restored_db = EventsDatabase::open_in_memory().expect("open");
1369        {
1370            let mut txn = restored_db.write().expect("write txn");
1371            for entry in &snapshot_entries {
1372                EventStore::write(&mut txn, entry).expect("write");
1373            }
1374            txn.commit().expect("commit");
1375        }
1376
1377        // Verify all events are findable via get_by_id on the restored DB
1378        let txn = restored_db.read().expect("read txn");
1379        for i in 0..10u8 {
1380            let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[i; 16])
1381                .expect("get_by_id")
1382                .expect("should exist after restore");
1383            assert_eq!(found.event_id, [i; 16]);
1384        }
1385    }
1386
1387    #[test]
1388    fn index_atomicity_with_primary() {
1389        let events_db = EventsDatabase::open_in_memory().expect("open");
1390
1391        // Write without committing — then drop the transaction
1392        {
1393            let mut txn = events_db.write().expect("write txn");
1394            let entry = make_entry(1, [50u8; 16], 1_700_000_000, 0);
1395            EventStore::write(&mut txn, &entry).expect("write");
1396            // Drop without commit — implicit abort
1397        }
1398
1399        // Neither primary nor index should contain the entry
1400        let txn = events_db.read().expect("read txn");
1401        let result =
1402            EventStore::get_by_id(&txn, OrganizationId::new(1), &[50u8; 16]).expect("get_by_id");
1403        assert!(result.is_none(), "aborted txn should not write index entry");
1404
1405        let ts_ns =
1406            Utc.timestamp_opt(1_700_000_000, 0).unwrap().timestamp_nanos_opt().unwrap() as u64;
1407        let primary =
1408            EventStore::get(&txn, OrganizationId::new(1), ts_ns, &[50u8; 16]).expect("get");
1409        assert!(primary.is_none(), "aborted txn should not write primary entry");
1410    }
1411
1412    #[test]
1413    fn get_by_id_multi_org_isolation() {
1414        let events_db = EventsDatabase::open_in_memory().expect("open");
1415
1416        {
1417            let mut txn = events_db.write().expect("write txn");
1418            // Same event_id in different orgs
1419            let e1 = make_entry(1, [99u8; 16], 1_700_000_000, 0);
1420            let e2 = make_entry(2, [99u8; 16], 1_700_000_001, 0);
1421            EventStore::write(&mut txn, &e1).expect("write");
1422            EventStore::write(&mut txn, &e2).expect("write");
1423            txn.commit().expect("commit");
1424        }
1425
1426        let txn = events_db.read().expect("read txn");
1427        let found1 = EventStore::get_by_id(&txn, OrganizationId::new(1), &[99u8; 16])
1428            .expect("get_by_id")
1429            .expect("org 1 should find");
1430        let found2 = EventStore::get_by_id(&txn, OrganizationId::new(2), &[99u8; 16])
1431            .expect("get_by_id")
1432            .expect("org 2 should find");
1433
1434        assert_eq!(found1.organization_id, OrganizationId::new(1));
1435        assert_eq!(found2.organization_id, OrganizationId::new(2));
1436    }
1437}