use std::collections::VecDeque;
use async_trait::async_trait;
use crate::cqrs::reader::Reader;
use crate::DomainEvent;
/// Read model that keeps a bounded window of events for auditing.
///
/// Both fields are public, so callers may build or adjust a projection directly.
pub struct AuditProjection<E> {
    /// Upper bound on the number of entries the window is meant to hold.
    pub capacity: usize,

    /// Stored events; the front is the oldest entry and the back is the newest.
    pub entries: VecDeque<E>,
}
impl<E> Default for AuditProjection<E> {
    /// Builds an empty projection whose capacity is 1024 entries.
    fn default() -> Self {
        // Capacity used when the caller does not choose one.
        const DEFAULT_CAPACITY: usize = 1024;

        let entries = VecDeque::new();
        Self {
            capacity: DEFAULT_CAPACITY,
            entries,
        }
    }
}
impl<E: Clone> AuditProjection<E> {
    /// Returns clones of the newest `n` entries, ordered from oldest to newest.
    ///
    /// When fewer than `n` entries are stored, every stored entry is returned.
    pub fn recent(&self, n: usize) -> Vec<E> {
        let total = self.entries.len();
        // Index of the first entry to keep; saturates to 0 when `n` exceeds the stored count.
        let start = total.saturating_sub(n);
        self.entries.range(start..).cloned().collect()
    }
}
/// Reader that records incoming events of type `E` into an [`AuditProjection`].
pub struct AuditLog<E>
where
    E: DomainEvent,
{
    /// Name reported by the reader.
    name: String,

    /// Capacity handed to a projection whose own capacity is still zero.
    capacity: usize,

    /// Binds the log to its event type without storing an event.
    _ev: std::marker::PhantomData<E>,
}
impl<E> AuditLog<E>
where
    E: DomainEvent,
{
    /// Creates a log named `"audit"` that carries the given capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            name: String::from("audit"),
            capacity,
            _ev: std::marker::PhantomData,
        }
    }

    /// Replaces the log's name, consuming the log and returning it for chaining.
    pub fn named(self, name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }
}
#[async_trait]
impl<E: DomainEvent + Sync> Reader for AuditLog<E> {
    type Event = E;
    type Projection = AuditProjection<E>;
    type Error = std::io::Error;

    /// Returns the name this log was configured with.
    fn name(&self) -> &str {
        &self.name
    }

    /// Always fails: this reader does not decode bytes itself.
    ///
    /// The returned message tells the caller to configure an `EventCodecRegistry`
    /// on the CQRS pattern instead.
    fn decode(_bytes: &[u8]) -> Result<Self::Event, String> {
        Err("AuditLog: configure an EventCodecRegistry on the CQRS pattern".into())
    }

    /// Appends `event` to the projection, evicting the oldest entries so that the
    /// projection never holds more than `p.capacity` events.
    ///
    /// A projection whose capacity is zero first adopts this log's capacity. If the
    /// capacity is still zero afterwards, the projection is kept empty and the event
    /// is not retained.
    ///
    /// # Errors
    ///
    /// This implementation never returns an error; the `Result` exists to satisfy
    /// the `Reader` signature.
    async fn apply(&mut self, p: &mut AuditProjection<E>, event: E) -> Result<(), std::io::Error> {
        // Zero on the projection is treated as "not configured yet".
        if p.capacity == 0 {
            p.capacity = self.capacity;
        }

        // A zero-capacity ring can hold nothing. Without this guard the eviction
        // below would pop from an empty deque and then push, retaining one entry.
        if p.capacity == 0 {
            p.entries.clear();
            return Ok(());
        }

        // Loop rather than pop once: the fields are public, so the projection may
        // already be over capacity (e.g. the capacity was lowered after filling).
        while p.entries.len() >= p.capacity {
            p.entries.pop_front();
        }

        p.entries.push_back(event);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal event used by the tests; the payload is a sequence number.
    #[derive(Clone, Debug, PartialEq)]
    struct E(i32);

    impl crate::DomainEvent for E {}

    /// Applying five events to a three-slot projection keeps only the last three,
    /// ordered from oldest to newest.
    #[test]
    fn ring_truncates_to_capacity() {
        const SLOTS: usize = 3;

        let mut projection = AuditProjection::<E> {
            capacity: SLOTS,
            entries: VecDeque::new(),
        };
        let mut log = AuditLog::<E>::with_capacity(SLOTS);

        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            for seq in 0..5 {
                log.apply(&mut projection, E(seq)).await.unwrap();
            }
        });

        assert_eq!(projection.entries.len(), SLOTS);

        let expected: Vec<E> = (2..5).map(E).collect();
        assert_eq!(projection.recent(SLOTS), expected);
    }
}