#![doc = include_str!("../README.md")]
use crate::stores::EventStore;
use caches::{ReductionCache, ReductionCacheError};
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
pub use ids::LogId;
use policies::{CachingPolicy, NoPolicy};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, marker::PhantomData, sync::Arc};
use stores::EventStoreError;
use thiserror::Error;
use tokio::sync::mpsc::{self, Sender};
pub mod caches;
pub mod ids;
pub mod policies;
pub mod stores;
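/// Implemented by the records an [`EventStore`] returns when loading events,
/// giving the [`LogManager`] access to an event's index, recording timestamp,
/// optional idempotency key, and the decoded event itself, regardless of how
/// a particular store represents its rows.
///
/// A minimal in-memory record might look like the following sketch (the
/// `InMemoryRecord` type is illustrative, not part of this crate):
///
/// ```ignore
/// struct InMemoryRecord<E: Clone> {
///     index: u32,
///     recorded_at: DateTime<Utc>,
///     idempotency_key: Option<String>,
///     event: E,
/// }
///
/// impl<E: Clone> EventRecord<E> for InMemoryRecord<E> {
///     fn index(&self) -> u32 { self.index }
///     fn recorded_at(&self) -> DateTime<Utc> { self.recorded_at }
///     fn idempotency_key(&self) -> Option<String> { self.idempotency_key.clone() }
///     fn event(&self) -> E { self.event.clone() }
/// }
/// ```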
pub trait EventRecord<E> {
fn index(&self) -> u32;
fn recorded_at(&self) -> DateTime<Utc>;
fn idempotency_key(&self) -> Option<String>;
fn event(&self) -> E;
}
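/// An owned, serializable snapshot of an event record, returned from
/// [`LogManager::load`] so callers can serialize events (e.g., into an API
/// response) without holding on to store-specific row types.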
#[derive(Debug, Clone, Serialize)]
pub struct SerializableEventRecord<E> {
pub index: u32,
pub recorded_at: DateTime<Utc>,
pub idempotency_key: Option<String>,
pub event: E,
}
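/// A state machine that folds events into an aggregate value. Reduction
/// starts from [`Default::default`] and applies each event in index order.
///
/// For example, a simple counter over increment/decrement events (this
/// mirrors the `TestAggregate` used in the tests below):
///
/// ```ignore
/// #[derive(Debug, Default)]
/// struct Counter {
///     count: i32,
/// }
///
/// enum CounterEvent {
///     Increment,
///     Decrement,
/// }
///
/// impl Aggregate for Counter {
///     type Event = CounterEvent;
///     fn apply(&mut self, event_record: &impl EventRecord<CounterEvent>) {
///         match event_record.event() {
///             CounterEvent::Increment => self.count += 1,
///             CounterEvent::Decrement => self.count -= 1,
///         }
///     }
/// }
/// ```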
pub trait Aggregate: Default {
type Event;
fn apply(&mut self, event_record: &impl EventRecord<Self::Event>);
}
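/// The result of reducing a log: the aggregate state after applying all
/// events through [`Reduction::through_index`], along with when the
/// reduction was performed. A `Reduction` can be passed directly to
/// [`LogManager::append`] because it converts into a [`LogState`].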
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Reduction<A> {
log_id: LogId,
reduced_at: DateTime<Utc>,
through_index: u32,
aggregate: A,
}
impl<A> Reduction<A> {
pub fn log_id(&self) -> &LogId {
&self.log_id
}
pub fn reduced_at(&self) -> DateTime<Utc> {
self.reduced_at
}
pub fn through_index(&self) -> u32 {
self.through_index
}
pub fn aggregate(&self) -> &A {
&self.aggregate
}
pub fn log_state(&self) -> LogState {
LogState {
next_index: self.through_index + 1,
}
}
}
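/// Errors returned from [`LogManager`] operations.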
#[derive(Debug, Error, PartialEq)]
pub enum LogManagerError {
#[error("the event store produced an unexpected error: {0}")]
EventStoreError(#[from] EventStoreError),
#[error("the reduction cache produced an unexpected error: {0}")]
ReductionCacheError(#[from] ReductionCacheError),
#[error(
"another process already appended an event to log_id={0} \
since your last operation; reduce again to apply the new event, \
determine if your operation is still relevant/necessary, and if so try again"
)]
ConcurrentAppend(LogId),
#[error("an event with the provided idempotency key already exists")]
IdempotentReplay {
idempotency_key: String,
log_id: LogId,
event_index: u32,
},
#[error("the log already contains the max number of events (4,294,967,295)")]
LogFull,
}
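/// Options for [`LogManager::create`] and [`LogManager::append`]. Supplying
/// an `idempotency_key` makes a retried append detectable: the store rejects
/// a second event with the same key, and the manager surfaces it as
/// [`LogManagerError::IdempotentReplay`].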
#[derive(Debug, Default, Clone, PartialEq)]
pub struct AppendOptions {
pub idempotency_key: Option<String>,
}
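/// Options for [`LogManager::with_options`]; currently just an optional
/// [`CachingPolicy`] that decides which reductions are worth caching.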
#[derive(Debug, Default)]
pub struct LogManagerOptions<ACP> {
pub caching_policy: Option<ACP>,
}
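/// An opaque token tracking the caller's last-known position in a log, used
/// for optimistic concurrency control in [`LogManager::append`].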
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LogState {
next_index: u32,
}
impl LogState {
fn new() -> Self {
Self { next_index: 0 }
}
}
impl<A> From<Reduction<A>> for LogState {
fn from(reduction: Reduction<A>) -> Self {
reduction.log_state()
}
}
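/// Manages append-only event logs: creating a log, appending events with
/// optimistic concurrency and idempotency support, reducing a log to an
/// aggregate, and caching reductions from a background task.
///
/// A typical round-trip, sketched against the fake store and cache used in
/// this crate's tests (adjust the `use` paths for your own crate; error
/// handling elided):
///
/// ```ignore
/// let mgr = LogManager::new(
///     FakeEventStore::<TestEvent>::new(),
///     FakeReductionCache::<TestAggregate>::new(),
/// );
/// let log_id = LogId::new();
/// let log_state = mgr
///     .create(&log_id, &TestEvent::Increment, &AppendOptions::default())
///     .await?;
/// mgr.append(&log_id, log_state, &TestEvent::Decrement, &AppendOptions::default())
///     .await?;
/// let reduction = mgr.reduce(&log_id).await?;
/// assert_eq!(reduction.aggregate().count, 0);
/// ```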
#[derive(Debug)]
pub struct LogManager<E, A, ES, AC> {
event_store: ES,
reduction_cache: Arc<AC>,
reduction_sender: Sender<Reduction<A>>,
_phantom_e: PhantomData<E>,
}
impl<E, A, ES, AC> LogManager<E, A, ES, AC>
where
E: Send + 'static,
A: Aggregate<Event = E> + Send + Clone + 'static,
ES: EventStore<E>,
AC: ReductionCache<A> + Send + Sync + 'static,
{
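/// Creates a `LogManager` with default options: no caching policy is
/// configured, so every reduction is cached.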
pub fn new(event_store: ES, reduction_cache: AC) -> Self {
Self::with_options::<NoPolicy>(event_store, reduction_cache, LogManagerOptions::default())
}
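/// Creates a `LogManager` with the provided options. Reductions are written
/// to the cache from a spawned background task, so [`LogManager::reduce`]
/// does not wait on cache writes; this constructor must therefore be called
/// from within a tokio runtime.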
pub fn with_options<ACP>(
event_store: ES,
reduction_cache: AC,
options: LogManagerOptions<ACP>,
) -> Self
where
ACP: CachingPolicy<A> + Send + Sync + 'static,
{
let cache_arc = Arc::new(reduction_cache);
let cloned_cache_arc = cache_arc.clone();
let (sender, mut receiver) = mpsc::channel::<Reduction<A>>(1024);
// Spawn a background task that caches reductions as they are produced,
// consulting the caching policy (if any). Cache failures are ignored
// because the reduction has already been returned to the caller.
tokio::spawn(async move {
while let Some(reduction) = receiver.recv().await {
if options
.caching_policy
.as_ref()
.map(|policy| policy.should_cache(&reduction))
.unwrap_or(true)
{
let _ = cloned_cache_arc.put(&reduction).await;
}
}
});
Self {
event_store,
reduction_cache: cache_arc,
reduction_sender: sender,
_phantom_e: PhantomData,
}
}
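/// Creates a new log whose first event is `event`. Returns the [`LogState`]
/// to pass to a subsequent [`LogManager::append`].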
pub async fn create(
&self,
log_id: &LogId,
event: &E,
append_options: &AppendOptions,
) -> Result<LogState, LogManagerError> {
self.append(log_id, LogState::new(), event, append_options)
.await
}
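/// Appends `event` to the log identified by `log_id`. The `log_state`
/// argument (a [`LogState`] or a [`Reduction`]) carries the caller's
/// last-known next index; if another process appended first, the store
/// reports an index collision and this returns
/// [`LogManagerError::ConcurrentAppend`] so the caller can re-reduce and
/// retry.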
pub async fn append(
&self,
log_id: &LogId,
log_state: impl Into<LogState>,
event: &E,
append_options: &AppendOptions,
) -> Result<LogState, LogManagerError> {
let next_index = log_state.into().next_index;
// Index u32::MAX is reserved: allowing an append there would overflow
// the next_index computed for the returned LogState.
if next_index == u32::MAX {
return Err(LogManagerError::LogFull);
}
self.event_store
.append(log_id, event, next_index, append_options)
.await
.map_err(|e| match e {
EventStoreError::EventIndexAlreadyExists { log_id: lid, .. } => {
LogManagerError::ConcurrentAppend(lid)
}
EventStoreError::IdempotentReplay {
idempotency_key,
log_id,
event_index,
} => LogManagerError::IdempotentReplay {
idempotency_key,
log_id,
event_index,
},
_ => LogManagerError::EventStoreError(e),
})?;
Ok(LogState {
next_index: next_index + 1,
})
}
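/// Reduces the log to a [`Reduction`]: fetches any cached reduction, loads
/// only the events recorded after it, folds them into the aggregate, and
/// queues the new reduction for background caching.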
pub async fn reduce(&self, log_id: &LogId) -> Result<Reduction<A>, LogManagerError> {
// Start from the cached reduction, if present; otherwise reduce the
// whole log from index zero over a default aggregate.
let maybe_reduction = self.reduction_cache.get(log_id).await?;
let (aggregate, starting_index) = maybe_reduction
.map(|cached| (cached.aggregate, cached.through_index + 1))
.unwrap_or((A::default(), 0));
let event_stream = self
.event_store
.load(log_id, starting_index, u32::MAX)
.await?;
let reduction = Reduction {
log_id: log_id.clone(),
reduced_at: Utc::now(),
// Start from the cached through_index (if any) so a reduction over an
// already-current cache doesn't incorrectly report index zero.
through_index: starting_index.saturating_sub(1),
aggregate,
};
let reduction = event_stream
.try_fold(reduction, |mut reduction, event_record| async move {
reduction.aggregate.apply(&event_record);
reduction.through_index = std::cmp::max(reduction.through_index, event_record.index());
Ok(reduction)
})
.await?;
// Queue the new reduction for background caching; if the receiver task
// has shut down, the send error is intentionally ignored.
let _ = self.reduction_sender.send(reduction.clone()).await;
Ok(reduction)
}
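/// Loads up to `max_events` event records starting at `starting_index`,
/// converting each into an owned [`SerializableEventRecord`].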
pub async fn load<'a>(
&'a self,
log_id: &'a LogId,
starting_index: u32,
max_events: u32,
) -> Result<Vec<SerializableEventRecord<E>>, LogManagerError> {
let row_stream = self
.event_store
.load(log_id, starting_index, max_events)
.await?;
let event_records: Vec<SerializableEventRecord<E>> = row_stream
.map_ok(|record| SerializableEventRecord {
index: record.index(),
recorded_at: record.recorded_at(),
idempotency_key: record.idempotency_key(),
event: record.event(),
})
.try_collect()
.await?;
Ok(event_records)
}
}
#[cfg(test)]
mod tests {
use caches::fake::{FakeReductionCache, FakeReductionCacheOp};
use policies::LogLengthPolicy;
use stores::fake::FakeEventStore;
use uuid::Uuid;
use super::*;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TestEvent {
Increment,
Decrement,
}
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct TestAggregate {
pub count: i32,
}
impl Aggregate for TestAggregate {
type Event = TestEvent;
fn apply(&mut self, event_record: &impl EventRecord<TestEvent>) {
match event_record.event() {
TestEvent::Increment => self.count += 1,
TestEvent::Decrement => self.count -= 1,
}
}
}
fn log_manager() -> LogManager<
TestEvent,
TestAggregate,
FakeEventStore<TestEvent>,
FakeReductionCache<TestAggregate>,
> {
LogManager::new(
FakeEventStore::<TestEvent>::new(),
FakeReductionCache::<TestAggregate>::new(),
)
}
#[tokio::test]
async fn create_reduce() {
let mgr = log_manager();
let log_id = LogId::new();
mgr.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
let reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(reduction.log_id(), &log_id);
assert_eq!(reduction.through_index(), 0);
assert_eq!(reduction.aggregate().count, 1);
}
#[tokio::test]
async fn create_append_load() {
let mgr = log_manager();
let log_id = LogId::new();
let log_state = mgr
.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
mgr.append(
&log_id,
log_state,
&TestEvent::Increment,
&AppendOptions::default(),
)
.await
.unwrap();
let events = mgr.load(&log_id, 0, 100).await.unwrap();
assert_eq!(events.len(), 2);
for (idx, evt) in events.iter().enumerate() {
assert_eq!(evt.index as usize, idx);
assert_eq!(evt.event, TestEvent::Increment);
assert_eq!(evt.idempotency_key, None);
}
}
#[tokio::test]
async fn create_append_many() {
let mgr = log_manager();
let log_id = LogId::new();
let mut log_state = mgr
.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
for _ in 0..10 {
log_state = mgr
.append(
&log_id,
log_state,
&TestEvent::Increment,
&AppendOptions::default(),
)
.await
.unwrap();
}
let reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(reduction.aggregate().count, 11);
}
#[tokio::test]
async fn cached_reduction_gets_used() {
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<FakeReductionCacheOp<TestAggregate>>(64);
let mgr = LogManager::new(
FakeEventStore::<TestEvent>::new(),
FakeReductionCache::<TestAggregate>::with_notifications(sender),
);
let log_id = LogId::new();
mgr.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
let first_reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(first_reduction.log_id(), &log_id);
assert_eq!(first_reduction.through_index(), 0);
assert_eq!(first_reduction.aggregate().count, 1);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Get {
log_id: log_id.clone(),
response: None
}
);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Put {
reduction: first_reduction.clone()
}
);
mgr.append(
&log_id,
first_reduction.clone(),
&TestEvent::Decrement,
&AppendOptions::default(),
)
.await
.unwrap();
let second_reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(second_reduction.log_id(), &log_id);
assert_eq!(second_reduction.through_index(), 1);
assert_eq!(second_reduction.aggregate().count, 0);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Get {
log_id: log_id.clone(),
response: Some(first_reduction.clone()),
}
);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Put {
reduction: second_reduction.clone()
}
);
}
#[tokio::test]
async fn idempotent_create() {
let mgr = log_manager();
let log_id = LogId::new();
let idempotency_key = Uuid::now_v7().to_string();
let create_options = AppendOptions {
idempotency_key: Some(idempotency_key.clone()),
..Default::default()
};
mgr.create(&log_id, &TestEvent::Increment, &create_options)
.await
.unwrap();
let replay_log_id = LogId::new();
let result = mgr
.create(&replay_log_id, &TestEvent::Increment, &create_options)
.await;
assert_eq!(
result,
Err(LogManagerError::IdempotentReplay {
idempotency_key: idempotency_key.clone(),
log_id: log_id.clone(),
event_index: 0
})
);
}
#[tokio::test]
async fn idempotent_append() {
let mgr = log_manager();
let log_id = LogId::new();
let log_state = mgr
.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
let idempotency_key = Uuid::now_v7().to_string();
let append_options = AppendOptions {
idempotency_key: Some(idempotency_key.clone()),
..Default::default()
};
let log_state = mgr
.append(&log_id, log_state, &TestEvent::Decrement, &append_options)
.await
.unwrap();
let reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(reduction.through_index(), 1);
assert_eq!(reduction.aggregate().count, 0);
let result = mgr
.append(&log_id, log_state, &TestEvent::Decrement, &append_options)
.await;
assert_eq!(
result,
Err(LogManagerError::IdempotentReplay {
idempotency_key: idempotency_key.clone(),
log_id: log_id.clone(),
event_index: 1
})
);
let reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(reduction.through_index(), 1);
assert_eq!(reduction.aggregate().count, 0);
}
#[tokio::test]
async fn concurrent_append() {
let mgr = log_manager();
let log_id = LogId::new();
let log_state = mgr
.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
mgr.append(
&log_id,
log_state.clone(),
&TestEvent::Decrement,
&AppendOptions::default(),
)
.await
.unwrap();
let result = mgr
.append(
&log_id,
log_state,
&TestEvent::Decrement,
&AppendOptions::default(),
)
.await;
assert_eq!(result, Err(LogManagerError::ConcurrentAppend(log_id)));
}
#[tokio::test]
async fn log_length_caching_policy() {
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<FakeReductionCacheOp<TestAggregate>>(64);
let mgr = LogManager::with_options(
FakeEventStore::<TestEvent>::new(),
FakeReductionCache::<TestAggregate>::with_notifications(sender),
LogManagerOptions {
caching_policy: Some(LogLengthPolicy::at_least(2)),
},
);
let log_id = LogId::new();
mgr.create(&log_id, &TestEvent::Increment, &AppendOptions::default())
.await
.unwrap();
let first_reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(first_reduction.log_id(), &log_id);
assert_eq!(first_reduction.through_index(), 0);
assert_eq!(first_reduction.aggregate().count, 1);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Get {
log_id: log_id.clone(),
response: None
}
);
mgr.append(
&log_id,
first_reduction,
&TestEvent::Decrement,
&AppendOptions::default(),
)
.await
.unwrap();
let second_reduction = mgr.reduce(&log_id).await.unwrap();
assert_eq!(second_reduction.log_id(), &log_id);
assert_eq!(second_reduction.through_index(), 1);
assert_eq!(second_reduction.aggregate().count, 0);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Get {
log_id: log_id.clone(),
response: None
}
);
let op = receiver.recv().await.unwrap();
assert_eq!(
op,
FakeReductionCacheOp::Put {
reduction: second_reduction.clone(),
}
);
}
#[tokio::test]
async fn log_full() {
let mgr = log_manager();
let log_id = LogId::new();
let log_state = LogState {
next_index: u32::MAX,
};
let result = mgr
.append(
&log_id,
log_state,
&TestEvent::Increment,
&AppendOptions::default(),
)
.await;
assert_eq!(result, Err(LogManagerError::LogFull));
}
}