atomr_patterns/cqrs/
audit.rs1use std::collections::VecDeque;
6
7use async_trait::async_trait;
8
9use crate::cqrs::reader::Reader;
10use crate::DomainEvent;
11
12pub struct AuditProjection<E> {
14 pub capacity: usize,
15 pub entries: VecDeque<E>,
16}
17
18impl<E> Default for AuditProjection<E> {
19 fn default() -> Self {
20 Self { capacity: 1024, entries: VecDeque::new() }
21 }
22}
23
24impl<E: Clone> AuditProjection<E> {
25 pub fn recent(&self, n: usize) -> Vec<E> {
27 let n = n.min(self.entries.len());
28 self.entries.iter().rev().take(n).rev().cloned().collect()
29 }
30}
31
32pub struct AuditLog<E: DomainEvent> {
35 name: String,
36 capacity: usize,
37 _ev: std::marker::PhantomData<E>,
38}
39
40impl<E: DomainEvent> AuditLog<E> {
41 pub fn with_capacity(capacity: usize) -> Self {
45 Self { name: "audit".into(), capacity, _ev: std::marker::PhantomData }
46 }
47
48 pub fn named(mut self, name: impl Into<String>) -> Self {
49 self.name = name.into();
50 self
51 }
52}
53
54#[async_trait]
55impl<E: DomainEvent + Sync> Reader for AuditLog<E> {
56 type Event = E;
57 type Projection = AuditProjection<E>;
58 type Error = std::io::Error;
59
60 fn name(&self) -> &str {
61 &self.name
62 }
63
64 fn decode(_bytes: &[u8]) -> Result<Self::Event, String> {
65 Err("AuditLog: configure an EventCodecRegistry on the CQRS pattern".into())
70 }
71
72 async fn apply(&mut self, p: &mut AuditProjection<E>, event: E) -> Result<(), std::io::Error> {
73 if p.capacity == 0 {
74 p.capacity = self.capacity;
75 }
76 if p.entries.len() >= p.capacity {
77 p.entries.pop_front();
78 }
79 p.entries.push_back(event);
80 Ok(())
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87
88 #[derive(Clone, Debug, PartialEq)]
89 struct E(i32);
90 impl crate::DomainEvent for E {}
91
92 #[test]
93 fn ring_truncates_to_capacity() {
94 let mut p = AuditProjection::<E> { capacity: 3, entries: VecDeque::new() };
95 let mut audit = AuditLog::<E>::with_capacity(3);
96 let rt = tokio::runtime::Runtime::new().unwrap();
97 rt.block_on(async {
98 for i in 0..5 {
99 audit.apply(&mut p, E(i)).await.unwrap();
100 }
101 });
102 assert_eq!(p.entries.len(), 3);
103 assert_eq!(p.recent(3), vec![E(2), E(3), E(4)]);
104 }
105}