Skip to main content

atomr_patterns/cqrs/
audit.rs

//! [`AuditLog`] — built-in [`super::Reader`] that records each event it
//! sees into a bounded ring buffer, keeping only the most recent ones.
//! Useful for compliance views and "what happened recently" UIs.
4
5use std::collections::VecDeque;
6
7use async_trait::async_trait;
8
9use crate::cqrs::reader::Reader;
10use crate::DomainEvent;
11
/// Bounded window over recorded events.
///
/// Events are kept oldest-first: new events are expected at the back of
/// `entries` and the oldest at the front. The struct does not enforce
/// `capacity` on its own; whoever appends to `entries` applies the bound.
pub struct AuditProjection<E> {
    /// Maximum number of events the ring is meant to hold.
    pub capacity: usize,
    /// Stored events, oldest at the front and newest at the back.
    pub entries: VecDeque<E>,
}

impl<E> Default for AuditProjection<E> {
    /// Empty projection with a capacity of 1024 events.
    fn default() -> Self {
        let entries = VecDeque::new();
        Self { capacity: 1024, entries }
    }
}

impl<E: Clone> AuditProjection<E> {
    /// Returns clones of the last `n` stored events, oldest first.
    ///
    /// When fewer than `n` events are stored, all of them are returned.
    pub fn recent(&self, n: usize) -> Vec<E> {
        // Number of older entries to step over before the requested tail starts.
        let skipped = self.entries.len().saturating_sub(n);
        self.entries.iter().skip(skipped).cloned().collect()
    }
}
31
32/// A reader that records every event it sees into an
33/// [`AuditProjection`]. Construct via [`AuditLog::with_capacity`].
34pub struct AuditLog<E: DomainEvent> {
35    name: String,
36    capacity: usize,
37    _ev: std::marker::PhantomData<E>,
38}
39
40impl<E: DomainEvent> AuditLog<E> {
41    /// Construct an audit log with the given ring capacity. Wire
42    /// decoding via [`crate::cqrs::CqrsBuilder::with_event_codecs`]
43    /// — the registry takes priority over [`Reader::decode`].
44    pub fn with_capacity(capacity: usize) -> Self {
45        Self { name: "audit".into(), capacity, _ev: std::marker::PhantomData }
46    }
47
48    pub fn named(mut self, name: impl Into<String>) -> Self {
49        self.name = name.into();
50        self
51    }
52}
53
54#[async_trait]
55impl<E: DomainEvent + Sync> Reader for AuditLog<E> {
56    type Event = E;
57    type Projection = AuditProjection<E>;
58    type Error = std::io::Error;
59
60    fn name(&self) -> &str {
61        &self.name
62    }
63
64    fn decode(_bytes: &[u8]) -> Result<Self::Event, String> {
65        // Static `decode` doesn't have access to `self.decoder`, so
66        // we fall through and rely on a configured codec registry.
67        // Users who want a fixed-codec audit log should pass the
68        // decoder closure they used at the aggregate level.
69        Err("AuditLog: configure an EventCodecRegistry on the CQRS pattern".into())
70    }
71
72    async fn apply(&mut self, p: &mut AuditProjection<E>, event: E) -> Result<(), std::io::Error> {
73        if p.capacity == 0 {
74            p.capacity = self.capacity;
75        }
76        if p.entries.len() >= p.capacity {
77            p.entries.pop_front();
78        }
79        p.entries.push_back(event);
80        Ok(())
81    }
82}
83
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal event: a numbered marker, enough to observe ordering.
    #[derive(Clone, Debug, PartialEq)]
    struct E(i32);
    impl crate::DomainEvent for E {}

    /// Feeding five events through a capacity-3 log keeps only the last
    /// three, reported oldest first.
    #[test]
    fn ring_truncates_to_capacity() {
        let mut projection = AuditProjection::<E> { capacity: 3, entries: VecDeque::new() };
        let mut reader = AuditLog::<E>::with_capacity(3);

        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            for event in (0..5).map(E) {
                reader.apply(&mut projection, event).await.unwrap();
            }
        });

        assert_eq!(projection.entries.len(), 3);
        assert_eq!(projection.recent(3), vec![E(2), E(3), E(4)]);
    }
}
105}