use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize};
use crate::stream_query::{StreamFilter, StreamQuery};
use crate::{all_the_tuples, union, BoxDynError, StateSnapshotter};
use crate::{event::Event, PersistedEvent};
use async_trait::async_trait;
use paste::paste;
use std::error::Error as StdError;
use std::ops::Deref;
/// A state that can be changed by applying events.
///
/// Extends [`StateQuery`]; the event type accepted by `mutate` is the
/// `Event` associated type declared on that trait.
pub trait StateMutate: StateQuery {
/// Applies `event` to this state through a mutable reference.
fn mutate(&mut self, event: Self::Event);
}
/// Operations over a group of state parts that share the stream event type `E`.
///
/// This file implements it for tuples of [`StatePart`] through the
/// `impl_multi_state!` macro.
pub trait MultiState<E: Event + Clone> {
/// Offers `event` to the group. The tuple implementations in this file apply
/// it only to the parts whose query matches the event.
fn mutate_all(&mut self, event: PersistedEvent<E>);
/// Returns a single query for the whole group. The tuple implementations in
/// this file build it with `union!` over every part's query.
fn query_all(&self) -> StreamQuery<E>;
/// Returns a version for the whole group. The tuple implementations in this
/// file return the highest version among the parts.
fn version(&self) -> i64;
}
// Implements `MultiState<E>` for a tuple of `StatePart`s.
//
// The macro receives the leading type parameters in brackets and the final
// one separately. Every part's event type must be convertible from and into
// the stream event type `E`.
// NOTE(review): the set of arities is decided by `all_the_tuples!`, which is
// defined elsewhere.
macro_rules! impl_multi_state {
    (
        [$($ty:ident),*], $last:ident
    ) => {
        // When the bracketed list is empty the "tuple" is just a parenthesised
        // type/pattern, which would otherwise trigger the lint.
        #[allow(unused_parens)]
        impl<E, $($ty,)* $last> MultiState<E> for ($(StatePart<$ty>,)* StatePart<$last>)
        where
            E: Event + Clone,
            $($ty: StateQuery + StateMutate,)*
            $last: StateQuery + StateMutate,
            $(<$ty as StateQuery>::Event: TryFrom<E> + Into<E>,)*
            <$last as StateQuery>::Event: TryFrom<E> + Into<E>,
            $(<<$ty as StateQuery>::Event as TryFrom<E>>::Error:
                StdError + 'static + Send + Sync,)*
            <<$last as StateQuery>::Event as TryFrom<E>>::Error:
                StdError + 'static + Send + Sync,
        {
            // Applies `event` to every part whose query matches it.
            fn mutate_all(&mut self, event: PersistedEvent<E>) {
                paste! {
                    let ($([<state_ $ty:lower>],)* [<state_ $last:lower>]) = self;
                    $(
                        if [<state_ $ty:lower>].matches_event(&event) {
                            [<state_ $ty:lower>].mutate_part(event.clone());
                        }
                    )*
                    // Nothing reads `event` after the last part, so it can take
                    // ownership instead of receiving another clone.
                    if [<state_ $last:lower>].matches_event(&event) {
                        [<state_ $last:lower>].mutate_part(event);
                    }
                }
            }

            // Combines the per-part queries into one query with `union!`.
            fn query_all(&self) -> StreamQuery<E> {
                paste! {
                    let ($([<state_ $ty:lower>],)* [<state_ $last:lower>]) = self;
                    union!($([<state_ $ty:lower>].query_part(),)* [<state_ $last:lower>].query_part())
                }
            }

            // Returns the highest version among all parts.
            fn version(&self) -> i64 {
                paste! {
                    let ($([<state_ $ty:lower>],)* [<state_ $last:lower>]) = self;
                    // Start from the last part (always present) and fold in the rest.
                    let version = [<state_ $last:lower>].version();
                    $(
                        let version = version.max([<state_ $ty:lower>].version());
                    )*
                    version
                }
            }
        }
    }
}
// Generate the `MultiState` implementations for every tuple shape that
// `all_the_tuples!` enumerates.
all_the_tuples!(impl_multi_state);
/// Snapshot loading and storing for a group of state parts, using a
/// snapshot backend of type `T`.
///
/// This file implements it for tuples of [`StatePart`] through the
/// `impl_multi_state_snapshot!` macro.
#[async_trait]
pub trait MultiStateSnapshot<T: StateSnapshotter> {
/// Loads snapshots from `backend` into the group. The tuple implementations
/// in this file replace every part with the value returned by
/// `load_snapshot` and return the highest version among the loaded parts.
async fn load_all(&mut self, backend: &T) -> i64;
/// Stores a snapshot of each part through `backend`. The tuple
/// implementations in this file return the first error encountered.
async fn store_all(&self, backend: &T) -> Result<(), BoxDynError>;
}
// Implements `MultiStateSnapshot<B>` for a tuple of `StatePart`s.
//
// The macro receives the leading type parameters in brackets and the final
// one separately; each state type must be serialisable and `'static`.
macro_rules! impl_multi_state_snapshot {
    ([$($ty:ident),*], $last:ident) => {
        #[async_trait]
        #[allow(unused_parens)]
        impl<B, $($ty,)* $last> MultiStateSnapshot<B> for ($(StatePart<$ty>,)* StatePart<$last>)
        where
            B: StateSnapshotter + Send + Sync,
            $($ty: StateQuery + Serialize + DeserializeOwned + 'static,)*
            $last: StateQuery + Serialize + DeserializeOwned + 'static,
        {
            // Replaces every part with what the backend returns for it and
            // reports the highest version seen among the loaded parts.
            async fn load_all(&mut self, backend: &B) -> i64 {
                paste! {
                    let ($([<part_ $ty:lower>],)* [<part_ $last:lower>]) = self;

                    // The trailing part is loaded first; it seeds the running maximum.
                    *[<part_ $last:lower>] =
                        backend.load_snapshot([<part_ $last:lower>].clone()).await;
                    let newest_version = [<part_ $last:lower>].version;

                    // Then the leading parts, in declaration order.
                    $(
                        *[<part_ $ty:lower>] =
                            backend.load_snapshot([<part_ $ty:lower>].clone()).await;
                        let newest_version = i64::max(newest_version, [<part_ $ty:lower>].version);
                    )*
                }
                newest_version
            }

            // Stores the parts one after another, leading parts first, and
            // stops at the first failure.
            async fn store_all(&self, backend: &B) -> Result<(), BoxDynError> {
                paste! {
                    let ($([<part_ $ty:lower>],)* [<part_ $last:lower>]) = self;
                    $(
                        backend.store_snapshot(&[<part_ $ty:lower>]).await?;
                    )*
                    backend.store_snapshot(&[<part_ $last:lower>]).await?;
                }
                Ok(())
            }
        }
    }
}
// Generate the `MultiStateSnapshot` implementations for every tuple shape
// that `all_the_tuples!` enumerates.
all_the_tuples!(impl_multi_state_snapshot);
/// Describes which events a state is interested in.
///
/// Implementors must also be `Clone + Send + Sync`.
pub trait StateQuery: Clone + Send + Sync {
/// Name associated with the state type.
/// NOTE(review): not read anywhere in this file — presumably used to
/// identify the state elsewhere (e.g. by a snapshotter); confirm.
const NAME: &'static str;
/// Event type this state queries for.
type Event: Event + Clone + Send + Sync;
/// Returns the stream query for this state.
fn query(&self) -> StreamQuery<Self::Event>;
}
/// Lets a reference to any [`StateQuery`] be converted straight into the
/// [`StreamQuery`] it describes.
impl<S, E> From<&S> for StreamQuery<E>
where
    E: Clone,
    S: StateQuery<Event = E>,
{
    /// Delegates to [`StateQuery::query`].
    fn from(state: &S) -> Self {
        <S as StateQuery>::query(state)
    }
}
/// A state `S` together with bookkeeping about the events applied to it.
#[derive(Clone, Serialize, Deserialize)]
pub struct StatePart<S: StateQuery> {
/// Value supplied at construction, then overwritten with the id of each
/// event passed to `mutate_part`.
version: i64,
/// Starts at 0 on construction and is incremented by one on every
/// `mutate_part` call.
applied_events: u64,
/// The wrapped state.
inner: S,
}
impl<S: StateQuery> StatePart<S> {
    /// Wraps `payload` at the given `version` with no events applied yet.
    pub fn new(version: i64, payload: S) -> Self {
        Self {
            version,
            applied_events: 0,
            inner: payload,
        }
    }

    /// Returns the part's current version (the id of the last applied event,
    /// or the construction-time value if none was applied).
    pub fn version(&self) -> i64 {
        self.version
    }

    /// Returns how many times `mutate_part` has been applied to this part.
    pub fn applied_events(&self) -> u64 {
        self.applied_events
    }

    /// Returns the wrapped state's query with its origin changed to this
    /// part's current version.
    pub fn query_part(&self) -> StreamQuery<<S as StateQuery>::Event> {
        self.inner.query().change_origin(self.version)
    }

    /// Tells whether `event` satisfies this part's query.
    ///
    /// The part's query is converted to the event type `U` and its filter is
    /// evaluated against `event`.
    pub fn matches_event<U>(&self, event: &PersistedEvent<U>) -> bool
    where
        U: Event + Clone,
        <S as StateQuery>::Event: Into<U>,
    {
        matches_filter(event, self.query_part().convert().filter())
    }

    /// Applies `event` to the wrapped state and records it in the bookkeeping:
    /// `version` becomes the event id and `applied_events` grows by one.
    ///
    /// # Panics
    ///
    /// Panics if `event` cannot be converted into the state's own event type.
    /// The conversion runs before any field is modified, so a failed
    /// conversion leaves the part untouched.
    pub fn mutate_part<E>(&mut self, event: PersistedEvent<E>)
    where
        E: Event,
        S: StateMutate,
        <S as StateQuery>::Event: TryFrom<E>,
        <<S as StateQuery>::Event as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
    {
        let event_id = event.id;
        // Convert first: if this fails, `version`/`applied_events` must not
        // have advanced past what `inner` actually reflects.
        let converted: <S as StateQuery>::Event = event
            .event
            .try_into()
            .expect("event cannot be converted into the event type of this state part");
        self.version = event_id;
        self.applied_events += 1;
        self.inner.mutate(converted);
    }
}
/// Exposes the wrapped state by shared reference, so its methods can be
/// called directly on the part.
impl<S: StateQuery> Deref for StatePart<S> {
    type Target = S;

    /// Borrows the wrapped state.
    fn deref(&self) -> &S {
        let Self { inner, .. } = self;
        inner
    }
}
/// Conversion of plain states into their [`StatePart`]-wrapped form.
///
/// This file implements it for tuples of [`StateQuery`] types; those
/// implementations wrap each element in a [`StatePart`] with version 0 and
/// no applied events.
pub trait IntoStatePart<T>: Sized {
/// The wrapped form produced by the conversion.
type Target;
/// Consumes `self` and returns the wrapped form.
fn into_state_part(self) -> Self::Target;
}
/// Conversion back into the plain state value(s) `T`.
///
/// This file implements it for tuples of [`StatePart`]; those
/// implementations return the wrapped states and drop the version and
/// applied-event bookkeeping.
pub trait IntoState<T>: Sized {
/// Consumes `self` and returns the plain state value(s).
fn into_state(self) -> T;
}
/// Evaluates `filter` against a single persisted `event`.
///
/// - `Events` / `ExcludeEvents`: the event name must (or must not) be listed.
/// - `Eq`: the event's domain identifier `ident` must equal `value`; an event
///   that does not carry that identifier is treated as matching.
/// - `And` / `Or`: both sides are evaluated recursively with short-circuiting.
/// - `Origin`: the event id must be strictly greater than the origin id.
fn matches_filter<E: Event>(event: &PersistedEvent<E>, filter: &StreamFilter) -> bool {
    match filter {
        StreamFilter::Events { names } => names.contains(&event.name()),
        StreamFilter::ExcludeEvents { names } => !names.contains(&event.name()),
        StreamFilter::Eq { ident, value } => {
            // An absent identifier does not exclude the event.
            event
                .domain_identifiers()
                .get(ident)
                .map_or(true, |found| found == value)
        }
        StreamFilter::And { l: left, r: right } => {
            matches_filter(event, left) && matches_filter(event, right)
        }
        StreamFilter::Or { l: left, r: right } => {
            matches_filter(event, left) || matches_filter(event, right)
        }
        StreamFilter::Origin { id: origin } => event.id() > *origin,
    }
}
// Implements the conversions between tuples of plain states and tuples of
// `StatePart`s:
//   * `IntoStatePart` wraps each state in a fresh part (version 0, no events
//     applied);
//   * `IntoState` unwraps each part back into its state.
//
// The macro receives the leading type parameters in brackets and the final
// one separately.
macro_rules! impl_from_state {
    ([$($ty:ident),*], $last:ident) => {
        #[allow(unused_parens)]
        impl<$($ty,)* $last> IntoStatePart<($($ty,)* $last)> for ($($ty,)* $last)
        where
            $($ty: StateQuery,)*
            $last: StateQuery,
        {
            type Target = ($(StatePart<$ty>,)* StatePart<$last>);

            paste::paste! {
                fn into_state_part(self) -> ($(StatePart<$ty>,)* StatePart<$last>) {
                    let ($([<plain_ $ty:lower>],)* [<plain_ $last:lower>]) = self;
                    // `StatePart::new` starts the applied-event count at zero.
                    (
                        $(StatePart::new(0, [<plain_ $ty:lower>]),)*
                        StatePart::new(0, [<plain_ $last:lower>])
                    )
                }
            }
        }

        #[allow(unused_parens)]
        impl<$($ty,)* $last> IntoState<($($ty,)* $last)> for ($(StatePart<$ty>,)* StatePart<$last>)
        where
            $($ty: StateQuery,)*
            $last: StateQuery,
        {
            paste::paste! {
                fn into_state(self) -> ($($ty,)* $last) {
                    // Pull the wrapped state out of each part; the bookkeeping
                    // fields are dropped.
                    let (
                        $(StatePart { inner: [<plain_ $ty:lower>], .. },)*
                        StatePart { inner: [<plain_ $last:lower>], .. }
                    ) = self;
                    ($([<plain_ $ty:lower>],)* [<plain_ $last:lower>])
                }
            }
        }
    }
}
// Generate the conversions for every tuple shape that `all_the_tuples!`
// enumerates.
all_the_tuples!(impl_from_state);
// Unit tests for the tuple implementations generated above.
// `Cart`, `cart`, `item_added_event`, `ShoppingCartEvent` and
// `MockStateSnapshotter` come from `crate::utils::tests` (not visible here).
#[cfg(test)]
mod test {
use futures::executor::block_on;
use super::*;
use crate::utils::tests::*;
// Each event is expected to be applied only to the cart it belongs to:
// afterwards every part has exactly one applied event and its version equals
// the id of that event.
#[test]
fn it_mutates_all() {
let mut state = (Cart::new("c1"), Cart::new("c2")).into_state_part();
state.mutate_all(PersistedEvent::new(1, item_added_event("p1", "c1")));
state.mutate_all(PersistedEvent::new(2, item_added_event("p2", "c2")));
let (cart1, cart2) = state;
assert_eq!(cart1.version, 1);
assert_eq!(cart1.applied_events, 1);
assert_eq!(cart1.into_state(), cart("c1", ["p1".to_string()]));
assert_eq!(cart2.version, 2);
assert_eq!(cart2.applied_events, 1);
assert_eq!(cart2.into_state(), cart("c2", ["p2".to_string()]));
}
// The combined query must equal the union of each cart's own query with its
// origin set to 0 (the version of a freshly wrapped part).
#[test]
fn it_queries_all() {
let cart1 = Cart::new("c1");
let cart2 = Cart::new("c2");
let state = (cart1.clone(), cart2.clone()).into_state_part();
let query: StreamQuery<ShoppingCartEvent> = state.query_all();
assert_eq!(
query,
union!(
cart1.query().change_origin(0),
cart2.query().change_origin(0)
)
);
}
// `store_all` must call `store_snapshot` exactly once for each part.
#[test]
fn it_stores_all() {
let multi_state = (cart("c1", []), cart("c2", [])).into_state_part();
let mut snapshotter = MockStateSnapshotter::new();
snapshotter
.expect_store_snapshot()
.once()
.withf(|s: &StatePart<Cart>| s.inner == cart("c1", []))
.return_once(|_| Ok(()));
snapshotter
.expect_store_snapshot()
.once()
.withf(|s: &StatePart<Cart>| s.inner == cart("c2", []))
.return_once(|_| Ok(()));
block_on(multi_state.store_all(&snapshotter)).unwrap();
}
// `load_all` must call `load_snapshot` once per part, passing the current
// part, and replace each part with the value the backend returns.
#[test]
fn it_loads_all() {
let mut multi_state = (cart("c1", []), cart("c2", [])).into_state_part();
let mut snapshotter = MockStateSnapshotter::new();
snapshotter
.expect_load_snapshot()
.once()
.withf(|q| q.inner == cart("c1", []))
.returning(|_| cart("c1", ["p1".to_owned()]).into_state_part());
snapshotter
.expect_load_snapshot()
.once()
.withf(|q| q.inner == cart("c2", []))
.returning(|_| cart("c2", ["p2".to_owned()]).into_state_part());
block_on(multi_state.load_all(&snapshotter));
let (cart1, cart2) = multi_state;
assert_eq!(cart1.inner, cart("c1", ["p1".to_owned()]));
assert_eq!(cart2.inner, cart("c2", ["p2".to_owned()]));
}
}