Skip to main content

actionqueue_platform/
ledger.rs

1//! Generic append-only ledger backed by WAL events.
2
3use std::collections::HashMap;
4
5use actionqueue_core::ids::{LedgerEntryId, TenantId};
6use actionqueue_core::platform::LedgerEntry;
7use tracing;
8
/// Generic append-only ledger for platform events.
///
/// Ledger keys identify logical ledgers (e.g. `"audit"`, `"decision"`,
/// `"relationship"`, `"incident"`, `"reality"`). Entries are opaque byte
/// payloads whose schema is defined by the consumer (Caelum, Digicorp).
///
/// Maintains secondary indexes for O(N_key) and O(N_tenant) queries, where
/// N_key / N_tenant is the number of entries recorded under that key / tenant.
/// Lookup by entry id has no index and scans `entries` linearly.
#[derive(Default)]
pub struct AppendLedger {
    /// All entries in insertion order; the secondary indexes below store
    /// positions into this vector.
    entries: Vec<LedgerEntry>,
    /// Secondary index: ledger_key → Vec of indices into `entries`.
    entries_by_key: HashMap<String, Vec<usize>>,
    /// Secondary index: tenant_id → Vec of indices into `entries`.
    entries_by_tenant: HashMap<TenantId, Vec<usize>>,
}
24
25impl AppendLedger {
26    /// Creates an empty ledger.
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Appends an entry to the ledger.
32    pub fn append(&mut self, entry: LedgerEntry) {
33        let idx = self.entries.len();
34        tracing::debug!(
35            entry_id = %entry.entry_id(),
36            ledger_key = entry.ledger_key(),
37            tenant_id = %entry.tenant_id(),
38            "ledger entry appended"
39        );
40        self.entries_by_key.entry(entry.ledger_key().to_string()).or_default().push(idx);
41        self.entries_by_tenant.entry(entry.tenant_id()).or_default().push(idx);
42        self.entries.push(entry);
43    }
44
45    /// Returns an iterator over all entries for the given ledger key.
46    pub fn iter_for_key<'a>(&'a self, key: &str) -> impl Iterator<Item = &'a LedgerEntry> {
47        let indices = self.entries_by_key.get(key).map(|v| v.as_slice()).unwrap_or(&[]);
48        indices.iter().filter_map(|&i| self.entries.get(i))
49    }
50
51    /// Returns an iterator over all entries for the given tenant.
52    pub fn iter_for_tenant(&self, tenant_id: TenantId) -> impl Iterator<Item = &LedgerEntry> {
53        let indices = self.entries_by_tenant.get(&tenant_id).map(|v| v.as_slice()).unwrap_or(&[]);
54        indices.iter().filter_map(|&i| self.entries.get(i))
55    }
56
57    /// Returns the entry for the given identifier, if any.
58    pub fn entry_by_id(&self, entry_id: LedgerEntryId) -> Option<&LedgerEntry> {
59        self.entries.iter().find(|e| e.entry_id() == entry_id)
60    }
61
62    /// Returns the total number of entries in the ledger.
63    pub fn len(&self) -> usize {
64        self.entries.len()
65    }
66
67    /// Returns `true` if the ledger contains no entries.
68    pub fn is_empty(&self) -> bool {
69        self.entries.is_empty()
70    }
71
72    /// Returns an iterator over all entries, in insertion order.
73    pub fn iter(&self) -> impl Iterator<Item = &LedgerEntry> {
74        self.entries.iter()
75    }
76}
77
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::{LedgerEntryId, TenantId};
    use actionqueue_core::platform::LedgerEntry;

    use super::AppendLedger;

    /// Builds an entry with a fresh id, a fixed payload and a fixed timestamp
    /// argument for the given tenant and ledger key.
    fn make_entry(tenant: TenantId, key: &str) -> LedgerEntry {
        let payload = b"payload".to_vec();
        LedgerEntry::new(LedgerEntryId::new(), tenant, key, payload, 1000)
    }

    /// `len` counts every appended entry regardless of its key.
    #[test]
    fn append_and_len() {
        let tenant = TenantId::new();
        let mut ledger = AppendLedger::new();
        for key in ["audit", "decision"] {
            ledger.append(make_entry(tenant, key));
        }
        assert_eq!(ledger.len(), 2);
    }

    /// The per-key iterator yields only the entries appended under that key.
    #[test]
    fn iter_for_key_returns_matching_entries() {
        let tenant = TenantId::new();
        let mut ledger = AppendLedger::new();
        for key in ["audit", "decision", "audit"] {
            ledger.append(make_entry(tenant, key));
        }

        assert_eq!(ledger.iter_for_key("audit").count(), 2);
        assert_eq!(ledger.iter_for_key("decision").count(), 1);
    }

    /// The per-tenant iterator yields that tenant's entries across all keys.
    #[test]
    fn iter_for_tenant_returns_all_tenant_entries() {
        let tenant_a = TenantId::new();
        let tenant_b = TenantId::new();
        let mut ledger = AppendLedger::new();
        for (tenant, key) in [(tenant_a, "audit"), (tenant_b, "audit"), (tenant_a, "decision")] {
            ledger.append(make_entry(tenant, key));
        }

        assert_eq!(ledger.iter_for_tenant(tenant_a).count(), 2);
        assert_eq!(ledger.iter_for_tenant(tenant_b).count(), 1);
    }

    /// A stored id resolves to an entry; a freshly generated id does not.
    #[test]
    fn entry_by_id_returns_correct_entry() {
        let known_id = LedgerEntryId::new();
        let mut ledger = AppendLedger::new();
        ledger.append(LedgerEntry::new(known_id, TenantId::new(), "audit", b"data".to_vec(), 1000));

        assert!(ledger.entry_by_id(known_id).is_some());
        assert!(ledger.entry_by_id(LedgerEntryId::new()).is_none());
    }

    /// Querying a key that was never used yields an empty iterator.
    #[test]
    fn empty_key_returns_empty_iter() {
        let ledger = AppendLedger::new();
        assert_eq!(ledger.iter_for_key("nonexistent").count(), 0);
    }
}