#![cfg_attr(docsrs, feature(doc_cfg))]
pub mod convert;
mod evt_log;
mod snapshot_store;
pub use evt_log::*;
pub use snapshot_store::*;
use anyhow::anyhow;
use bytes::Bytes;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fmt::Debug,
num::{NonZeroU64, NonZeroUsize},
};
use thiserror::Error;
use tokio::{
pin,
sync::{mpsc, oneshot},
task,
};
use tracing::{debug, error, instrument};
/// Command and event handling of an event sourced entity.
///
/// None of the items takes `self`: the implementing type only ties the associated types, the
/// type name and the two handlers together.
pub trait EventSourced {
    /// Type of the entity ID; passed by reference to [`EventSourced::handle_cmd`].
    type Id: Debug + Send + 'static;

    /// Type of the commands accepted by [`EventSourced::handle_cmd`].
    type Cmd: Debug + Send + Sync + 'static;

    /// Type of the events produced by [`EventSourced::handle_cmd`] and applied by
    /// [`EventSourced::handle_evt`].
    type Evt: Debug + Send + Sync;

    /// Type of the entity state; its `Default` value is the state used when there is no
    /// snapshot to restore.
    type State: Debug + Default + Send + Sync + 'static;

    /// Type of the error returned by [`EventSourced::handle_cmd`] for a rejected command.
    type Error: StdError + Send + Sync + 'static;

    /// Name of the entity type; handed to the event log together with the entity ID.
    const TYPE_NAME: &'static str;

    /// Command handler: decides, based on the ID and the current state, whether the given
    /// command results in an event or in an error.
    fn handle_cmd(
        id: &Self::Id,
        state: &Self::State,
        cmd: Self::Cmd,
    ) -> Result<Self::Evt, Self::Error>;

