1use std::{path::Path, sync::Arc};
10
11use inferadb_ledger_store::{
12 Database, FileBackend, InMemoryBackend, ReadTransaction, StorageBackend, Table, TableId,
13 WriteTransaction,
14};
15use inferadb_ledger_types::{
16 CodecError, OrganizationId,
17 events::{EventEntry, EventMeta},
18};
19use snafu::{ResultExt, Snafu};
20
21use crate::events_keys::{
22 encode_event_index_key, encode_event_index_value, encode_event_key, org_prefix,
23 primary_key_from_index_value,
24};
25
26pub struct Events;
31
32impl Table for Events {
33 const ID: TableId = TableId::Entities;
34 type KeyType = Vec<u8>;
35 type ValueType = Vec<u8>;
36}
37
38pub struct EventIndex;
46
47impl Table for EventIndex {
48 const ID: TableId = TableId::Relationships;
49 type KeyType = Vec<u8>;
50 type ValueType = Vec<u8>;
51}
52
53#[derive(Debug, Snafu)]
55pub enum EventStoreError {
56 #[snafu(display("Storage error: {source}"))]
58 Storage {
59 source: inferadb_ledger_store::Error,
61 #[snafu(implicit)]
63 location: snafu::Location,
64 },
65
66 #[snafu(display("Codec error: {source}"))]
68 Codec {
69 source: CodecError,
71 #[snafu(implicit)]
73 location: snafu::Location,
74 },
75}
76
77pub type Result<T> = std::result::Result<T, EventStoreError>;
79
80pub struct EventStore;
85
86impl EventStore {
87 pub fn write<B: StorageBackend>(
97 txn: &mut WriteTransaction<'_, B>,
98 entry: &EventEntry,
99 ) -> Result<()> {
100 let timestamp_ns = entry.timestamp.timestamp_nanos_opt().unwrap_or(0) as u64;
101 let key = encode_event_key(entry.organization_id, timestamp_ns, &entry.event_id);
102 let value = inferadb_ledger_types::encode(entry).context(CodecSnafu)?;
103 txn.insert::<Events>(&key, &value).context(StorageSnafu)?;
104
105 let idx_key = encode_event_index_key(entry.organization_id, &entry.event_id);
107 let idx_val = encode_event_index_value(timestamp_ns, &entry.event_id);
108 txn.insert::<EventIndex>(&idx_key, &idx_val).context(StorageSnafu)?;
109
110 Ok(())
111 }
112
113 pub fn get<B: StorageBackend>(
120 txn: &ReadTransaction<'_, B>,
121 org_id: OrganizationId,
122 timestamp_ns: u64,
123 event_id: &[u8; 16],
124 ) -> Result<Option<EventEntry>> {
125 let key = encode_event_key(org_id, timestamp_ns, event_id);
126 match txn.get::<Events>(&key).context(StorageSnafu)? {
127 Some(data) => {
128 let entry: EventEntry = inferadb_ledger_types::decode(&data).context(CodecSnafu)?;
129 Ok(Some(entry))
130 },
131 None => Ok(None),
132 }
133 }
134
135 pub fn get_by_id<B: StorageBackend>(
150 txn: &ReadTransaction<'_, B>,
151 org_id: OrganizationId,
152 event_id: &[u8; 16],
153 ) -> Result<Option<EventEntry>> {
154 let idx_key = encode_event_index_key(org_id, event_id);
156 let idx_val = match txn.get::<EventIndex>(&idx_key).context(StorageSnafu)? {
157 Some(v) => v,
158 None => return Ok(None),
159 };
160
161 let primary_key = primary_key_from_index_value(org_id, &idx_val);
163 match txn.get::<Events>(&primary_key).context(StorageSnafu)? {
164 Some(data) => {
165 let entry: EventEntry = inferadb_ledger_types::decode(&data).context(CodecSnafu)?;
166 Ok(Some(entry))
167 },
168 None => Ok(None),
170 }
171 }
172
173 pub fn list<B: StorageBackend>(
186 txn: &ReadTransaction<'_, B>,
187 org_id: OrganizationId,
188 start_ns: u64,
189 end_ns: u64,
190 limit: usize,
191 after_key: Option<&[u8]>,
192 ) -> Result<(Vec<EventEntry>, Option<Vec<u8>>)> {
193 let start_key = match after_key {
194 Some(cursor) => {
195 let mut resume = cursor.to_vec();
197 increment_key(&mut resume);
199 resume
200 },
201 None => {
202 let prefix = crate::events_keys::org_time_prefix(org_id, start_ns);
203 prefix.to_vec()
204 },
205 };
206
207 let end_key = crate::events_keys::org_time_prefix(org_id, end_ns);
208 let org_bytes = org_prefix(org_id);
209
210 let fetch_limit = limit.saturating_add(1);
212
213 let iter =
214 txn.range::<Events>(Some(&start_key), Some(&end_key.to_vec())).context(StorageSnafu)?;
215
216 let mut entries = Vec::with_capacity(limit);
217 let mut last_key: Option<Vec<u8>> = None;
218
219 for (key_bytes, value_bytes) in iter {
220 if key_bytes.len() < 8 || key_bytes[..8] != org_bytes[..] {
222 break;
223 }
224
225 if entries.len() >= fetch_limit {
226 break;
227 }
228
229 let entry: EventEntry =
230 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
231 last_key = Some(key_bytes);
232 entries.push(entry);
233 }
234
235 if entries.len() > limit {
236 entries.truncate(limit);
238 let cursor = last_key.map(|_| {
239 let last_entry = &entries[limit - 1];
241 let ts = last_entry.timestamp.timestamp_nanos_opt().unwrap_or(0) as u64;
242 encode_event_key(last_entry.organization_id, ts, &last_entry.event_id)
243 });
244 Ok((entries, cursor))
245 } else {
246 Ok((entries, None))
247 }
248 }
249
250 pub fn delete_expired<B: StorageBackend>(
263 read_txn: &ReadTransaction<'_, B>,
264 write_txn: &mut WriteTransaction<'_, B>,
265 now_unix: u64,
266 max_batch: usize,
267 ) -> Result<usize> {
268 let mut expired: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
269
270 let iter = read_txn.iter::<Events>().context(StorageSnafu)?;
271
272 for (key_bytes, value_bytes) in iter {
273 if expired.len() >= max_batch {
274 break;
275 }
276
277 let meta: EventMeta =
279 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
280
281 if meta.expires_at > 0 && meta.expires_at < now_unix {
282 expired.push((key_bytes, value_bytes));
283 }
284 }
285
286 let deleted = expired.len();
287 for (key, value_bytes) in expired {
288 let entry: EventEntry =
290 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
291
292 write_txn.delete::<Events>(&key).context(StorageSnafu)?;
293
294 let idx_key = encode_event_index_key(entry.organization_id, &entry.event_id);
296 write_txn.delete::<EventIndex>(&idx_key).context(StorageSnafu)?;
297 }
298
299 Ok(deleted)
300 }
301
302 pub fn count<B: StorageBackend>(
308 txn: &ReadTransaction<'_, B>,
309 org_id: OrganizationId,
310 ) -> Result<u64> {
311 let prefix = org_prefix(org_id);
312 let mut count = 0u64;
313
314 let iter = txn.iter::<Events>().context(StorageSnafu)?;
315
316 for (key_bytes, _) in iter {
317 if key_bytes.len() < 8 {
318 continue;
319 }
320 if key_bytes[..8] < prefix[..] {
321 continue;
322 }
323 if key_bytes[..8] > prefix[..] {
324 break;
325 }
326 count += 1;
327 }
328
329 Ok(count)
330 }
331
332 pub fn scan_apply_phase<B: StorageBackend>(
344 txn: &ReadTransaction<'_, B>,
345 max_entries: usize,
346 ) -> Result<Vec<inferadb_ledger_types::events::EventEntry>> {
347 use inferadb_ledger_types::events::{EmissionMeta, EventEmission};
348
349 let iter = txn.iter::<Events>().context(StorageSnafu)?;
350
351 let mut entries: Vec<inferadb_ledger_types::events::EventEntry> = Vec::new();
352
353 for (_key_bytes, value_bytes) in iter {
354 let meta: EmissionMeta =
356 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
357
358 if matches!(meta.emission, EventEmission::ApplyPhase) {
359 let entry: inferadb_ledger_types::events::EventEntry =
361 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
362 entries.push(entry);
363 }
364 }
365
366 entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
368
369 entries.truncate(max_entries);
371
372 Ok(entries)
373 }
374
375 pub fn scan_apply_phase_ranged<B: StorageBackend>(
399 txn: &ReadTransaction<'_, B>,
400 org_ids: &[OrganizationId],
401 cutoff_timestamp_ns: u64,
402 max_entries: usize,
403 ) -> Result<Vec<Vec<u8>>> {
404 use inferadb_ledger_types::events::{EmissionMeta, EventEmission};
405
406 let mut result = Vec::new();
407
408 for &org_id in org_ids {
409 if result.len() >= max_entries {
410 break;
411 }
412
413 let start = crate::events_keys::org_time_prefix(org_id, cutoff_timestamp_ns).to_vec();
415 let end = {
416 let mut e = crate::events_keys::org_time_prefix(org_id, u64::MAX).to_vec();
417 e.extend_from_slice(&u64::MAX.to_be_bytes());
419 e
420 };
421
422 let iter = txn.range::<Events>(Some(&start), Some(&end)).context(StorageSnafu)?;
423
424 let org_bytes = org_prefix(org_id);
425
426 for (key_bytes, value_bytes) in iter {
427 if result.len() >= max_entries {
428 break;
429 }
430
431 if key_bytes.len() < 8 || key_bytes[..8] != org_bytes[..] {
433 break;
434 }
435
436 let meta: EmissionMeta =
438 inferadb_ledger_types::decode(&value_bytes).context(CodecSnafu)?;
439
440 if matches!(meta.emission, EventEmission::ApplyPhase) {
441 result.push(value_bytes);
442 }
443 }
444 }
445
446 Ok(result)
447 }
448}
449
450fn increment_key(key: &mut Vec<u8>) {
452 for byte in key.iter_mut().rev() {
453 if *byte < 0xFF {
454 *byte += 1;
455 return;
456 }
457 *byte = 0;
458 }
459 key.insert(0, 1);
461}
462
463#[derive(Debug, Snafu)]
469pub enum EventsDatabaseError {
470 #[snafu(display("Failed to open events database at {path}: {source}"))]
472 Open {
473 path: String,
475 source: inferadb_ledger_store::Error,
477 },
478}
479
480pub struct EventsDatabase<B: StorageBackend> {
489 db: Arc<Database<B>>,
490}
491
492impl EventsDatabase<FileBackend> {
493 pub fn open(data_dir: &Path) -> std::result::Result<Self, EventsDatabaseError> {
500 let path = data_dir.join("events.db");
501 let db =
502 if path.exists() { Database::open(&path) } else { Database::create(&path) }.map_err(
503 |e| EventsDatabaseError::Open { path: path.display().to_string(), source: e },
504 )?;
505
506 Ok(Self { db: Arc::new(db) })
507 }
508}
509
510impl EventsDatabase<InMemoryBackend> {
511 pub fn open_in_memory() -> std::result::Result<Self, EventsDatabaseError> {
518 let db = Database::open_in_memory()
519 .map_err(|e| EventsDatabaseError::Open { path: ":memory:".to_string(), source: e })?;
520
521 Ok(Self { db: Arc::new(db) })
522 }
523}
524
525impl<B: StorageBackend> EventsDatabase<B> {
526 pub fn read(&self) -> inferadb_ledger_store::Result<ReadTransaction<'_, B>> {
532 self.db.read()
533 }
534
535 pub fn write(&self) -> inferadb_ledger_store::Result<WriteTransaction<'_, B>> {
541 self.db.write()
542 }
543
544 pub fn db(&self) -> &Arc<Database<B>> {
546 &self.db
547 }
548}
549
550impl<B: StorageBackend> Clone for EventsDatabase<B> {
551 fn clone(&self) -> Self {
552 Self { db: Arc::clone(&self.db) }
553 }
554}
555
556#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used, clippy::disallowed_methods, unused_mut)]
558mod tests {
559 use std::collections::BTreeMap;
560
561 use chrono::{TimeZone, Utc};
562 use inferadb_ledger_types::events::{EventAction, EventEmission, EventOutcome, EventScope};
563
564 use super::*;
565
566 fn make_entry(
567 org_id: i64,
568 event_id: [u8; 16],
569 timestamp_secs: i64,
570 expires_at: u64,
571 ) -> EventEntry {
572 EventEntry {
573 expires_at,
574 event_id,
575 source_service: "ledger".to_string(),
576 event_type: "ledger.test.event".to_string(),
577 timestamp: Utc.timestamp_opt(timestamp_secs, 0).unwrap(),
578 scope: EventScope::Organization,
579 action: EventAction::WriteCommitted,
580 emission: EventEmission::ApplyPhase,
581 principal: "test-user".to_string(),
582 organization_id: OrganizationId::new(org_id),
583 organization: None,
584 vault: None,
585 outcome: EventOutcome::Success,
586 details: BTreeMap::new(),
587 block_height: None,
588 trace_id: None,
589 correlation_id: None,
590 operations_count: None,
591 }
592 }
593
594 #[test]
595 fn write_and_get_roundtrip() {
596 let events_db = EventsDatabase::open_in_memory().expect("open");
597 let entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
598 let ts_ns = entry.timestamp.timestamp_nanos_opt().unwrap() as u64;
599
600 {
601 let mut txn = events_db.write().expect("write txn");
602 EventStore::write(&mut txn, &entry).expect("write event");
603 txn.commit().expect("commit");
604 }
605
606 {
607 let txn = events_db.read().expect("read txn");
608 let got = EventStore::get(&txn, OrganizationId::new(1), ts_ns, &[1u8; 16])
609 .expect("get")
610 .expect("should exist");
611 assert_eq!(got.event_id, entry.event_id);
612 assert_eq!(got.source_service, "ledger");
613 assert_eq!(got.principal, "test-user");
614 }
615 }
616
617 #[test]
618 fn get_nonexistent_returns_none() {
619 let events_db = EventsDatabase::open_in_memory().expect("open");
620 let txn = events_db.read().expect("read txn");
621 let result = EventStore::get(&txn, OrganizationId::new(1), 0, &[0u8; 16]).expect("get");
622 assert!(result.is_none());
623 }
624
625 #[test]
626 fn list_chronological_order() {
627 let events_db = EventsDatabase::open_in_memory().expect("open");
628 let org_id = OrganizationId::new(1);
629
630 {
632 let mut txn = events_db.write().expect("write txn");
633 let timestamps = [1_700_000_003i64, 1_700_000_001, 1_700_000_002];
634 for (i, &ts) in timestamps.iter().enumerate() {
635 let mut entry = make_entry(1, [0u8; 16], ts, 0);
636 entry.event_id = [i as u8; 16];
637 EventStore::write(&mut txn, &entry).expect("write");
638 }
639 txn.commit().expect("commit");
640 }
641
642 {
643 let txn = events_db.read().expect("read txn");
644 let (entries, cursor) =
645 EventStore::list(&txn, org_id, 0, u64::MAX, 10, None).expect("list");
646
647 assert_eq!(entries.len(), 3);
648 assert!(cursor.is_none(), "no more results");
649
650 assert!(entries[0].timestamp <= entries[1].timestamp);
652 assert!(entries[1].timestamp <= entries[2].timestamp);
653 }
654 }
655
656 #[test]
657 fn list_with_pagination() {
658 let events_db = EventsDatabase::open_in_memory().expect("open");
659 let org_id = OrganizationId::new(1);
660
661 {
662 let mut txn = events_db.write().expect("write txn");
663 for i in 0..5u8 {
664 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
665 EventStore::write(&mut txn, &entry).expect("write");
666 }
667 txn.commit().expect("commit");
668 }
669
670 let txn = events_db.read().expect("read txn");
672 let (page1, cursor1) =
673 EventStore::list(&txn, org_id, 0, u64::MAX, 2, None).expect("list page 1");
674 assert_eq!(page1.len(), 2);
675 assert!(cursor1.is_some(), "should have more");
676
677 let (page2, cursor2) = EventStore::list(&txn, org_id, 0, u64::MAX, 2, cursor1.as_deref())
679 .expect("list page 2");
680 assert_eq!(page2.len(), 2);
681 assert!(cursor2.is_some(), "should have more");
682
683 let (page3, cursor3) = EventStore::list(&txn, org_id, 0, u64::MAX, 2, cursor2.as_deref())
685 .expect("list page 3");
686 assert_eq!(page3.len(), 1);
687 assert!(cursor3.is_none(), "no more");
688
689 let all_ids: Vec<[u8; 16]> =
691 page1.iter().chain(page2.iter()).chain(page3.iter()).map(|e| e.event_id).collect();
692 assert_eq!(all_ids.len(), 5);
693 for (i, id) in all_ids.iter().enumerate() {
694 for (j, other) in all_ids.iter().enumerate() {
695 if i != j {
696 assert_ne!(id, other, "duplicate event ID at positions {i} and {j}");
697 }
698 }
699 }
700 }
701
702 #[test]
703 fn organization_isolation() {
704 let events_db = EventsDatabase::open_in_memory().expect("open");
705
706 {
707 let mut txn = events_db.write().expect("write txn");
708 for i in 0..10u8 {
710 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
711 EventStore::write(&mut txn, &entry).expect("write org A");
712 }
713 for i in 0..10u8 {
715 let entry = make_entry(2, [100 + i; 16], 1_700_000_000 + i64::from(i), 0);
716 EventStore::write(&mut txn, &entry).expect("write org B");
717 }
718 txn.commit().expect("commit");
719 }
720
721 let txn = events_db.read().expect("read txn");
722
723 let (org_a_events, _) =
725 EventStore::list(&txn, OrganizationId::new(1), 0, u64::MAX, 100, None)
726 .expect("list org A");
727 assert_eq!(org_a_events.len(), 10);
728 for e in &org_a_events {
729 assert_eq!(e.organization_id, OrganizationId::new(1));
730 }
731
732 let (org_b_events, _) =
734 EventStore::list(&txn, OrganizationId::new(2), 0, u64::MAX, 100, None)
735 .expect("list org B");
736 assert_eq!(org_b_events.len(), 10);
737 for e in &org_b_events {
738 assert_eq!(e.organization_id, OrganizationId::new(2));
739 }
740 }
741
742 #[test]
743 fn system_events_isolated() {
744 let events_db = EventsDatabase::open_in_memory().expect("open");
745 let system_org = OrganizationId::new(0);
746 let regular_org = OrganizationId::new(42);
747
748 {
749 let mut txn = events_db.write().expect("write txn");
750 let mut sys_entry = make_entry(0, [1u8; 16], 1_700_000_000, 0);
752 sys_entry.scope = EventScope::System;
753 sys_entry.action = EventAction::OrganizationCreated;
754 EventStore::write(&mut txn, &sys_entry).expect("write system");
755
756 let org_entry = make_entry(42, [2u8; 16], 1_700_000_000, 0);
758 EventStore::write(&mut txn, &org_entry).expect("write org");
759 txn.commit().expect("commit");
760 }
761
762 let txn = events_db.read().expect("read txn");
763
764 let (system_events, _) =
765 EventStore::list(&txn, system_org, 0, u64::MAX, 100, None).expect("list system");
766 assert_eq!(system_events.len(), 1);
767 assert_eq!(system_events[0].organization_id, system_org);
768
769 let (org_events, _) =
770 EventStore::list(&txn, regular_org, 0, u64::MAX, 100, None).expect("list org");
771 assert_eq!(org_events.len(), 1);
772 assert_eq!(org_events[0].organization_id, regular_org);
773 }
774
775 #[test]
776 fn empty_org_scan_returns_empty() {
777 let events_db = EventsDatabase::open_in_memory().expect("open");
778 let txn = events_db.read().expect("read txn");
779 let (entries, cursor) =
780 EventStore::list(&txn, OrganizationId::new(999), 0, u64::MAX, 100, None).expect("list");
781 assert!(entries.is_empty());
782 assert!(cursor.is_none());
783 }
784
785 #[test]
786 fn count_consistent_with_list() {
787 let events_db = EventsDatabase::open_in_memory().expect("open");
788 let org_id = OrganizationId::new(5);
789
790 {
791 let mut txn = events_db.write().expect("write txn");
792 for i in 0..7u8 {
793 let entry = make_entry(5, [i; 16], 1_700_000_000 + i64::from(i), 0);
794 EventStore::write(&mut txn, &entry).expect("write");
795 }
796 txn.commit().expect("commit");
797 }
798
799 let txn = events_db.read().expect("read txn");
800 let count = EventStore::count(&txn, org_id).expect("count");
801 let (entries, _) = EventStore::list(&txn, org_id, 0, u64::MAX, 100, None).expect("list");
802 assert_eq!(count, entries.len() as u64);
803 }
804
805 #[test]
806 fn count_after_deletion() {
807 let events_db = EventsDatabase::open_in_memory().expect("open");
808 let org_id = OrganizationId::new(1);
809
810 {
811 let mut txn = events_db.write().expect("write txn");
812 let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 100); let e2 = make_entry(1, [2u8; 16], 1_700_000_001, 200); let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0); EventStore::write(&mut txn, &e1).expect("write");
817 EventStore::write(&mut txn, &e2).expect("write");
818 EventStore::write(&mut txn, &e3).expect("write");
819 txn.commit().expect("commit");
820 }
821
822 {
824 let read_txn = events_db.read().expect("read txn");
825 let mut write_txn = events_db.write().expect("write txn");
826 let deleted =
827 EventStore::delete_expired(&read_txn, &mut write_txn, 300, 100).expect("gc");
828 assert_eq!(deleted, 2);
829 write_txn.commit().expect("commit");
830 }
831
832 let txn = events_db.read().expect("read txn");
833 let count = EventStore::count(&txn, org_id).expect("count");
834 assert_eq!(count, 1);
835 }
836
837 #[test]
838 fn delete_expired_respects_batch_limit() {
839 let events_db = EventsDatabase::open_in_memory().expect("open");
840
841 {
842 let mut txn = events_db.write().expect("write txn");
843 for i in 0..10u8 {
844 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 50);
845 EventStore::write(&mut txn, &entry).expect("write");
846 }
847 txn.commit().expect("commit");
848 }
849
850 {
852 let read_txn = events_db.read().expect("read txn");
853 let mut write_txn = events_db.write().expect("write txn");
854 let deleted =
855 EventStore::delete_expired(&read_txn, &mut write_txn, 100, 3).expect("gc");
856 assert_eq!(deleted, 3, "should respect batch limit");
857 write_txn.commit().expect("commit");
858 }
859
860 let txn = events_db.read().expect("read txn");
861 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
862 assert_eq!(count, 7, "7 entries remain after deleting 3");
863 }
864
865 #[test]
866 fn delete_expired_preserves_non_expired() {
867 let events_db = EventsDatabase::open_in_memory().expect("open");
868
869 {
870 let mut txn = events_db.write().expect("write txn");
871 let expired = make_entry(1, [1u8; 16], 1_700_000_000, 50); let future = make_entry(1, [2u8; 16], 1_700_000_001, 999); let no_expiry = make_entry(1, [3u8; 16], 1_700_000_002, 0); EventStore::write(&mut txn, &expired).expect("write");
875 EventStore::write(&mut txn, &future).expect("write");
876 EventStore::write(&mut txn, &no_expiry).expect("write");
877 txn.commit().expect("commit");
878 }
879
880 {
881 let read_txn = events_db.read().expect("read txn");
882 let mut write_txn = events_db.write().expect("write txn");
883 let deleted =
884 EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
885 assert_eq!(deleted, 1, "only the expired entry");
886 write_txn.commit().expect("commit");
887 }
888
889 let txn = events_db.read().expect("read txn");
890 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
891 assert_eq!(count, 2, "future + no_expiry remain");
892 }
893
894 #[test]
895 fn delete_expired_zero_expires_at_never_deleted() {
896 let events_db = EventsDatabase::open_in_memory().expect("open");
897
898 {
899 let mut txn = events_db.write().expect("write txn");
900 for i in 0..5u8 {
901 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
902 EventStore::write(&mut txn, &entry).expect("write");
903 }
904 txn.commit().expect("commit");
905 }
906
907 {
908 let read_txn = events_db.read().expect("read txn");
909 let mut write_txn = events_db.write().expect("write txn");
910 let deleted =
911 EventStore::delete_expired(&read_txn, &mut write_txn, u64::MAX, 100).expect("gc");
912 assert_eq!(deleted, 0);
913 write_txn.commit().expect("commit");
914 }
915
916 let txn = events_db.read().expect("read txn");
917 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
918 assert_eq!(count, 5);
919 }
920
921 #[test]
922 fn mixed_batch_gc() {
923 let events_db = EventsDatabase::open_in_memory().expect("open");
924
925 {
926 let mut txn = events_db.write().expect("write txn");
927
928 let entries = vec![
930 make_entry(1, [1u8; 16], 1_700_000_001, 10), make_entry(1, [2u8; 16], 1_700_000_002, 20), make_entry(1, [3u8; 16], 1_700_000_003, 30), make_entry(1, [4u8; 16], 1_700_000_004, 500), make_entry(1, [5u8; 16], 1_700_000_005, 600), make_entry(1, [6u8; 16], 1_700_000_006, 0), make_entry(1, [7u8; 16], 1_700_000_007, 0), ];
938 for entry in &entries {
939 EventStore::write(&mut txn, entry).expect("write");
940 }
941 txn.commit().expect("commit");
942 }
943
944 {
945 let read_txn = events_db.read().expect("read txn");
946 let mut write_txn = events_db.write().expect("write txn");
947 let deleted =
948 EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
949 assert_eq!(deleted, 3);
950 write_txn.commit().expect("commit");
951 }
952
953 let txn = events_db.read().expect("read txn");
954 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
955 assert_eq!(count, 4);
956 }
957
958 #[test]
959 fn scan_apply_phase_filters_handler_events() {
960 let events_db = EventsDatabase::open_in_memory().expect("open");
961
962 {
963 let mut txn = events_db.write().expect("write txn");
964
965 let apply_entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
967 EventStore::write(&mut txn, &apply_entry).expect("write apply");
968
969 let mut handler_entry = make_entry(1, [2u8; 16], 1_700_000_001, 0);
971 handler_entry.emission = EventEmission::HandlerPhase { node_id: 1 };
972 EventStore::write(&mut txn, &handler_entry).expect("write handler");
973
974 txn.commit().expect("commit");
975 }
976
977 let txn = events_db.read().expect("read txn");
978 let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
979
980 assert_eq!(results.len(), 1, "only apply-phase event should be included");
981 assert_eq!(results[0].event_id, [1u8; 16]);
982 assert!(matches!(results[0].emission, EventEmission::ApplyPhase));
983 }
984
985 #[test]
986 fn scan_apply_phase_respects_max_entries() {
987 let events_db = EventsDatabase::open_in_memory().expect("open");
988
989 {
990 let mut txn = events_db.write().expect("write txn");
991 for i in 0..10u8 {
992 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
993 EventStore::write(&mut txn, &entry).expect("write");
994 }
995 txn.commit().expect("commit");
996 }
997
998 let txn = events_db.read().expect("read txn");
999 let results = EventStore::scan_apply_phase(&txn, 3).expect("scan");
1000
1001 assert_eq!(results.len(), 3, "should truncate to max_entries");
1002 assert!(results[0].timestamp >= results[1].timestamp, "sorted newest first");
1004 assert!(results[1].timestamp >= results[2].timestamp, "sorted newest first");
1005 }
1006
1007 #[test]
1008 fn scan_apply_phase_empty_db() {
1009 let events_db = EventsDatabase::open_in_memory().expect("open");
1010 let txn = events_db.read().expect("read txn");
1011 let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
1012 assert!(results.is_empty());
1013 }
1014
1015 #[test]
1016 fn scan_apply_phase_cross_org() {
1017 let events_db = EventsDatabase::open_in_memory().expect("open");
1018
1019 {
1020 let mut txn = events_db.write().expect("write txn");
1021 let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 0);
1023 let e2 = make_entry(2, [2u8; 16], 1_700_000_001, 0);
1024 let e3 = make_entry(3, [3u8; 16], 1_700_000_002, 0);
1025 EventStore::write(&mut txn, &e1).expect("write");
1026 EventStore::write(&mut txn, &e2).expect("write");
1027 EventStore::write(&mut txn, &e3).expect("write");
1028 txn.commit().expect("commit");
1029 }
1030
1031 let txn = events_db.read().expect("read txn");
1032 let results = EventStore::scan_apply_phase(&txn, 100).expect("scan");
1033
1034 assert_eq!(results.len(), 3, "should scan across all organizations");
1035 let org_ids: Vec<_> = results.iter().map(|e| e.organization_id).collect();
1037 assert!(org_ids.contains(&OrganizationId::new(1)));
1038 assert!(org_ids.contains(&OrganizationId::new(2)));
1039 assert!(org_ids.contains(&OrganizationId::new(3)));
1040 }
1041
1042 #[test]
1043 fn events_database_clone_shares_state() {
1044 let db1 = EventsDatabase::open_in_memory().expect("open");
1045 let db2 = db1.clone();
1046
1047 {
1049 let mut txn = db1.write().expect("write txn");
1050 let entry = make_entry(1, [1u8; 16], 1_700_000_000, 0);
1051 EventStore::write(&mut txn, &entry).expect("write");
1052 txn.commit().expect("commit");
1053 }
1054
1055 {
1057 let txn = db2.read().expect("read txn");
1058 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
1059 assert_eq!(count, 1, "clone should share same database");
1060 }
1061 }
1062
1063 #[test]
1064 fn list_time_range_filtering() {
1065 let events_db = EventsDatabase::open_in_memory().expect("open");
1066 let org_id = OrganizationId::new(1);
1067
1068 {
1069 let mut txn = events_db.write().expect("write txn");
1070 for (i, ts) in [1000i64, 2000, 3000].iter().enumerate() {
1072 let entry = make_entry(1, [i as u8; 16], *ts, 0);
1073 EventStore::write(&mut txn, &entry).expect("write");
1074 }
1075 txn.commit().expect("commit");
1076 }
1077
1078 let txn = events_db.read().expect("read txn");
1079
1080 let (all, _) = EventStore::list(&txn, org_id, 0, u64::MAX, 100, None).expect("list all");
1082 assert_eq!(all.len(), 3);
1083
1084 let start_ns = 2000u64 * 1_000_000_000;
1086 let end_ns = 3000u64 * 1_000_000_000;
1087 let (mid, _) =
1088 EventStore::list(&txn, org_id, start_ns, end_ns, 100, None).expect("list mid");
1089 assert_eq!(mid.len(), 1);
1090 assert_eq!(mid[0].timestamp.timestamp(), 2000);
1091 }
1092
1093 #[test]
1097 fn delete_expired_works_after_field_reorder() {
1098 let events_db = EventsDatabase::open_in_memory().expect("open");
1099
1100 {
1101 let mut txn = events_db.write().expect("write txn");
1102
1103 let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 50);
1105 EventStore::write(&mut txn, &e1).expect("write");
1106
1107 let mut e2 = make_entry(1, [2u8; 16], 1_700_000_001, 50);
1109 e2.emission = EventEmission::HandlerPhase { node_id: 1 };
1110 EventStore::write(&mut txn, &e2).expect("write");
1111
1112 let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0);
1114 EventStore::write(&mut txn, &e3).expect("write");
1115
1116 txn.commit().expect("commit");
1117 }
1118
1119 {
1120 let read_txn = events_db.read().expect("read txn");
1121 let mut write_txn = events_db.write().expect("write txn");
1122 let deleted =
1123 EventStore::delete_expired(&read_txn, &mut write_txn, 100, 100).expect("gc");
1124 assert_eq!(deleted, 2, "both expired entries should be deleted");
1125 write_txn.commit().expect("commit");
1126 }
1127
1128 let txn = events_db.read().expect("read txn");
1129 let count = EventStore::count(&txn, OrganizationId::new(1)).expect("count");
1130 assert_eq!(count, 1, "only non-expired entry remains");
1131 }
1132
1133 #[test]
1135 fn scan_apply_phase_50_apply_50_handler() {
1136 let events_db = EventsDatabase::open_in_memory().expect("open");
1137
1138 {
1139 let mut txn = events_db.write().expect("write txn");
1140
1141 for i in 0..50u8 {
1142 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1143 EventStore::write(&mut txn, &entry).expect("write apply");
1144 }
1145
1146 for i in 50..100u8 {
1147 let mut entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1148 entry.emission = EventEmission::HandlerPhase { node_id: 1 };
1149 EventStore::write(&mut txn, &entry).expect("write handler");
1150 }
1151
1152 txn.commit().expect("commit");
1153 }
1154
1155 let txn = events_db.read().expect("read txn");
1156 let results = EventStore::scan_apply_phase(&txn, 1000).expect("scan");
1157
1158 assert_eq!(results.len(), 50, "exactly 50 apply-phase events");
1159 for entry in &results {
1160 assert!(
1161 matches!(entry.emission, EventEmission::ApplyPhase),
1162 "all entries must be apply-phase"
1163 );
1164 }
1165 }
1166
1167 #[test]
1169 fn scan_apply_phase_truncates_to_newest() {
1170 let events_db = EventsDatabase::open_in_memory().expect("open");
1171
1172 {
1173 let mut txn = events_db.write().expect("write txn");
1174 for i in 0..20u8 {
1175 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1176 EventStore::write(&mut txn, &entry).expect("write");
1177 }
1178 txn.commit().expect("commit");
1179 }
1180
1181 let txn = events_db.read().expect("read txn");
1182 let results = EventStore::scan_apply_phase(&txn, 5).expect("scan");
1183
1184 assert_eq!(results.len(), 5, "truncated to max_entries");
1185
1186 for window in results.windows(2) {
1188 assert!(window[0].timestamp >= window[1].timestamp, "must be sorted newest-first");
1189 }
1190
1191 let min_ts = chrono::TimeZone::timestamp_opt(&Utc, 1_700_000_015, 0).unwrap().timestamp();
1193 for entry in &results {
1194 assert!(
1195 entry.timestamp.timestamp() >= min_ts,
1196 "truncated entries should be the 5 newest"
1197 );
1198 }
1199 }
1200
1201 #[test]
1206 fn get_by_id_returns_written_event() {
1207 let events_db = EventsDatabase::open_in_memory().expect("open");
1208 let entry = make_entry(1, [10u8; 16], 1_700_000_000, 0);
1209
1210 {
1211 let mut txn = events_db.write().expect("write txn");
1212 EventStore::write(&mut txn, &entry).expect("write");
1213 txn.commit().expect("commit");
1214 }
1215
1216 let txn = events_db.read().expect("read txn");
1217 let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[10u8; 16])
1218 .expect("get_by_id")
1219 .expect("should exist");
1220
1221 assert_eq!(found.event_id, [10u8; 16]);
1222 assert_eq!(found.organization_id, OrganizationId::new(1));
1223 assert_eq!(found.source_service, "ledger");
1224 }
1225
1226 #[test]
1227 fn get_by_id_nonexistent_returns_none() {
1228 let events_db = EventsDatabase::open_in_memory().expect("open");
1229 let txn = events_db.read().expect("read txn");
1230 let result =
1231 EventStore::get_by_id(&txn, OrganizationId::new(1), &[99u8; 16]).expect("get_by_id");
1232 assert!(result.is_none());
1233 }
1234
1235 #[test]
1236 fn get_by_id_wrong_org_returns_none() {
1237 let events_db = EventsDatabase::open_in_memory().expect("open");
1238 let entry = make_entry(1, [10u8; 16], 1_700_000_000, 0);
1239
1240 {
1241 let mut txn = events_db.write().expect("write txn");
1242 EventStore::write(&mut txn, &entry).expect("write");
1243 txn.commit().expect("commit");
1244 }
1245
1246 let txn = events_db.read().expect("read txn");
1248 let result =
1249 EventStore::get_by_id(&txn, OrganizationId::new(999), &[10u8; 16]).expect("get_by_id");
1250 assert!(result.is_none(), "different org should not find the event");
1251 }
1252
1253 #[test]
1254 fn get_by_id_finds_all_n_events() {
1255 let events_db = EventsDatabase::open_in_memory().expect("open");
1256 let n = 50u8;
1257
1258 {
1259 let mut txn = events_db.write().expect("write txn");
1260 for i in 0..n {
1261 let entry = make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0);
1262 EventStore::write(&mut txn, &entry).expect("write");
1263 }
1264 txn.commit().expect("commit");
1265 }
1266
1267 let txn = events_db.read().expect("read txn");
1268 for i in 0..n {
1269 let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[i; 16])
1270 .expect("get_by_id")
1271 .expect("should exist");
1272 assert_eq!(found.event_id, [i; 16]);
1273 }
1274 }
1275
1276 #[test]
1277 fn gc_removes_index_entries() {
1278 let events_db = EventsDatabase::open_in_memory().expect("open");
1279
1280 {
1281 let mut txn = events_db.write().expect("write txn");
1282 let e1 = make_entry(1, [1u8; 16], 1_700_000_000, 100); let e2 = make_entry(1, [2u8; 16], 1_700_000_001, 200); let e3 = make_entry(1, [3u8; 16], 1_700_000_002, 0); EventStore::write(&mut txn, &e1).expect("write");
1286 EventStore::write(&mut txn, &e2).expect("write");
1287 EventStore::write(&mut txn, &e3).expect("write");
1288 txn.commit().expect("commit");
1289 }
1290
1291 {
1293 let txn = events_db.read().expect("read txn");
1294 assert!(
1295 EventStore::get_by_id(&txn, OrganizationId::new(1), &[1u8; 16])
1296 .expect("get_by_id")
1297 .is_some()
1298 );
1299 assert!(
1300 EventStore::get_by_id(&txn, OrganizationId::new(1), &[2u8; 16])
1301 .expect("get_by_id")
1302 .is_some()
1303 );
1304 assert!(
1305 EventStore::get_by_id(&txn, OrganizationId::new(1), &[3u8; 16])
1306 .expect("get_by_id")
1307 .is_some()
1308 );
1309 }
1310
1311 {
1313 let read_txn = events_db.read().expect("read txn");
1314 let mut write_txn = events_db.write().expect("write txn");
1315 let deleted =
1316 EventStore::delete_expired(&read_txn, &mut write_txn, 300, 100).expect("gc");
1317 assert_eq!(deleted, 2);
1318 write_txn.commit().expect("commit");
1319 }
1320
1321 {
1323 let txn = events_db.read().expect("read txn");
1324 assert!(
1325 EventStore::get_by_id(&txn, OrganizationId::new(1), &[1u8; 16])
1326 .expect("get_by_id")
1327 .is_none(),
1328 "GC'd event should not be in index"
1329 );
1330 assert!(
1331 EventStore::get_by_id(&txn, OrganizationId::new(1), &[2u8; 16])
1332 .expect("get_by_id")
1333 .is_none(),
1334 "GC'd event should not be in index"
1335 );
1336 assert!(
1338 EventStore::get_by_id(&txn, OrganizationId::new(1), &[3u8; 16])
1339 .expect("get_by_id")
1340 .is_some()
1341 );
1342 }
1343 }
1344
1345 #[test]
1346 fn snapshot_restore_rebuilds_index() {
1347 let events_db = EventsDatabase::open_in_memory().expect("open");
1348
1349 let entries: Vec<EventEntry> =
1351 (0..10u8).map(|i| make_entry(1, [i; 16], 1_700_000_000 + i64::from(i), 0)).collect();
1352
1353 {
1354 let mut txn = events_db.write().expect("write txn");
1355 for entry in &entries {
1356 EventStore::write(&mut txn, entry).expect("write");
1357 }
1358 txn.commit().expect("commit");
1359 }
1360
1361 let snapshot_entries = {
1363 let txn = events_db.read().expect("read txn");
1364 EventStore::scan_apply_phase(&txn, 100).expect("scan")
1365 };
1366
1367 let restored_db = EventsDatabase::open_in_memory().expect("open");
1369 {
1370 let mut txn = restored_db.write().expect("write txn");
1371 for entry in &snapshot_entries {
1372 EventStore::write(&mut txn, entry).expect("write");
1373 }
1374 txn.commit().expect("commit");
1375 }
1376
1377 let txn = restored_db.read().expect("read txn");
1379 for i in 0..10u8 {
1380 let found = EventStore::get_by_id(&txn, OrganizationId::new(1), &[i; 16])
1381 .expect("get_by_id")
1382 .expect("should exist after restore");
1383 assert_eq!(found.event_id, [i; 16]);
1384 }
1385 }
1386
1387 #[test]
1388 fn index_atomicity_with_primary() {
1389 let events_db = EventsDatabase::open_in_memory().expect("open");
1390
1391 {
1393 let mut txn = events_db.write().expect("write txn");
1394 let entry = make_entry(1, [50u8; 16], 1_700_000_000, 0);
1395 EventStore::write(&mut txn, &entry).expect("write");
1396 }
1398
1399 let txn = events_db.read().expect("read txn");
1401 let result =
1402 EventStore::get_by_id(&txn, OrganizationId::new(1), &[50u8; 16]).expect("get_by_id");
1403 assert!(result.is_none(), "aborted txn should not write index entry");
1404
1405 let ts_ns =
1406 Utc.timestamp_opt(1_700_000_000, 0).unwrap().timestamp_nanos_opt().unwrap() as u64;
1407 let primary =
1408 EventStore::get(&txn, OrganizationId::new(1), ts_ns, &[50u8; 16]).expect("get");
1409 assert!(primary.is_none(), "aborted txn should not write primary entry");
1410 }
1411
1412 #[test]
1413 fn get_by_id_multi_org_isolation() {
1414 let events_db = EventsDatabase::open_in_memory().expect("open");
1415
1416 {
1417 let mut txn = events_db.write().expect("write txn");
1418 let e1 = make_entry(1, [99u8; 16], 1_700_000_000, 0);
1420 let e2 = make_entry(2, [99u8; 16], 1_700_000_001, 0);
1421 EventStore::write(&mut txn, &e1).expect("write");
1422 EventStore::write(&mut txn, &e2).expect("write");
1423 txn.commit().expect("commit");
1424 }
1425
1426 let txn = events_db.read().expect("read txn");
1427 let found1 = EventStore::get_by_id(&txn, OrganizationId::new(1), &[99u8; 16])
1428 .expect("get_by_id")
1429 .expect("org 1 should find");
1430 let found2 = EventStore::get_by_id(&txn, OrganizationId::new(2), &[99u8; 16])
1431 .expect("get_by_id")
1432 .expect("org 2 should find");
1433
1434 assert_eq!(found1.organization_id, OrganizationId::new(1));
1435 assert_eq!(found2.organization_id, OrganizationId::new(2));
1436 }
1437}