    /// Event handler: consumes the current state and the given event and returns the new state.
    fn handle_evt(state: Self::State, evt: Self::Evt) -> Self::State;
}
pub trait EventSourcedExt: Sized {
#[allow(async_fn_in_trait)]
#[allow(clippy::too_many_arguments)]
#[instrument(skip(evt_log, snapshot_store, binarizer))]
async fn spawn<
L,
S,
EvtToBytes,
EvtToBytesError,
StateToBytes,
StateToBytesError,
EvtFromBytes,
EvtFromBytesError,
StateFromBytes,
StateFromBytesError,
>(
id: Self::Id,
snapshot_after: Option<NonZeroU64>,
cmd_buffer: NonZeroUsize,
mut evt_log: L,
mut snapshot_store: S,
binarizer: Binarizer<EvtToBytes, EvtFromBytes, StateToBytes, StateFromBytes>,
) -> Result<EntityRef<Self>, SpawnError>
where
Self: EventSourced,
L: EvtLog<Id = Self::Id>,
S: SnapshotStore<Id = Self::Id>,
EvtToBytes: Fn(&Self::Evt) -> Result<Bytes, EvtToBytesError> + Send + Sync + 'static,
EvtToBytesError: StdError + Send + Sync + 'static,
StateToBytes: Fn(&Self::State) -> Result<Bytes, StateToBytesError> + Send + Sync + 'static,
StateToBytesError: StdError + Send + Sync + 'static,
EvtFromBytes:
Fn(Bytes) -> Result<Self::Evt, EvtFromBytesError> + Copy + Send + Sync + 'static,
EvtFromBytesError: StdError + Send + Sync + 'static,
StateFromBytes:
Fn(Bytes) -> Result<Self::State, StateFromBytesError> + Copy + Send + Sync + 'static,
StateFromBytesError: StdError + Send + Sync + 'static,
{
let Binarizer {
evt_to_bytes,
evt_from_bytes,
state_to_bytes,
state_from_bytes,
} = binarizer;
let (snapshot_seq_no, state) = snapshot_store
.load::<Self::State, _, _>(&id, state_from_bytes)
.await
.map_err(|error| SpawnError::LoadSnapshot(error.into()))?
.map(|Snapshot { seq_no, state }| {
debug!(?id, seq_no, ?state, "restored snapshot");
(seq_no, state)
})
.unzip();
let mut state = state.unwrap_or_default();
let mut last_seq_no = evt_log
.last_seq_no(Self::TYPE_NAME, &id)
.await
.map_err(|error| SpawnError::LastNonZeroU64(error.into()))?;
if last_seq_no < snapshot_seq_no {
return Err(SpawnError::InvalidLastSeqNo(last_seq_no, snapshot_seq_no));
};
if snapshot_seq_no < last_seq_no {
let from_seq_no = snapshot_seq_no.successor();
let last_seq_no = last_seq_no.unwrap(); debug!(?id, from_seq_no, last_seq_no, "replaying evts");
let evts = evt_log
.evts_by_id::<Self::Evt, _, _>(Self::TYPE_NAME, &id, from_seq_no, evt_from_bytes)
.await
.map_err(|error| SpawnError::EvtsById(error.into()))?;
pin!(evts);
while let Some(evt) = evts.next().await {
let (seq_no, evt) = evt.map_err(|error| SpawnError::NextEvt(error.into()))?;
state = Self::handle_evt(state, evt);
if seq_no == last_seq_no {
break;
}
}
debug!(?id, ?state, "replayed evts");
}
let mut evt_count = 0u64;
let (cmd_in, mut cmd_out) = mpsc::channel::<(
Self::Cmd,
oneshot::Sender<Result<(), Self::Error>>,
)>(cmd_buffer.get());
task::spawn(async move {
while let Some((cmd, result_sender)) = cmd_out.recv().await {
debug!(?id, ?cmd, "handling command");
let result = Self::handle_cmd(&id, &state, cmd);
debug!(?id, ?result, "handled command");
if let Err(error) = result {
if result_sender.send(Err(error)).is_err() {
error!(?id, "cannot send command handler error");
};
continue;
};
let evt = result.unwrap();
debug!(?id, ?evt, "persisting event");
match evt_log
.persist(&evt, Self::TYPE_NAME, &id, last_seq_no, &evt_to_bytes)
.await
{
Ok(seq_no) => {
debug!(?id, ?evt, seq_no, "persited event");
last_seq_no = Some(seq_no);
state = Self::handle_evt(state, evt);
evt_count += 1;
if snapshot_after
.map(|a| evt_count % a == 0)
.unwrap_or_default()
{
debug!(?id, seq_no, evt_count, "saving snapshot");
if let Err(error) = snapshot_store
.save(&id, seq_no, &state, &state_to_bytes)
.await
{
error!(?id, error = error_chain(error), "cannot save snapshot");
};
}
if result_sender.send(Ok(())).is_err() {
error!(?id, "cannot send command handler OK");
};
}
Err(error) => {
error!(?id, error = error_chain(error), "cannot persist event");
break;
}
}
}
debug!(?id, "entity terminated");
});
Ok(EntityRef { cmd_in })
}
}
#[derive(Debug, Error)]
pub enum SpawnError {
#[error("cannot load snapshot from snapshot store")]
LoadSnapshot(#[source] Box<dyn StdError + Send + Sync>),
#[error("last sequence number {0:?} less than snapshot sequence number {0:?}")]
InvalidLastSeqNo(Option<NonZeroU64>, Option<NonZeroU64>),
#[error("cannot get last seqence number from event log")]
LastNonZeroU64(#[source] Box<dyn StdError + Send + Sync>),
#[error("cannot get events by ID from event log")]
EvtsById(#[source] Box<dyn StdError + Send + Sync>),
#[error("cannot get next event from event log")]
NextEvt(#[source] Box<dyn StdError + Send + Sync>),
}
/// Blanket implementation: every [`EventSourced`] type gets the [`EventSourcedExt`] methods.
impl<E: EventSourced> EventSourcedExt for E {}
/// Handle for a spawned [`EventSourced`] entity, used to send commands to it.
///
/// Cloning is cheap and independent of `E`: only the sending half of the command channel is
/// cloned.
#[derive(Debug)]
// The channel carries the command together with the sender for its result; naming that tuple
// would not make the single use any clearer.
#[allow(clippy::type_complexity)]
pub struct EntityRef<E>
where
    E: EventSourced,
{
    /// Sending half of the command channel; each command is paired with a oneshot sender for
    /// the result of handling it.
    cmd_in: mpsc::Sender<(E::Cmd, oneshot::Sender<Result<(), E::Error>>)>,
}

// Implemented by hand, because `#[derive(Clone)]` would require `E: Clone` even though no value
// of type `E` is stored.
impl<E> Clone for EntityRef<E>
where
    E: EventSourced,
{
    fn clone(&self) -> Self {
        Self {
            cmd_in: self.cmd_in.clone(),
        }
    }
}
impl<E> EntityRef<E>
where
    E: EventSourced,
{
    /// Sends the given command to the entity and waits for the result of handling it.
    ///
    /// # Errors
    ///
    /// * [`HandleCmdError::Internal`] if the command cannot be sent to the entity or the result
    ///   cannot be received from it.
    /// * [`HandleCmdError::Handler`] if the entity answered with an error.
    #[instrument(skip(self))]
    pub async fn handle_cmd(&self, cmd: E::Cmd) -> Result<(), HandleCmdError<E>> {
        // The entity answers on a dedicated oneshot channel travelling along with the command.
        let (result_in, result_out) = oneshot::channel();

        if self.cmd_in.send((cmd, result_in)).await.is_err() {
            return Err(HandleCmdError::Internal("cannot send command".to_string()));
        }

        match result_out.await {
            Ok(result) => result.map_err(HandleCmdError::Handler),
            Err(_) => Err(HandleCmdError::Internal(
                "cannot receive command handler result".to_string(),
            )),
        }
    }
}
/// Error returned by [`EntityRef::handle_cmd`].
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum HandleCmdError<E: EventSourced> {
    /// The command could not be sent to the entity or the result could not be received from
    /// it; the message tells which of the two.
    #[error("{0}")]
    Internal(String),

    /// The entity answered the command with an error of its [`EventSourced::Error`] type.
    #[error(transparent)]
    Handler(E::Error),
}
/// Bundle of the four conversions used when spawning an entity: events and state, each to and
/// from their binary representation.
pub struct Binarizer<EvtToBytes, EvtFromBytes, StateToBytes, StateFromBytes> {
    /// Converts an event into its binary representation.
    pub evt_to_bytes: EvtToBytes,

    /// Converts a binary representation back into an event.
    pub evt_from_bytes: EvtFromBytes,

    /// Converts the state into its binary representation.
    pub state_to_bytes: StateToBytes,

    /// Converts a binary representation back into the state.
    pub state_from_bytes: StateFromBytes,
}
/// Computation of the sequence number following a given (optional) one.
pub trait SeqNo<T> {
    /// Returns the sequence number following this one.
    fn successor(self) -> T;
}

impl SeqNo<NonZeroU64> for NonZeroU64 {
    /// Adds one; saturates at `NonZeroU64::MAX` instead of overflowing.
    fn successor(self) -> NonZeroU64 {
        NonZeroU64::saturating_add(self, 1)
    }
}

impl SeqNo<NonZeroU64> for Option<NonZeroU64> {
    /// `None` is followed by the first sequence number, `NonZeroU64::MIN`; `Some(n)` by `n`
    /// plus one, saturating at `NonZeroU64::MAX`.
    fn successor(self) -> NonZeroU64 {
        match self {
            Some(seq_no) => seq_no.saturating_add(1),
            None => NonZeroU64::MIN,
        }
    }
}
fn error_chain<E>(error: E) -> String
where
E: StdError + Send + Sync + 'static,
{
format!("{}", anyhow!(error))
}
#[cfg(all(test, feature = "serde_json"))]
mod tests {
    use super::*;
    use futures::{stream, Stream};
    use std::{convert::Infallible, iter};
    use tracing_test::traced_test;
    use uuid::Uuid;

    /// Entity whose state counts the events applied to it.
    #[derive(Debug)]
    struct Simple;

    impl EventSourced for Simple {
        type Id = Uuid;
        type Cmd = ();
        type Evt = ();
        type State = u64;
        type Error = Infallible;

        const TYPE_NAME: &'static str = "simple";

        fn handle_cmd(
            _id: &Self::Id,
            _state: &Self::State,
            _cmd: Self::Cmd,
        ) -> Result<Self::Evt, Self::Error> {
            Ok(())
        }

        fn handle_evt(mut state: Self::State, _evt: Self::Evt) -> Self::State {
            state += 1;
            state
        }
    }

    /// Event log stub: reports 42 as last sequence number and serves an endless stream of unit
    /// events starting at the requested sequence number.
    #[derive(Debug, Clone)]
    struct TestEvtLog;

    impl EvtLog for TestEvtLog {
        type Id = Uuid;
        type Error = TestEvtLogError;

        async fn persist<E, ToBytes, ToBytesError>(
            &mut self,
            _evt: &E,
            _type: &str,
            _id: &Self::Id,
            last_seq_no: Option<NonZeroU64>,
            _to_bytes: &ToBytes,
        ) -> Result<NonZeroU64, Self::Error>
        where
            E: Sync,
            ToBytes: Fn(&E) -> Result<Bytes, ToBytesError> + Sync,
            ToBytesError: StdError + Send + Sync + 'static,
        {
            // A persisted event gets the sequence number following the last one.
            Ok(last_seq_no.successor())
        }

        async fn last_seq_no(
            &self,
            _type: &str,
            _entity_id: &Self::Id,
        ) -> Result<Option<NonZeroU64>, Self::Error> {
            Ok(Some(42.try_into().unwrap()))
        }

        async fn evts_by_id<E, FromBytes, FromBytesError>(
            &self,
            _type: &str,
            _id: &Self::Id,
            seq_no: NonZeroU64,
            evt_from_bytes: FromBytes,
        ) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
        where
            E: Send,
            FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync,
            FromBytesError: StdError + Send + Sync + 'static,
        {
            // Endless stream of events: `spawn` has to stop replaying at the last sequence
            // number by itself.
            let successors = iter::successors(Some(seq_no), |n| n.checked_add(1));
            let evts = stream::iter(successors).map(move |n| {
                let evt = evt_from_bytes(serde_json::to_vec(&()).unwrap().into()).unwrap();
                Ok((n, evt))
            });
            Ok(evts)
        }

        async fn evts_by_type<E, FromBytes, FromBytesError>(
            &self,
            _type: &str,
            _seq_no: NonZeroU64,
            _evt_from_bytes: FromBytes,
        ) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
        where
            E: Send,
            FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send,
            FromBytesError: StdError + Send + Sync + 'static,
        {
            Ok(stream::empty())
        }
    }

    #[derive(Debug, Error)]
    #[error("TestEvtLogError")]
    struct TestEvtLogError(#[source] Box<dyn StdError + Send + Sync>);

    /// Snapshot store stub: always loads a snapshot with state 21 at sequence number 21 and
    /// ignores saves.
    #[derive(Debug, Clone)]
    struct TestSnapshotStore;

    impl SnapshotStore for TestSnapshotStore {
        type Id = Uuid;
        type Error = TestSnapshotStoreError;

        async fn save<S, ToBytes, ToBytesError>(
            &mut self,
            _id: &Self::Id,
            _seq_no: NonZeroU64,
            _state: &S,
            _state_to_bytes: &ToBytes,
        ) -> Result<(), Self::Error>
        where
            S: Send,
            ToBytes: Fn(&S) -> Result<Bytes, ToBytesError> + Sync,
            ToBytesError: StdError,
        {
            Ok(())
        }

        async fn load<S, FromBytes, FromBytesError>(
            &self,
            _id: &Self::Id,
            state_from_bytes: FromBytes,
        ) -> Result<Option<Snapshot<S>>, Self::Error>
        where
            FromBytes: Fn(Bytes) -> Result<S, FromBytesError>,
            FromBytesError: StdError,
        {
            let bytes = serde_json::to_vec(&21).unwrap();
            let state = state_from_bytes(bytes.into()).unwrap();
            Ok(Some(Snapshot {
                seq_no: 21.try_into().unwrap(),
                state,
            }))
        }
    }

    #[derive(Debug, Error)]
    #[error("TestSnapshotStoreError")]
    struct TestSnapshotStoreError;

    /// Spawning restores state 21 from the snapshot and replays the events 22 to 42, hence the
    /// logged state after replaying must be 42.
    #[tokio::test]
    #[traced_test]
    async fn test_spawn_handle_cmd() -> Result<(), Box<dyn StdError>> {
        let evt_log = TestEvtLog;
        let snapshot_store = TestSnapshotStore;

        let entity = Simple::spawn(
            Uuid::from_u128(1),
            None,
            // Command buffer of size one; no `unsafe` needed for that.
            NonZeroUsize::MIN,
            evt_log,
            snapshot_store,
            convert::serde_json::binarizer(),
        )
        .await?;

        entity.handle_cmd(()).await?;
        assert!(logs_contain("state=42"));

        Ok(())
    }
}