use crate::trivial::{NullEventStore, NullSnapshotStore};
use cqrs_core::{
Aggregate, AggregateCommand, AggregateEvent, AggregateId, CqrsError, EventNumber, EventSink,
EventSource, Events, Precondition, ProducedEvent, Since, SnapshotSink, SnapshotSource, Version,
VersionedAggregate,
};
use std::{
borrow::{Borrow, BorrowMut},
fmt,
marker::PhantomData,
};
/// An aggregate's state paired with version bookkeeping.
///
/// `version` is advanced once for every event applied through `apply`, and
/// `snapshot_version` records the version of the most recent snapshot known
/// for this aggregate, if any.
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq)]
pub struct HydratedAggregate<A>
where
A: Aggregate,
{
// Version reached after every event applied so far.
version: Version,
// Version of the last snapshot loaded or persisted; `None` when no snapshot is known.
snapshot_version: Option<Version>,
// The aggregate state itself.
state: A,
}
impl<A> HydratedAggregate<A>
where
    A: Aggregate,
{
    /// Returns the version the aggregate has been advanced to.
    pub fn version(&self) -> Version {
        self.version
    }

    /// Returns the version of the last known snapshot, if there is one.
    pub fn snapshot_version(&self) -> Option<Version> {
        self.snapshot_version
    }

    /// Records `new_snapshot_version` as the version of the latest snapshot.
    pub fn set_snapshot_version(&mut self, new_snapshot_version: Version) {
        self.snapshot_version = Option::from(new_snapshot_version);
    }

    /// Borrows the underlying aggregate state.
    pub fn state(&self) -> &A {
        &self.state
    }

    /// Applies every event in `events`, in iteration order, advancing the
    /// version once per event.
    pub fn apply_events<E, I>(&mut self, events: I)
    where
        E: AggregateEvent<A>,
        I: IntoIterator<Item = E>,
    {
        events.into_iter().for_each(|event| self.apply(event));
    }

    /// Applies a single event to the state and then increments the version.
    pub fn apply<E>(&mut self, event: E)
    where
        E: AggregateEvent<A>,
    {
        self.state.apply(event);
        self.version.incr();
    }
}
/// Gives cheap reference access to the wrapped aggregate state.
impl<A> AsRef<A> for HydratedAggregate<A>
where
    A: Aggregate,
{
    fn as_ref(&self) -> &A {
        self.state()
    }
}
/// Lets a hydrated aggregate be borrowed as its bare aggregate state.
impl<A> Borrow<A> for HydratedAggregate<A>
where
    A: Aggregate,
{
    fn borrow(&self) -> &A {
        self.state()
    }
}
/// A hydrated aggregate together with the identifier it is stored under.
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq)]
pub struct Entity<I, A>
where
A: Aggregate,
I: AggregateId<A>,
{
// Identifier of the aggregate.
id: I,
// The aggregate state plus its version bookkeeping.
aggregate: HydratedAggregate<A>,
}
impl<I, A> Entity<I, A>
where
    A: Aggregate,
    I: AggregateId<A>,
{
    /// Builds an entity from an identifier and an already hydrated aggregate.
    pub fn new(id: I, aggregate: HydratedAggregate<A>) -> Self {
        Self { id, aggregate }
    }

    /// Borrows the entity's identifier.
    pub fn id(&self) -> &I {
        let Self { id, .. } = self;
        id
    }

    /// Borrows the hydrated aggregate.
    pub fn aggregate(&self) -> &HydratedAggregate<A> {
        let Self { aggregate, .. } = self;
        aggregate
    }

    /// Mutably borrows the hydrated aggregate.
    pub fn aggregate_mut(&mut self) -> &mut HydratedAggregate<A> {
        let Self { aggregate, .. } = self;
        aggregate
    }
}
impl<I, A> From<Entity<I, A>> for HydratedAggregate<A>
where
A: Aggregate,
I: AggregateId<A>,
{
fn from(entity: Entity<I, A>) -> Self {
entity.aggregate
}
}
/// Reference access to the hydrated aggregate held by the entity.
impl<I, A> AsRef<HydratedAggregate<A>> for Entity<I, A>
where
    A: Aggregate,
    I: AggregateId<A>,
{
    fn as_ref(&self) -> &HydratedAggregate<A> {
        self.aggregate()
    }
}
/// Mutable reference access to the hydrated aggregate held by the entity.
impl<I, A> AsMut<HydratedAggregate<A>> for Entity<I, A>
where
    A: Aggregate,
    I: AggregateId<A>,
{
    fn as_mut(&mut self) -> &mut HydratedAggregate<A> {
        self.aggregate_mut()
    }
}
/// Lets an entity be borrowed as its hydrated aggregate.
impl<I, A> Borrow<HydratedAggregate<A>> for Entity<I, A>
where
    A: Aggregate,
    I: AggregateId<A>,
{
    fn borrow(&self) -> &HydratedAggregate<A> {
        self.aggregate()
    }
}
impl<I, A> Borrow<A> for Entity<I, A>
where
A: Aggregate,
I: AggregateId<A>,
{
fn borrow(&self) -> &A {
self.aggregate.borrow()
}
}
/// Lets an entity be mutably borrowed as its hydrated aggregate.
impl<I, A> BorrowMut<HydratedAggregate<A>> for Entity<I, A>
where
    A: Aggregate,
    I: AggregateId<A>,
{
    fn borrow_mut(&mut self) -> &mut HydratedAggregate<A> {
        self.aggregate_mut()
    }
}
pub trait EntitySource<A, E>: EventSource<A, E> + SnapshotSource<A>
where
A: Aggregate,
E: AggregateEvent<A>,
{
fn load_from_snapshot<I>(&self, id: &I) -> EntityLoadSnapshotResult<A, Self>
where
I: AggregateId<A>,
{
let entity = if let Some(snapshot) = self.get_snapshot(id)? {
Some(HydratedAggregate {
version: snapshot.version,
snapshot_version: Some(snapshot.version),
state: snapshot.payload,
})
} else {
None
};
Ok(entity)
}
fn refresh<I>(
&self,
id: &I,
aggregate: &mut HydratedAggregate<A>,
) -> Result<(), <Self as EventSource<A, E>>::Error>
where
I: AggregateId<A>,
{
let seq_events = self.read_events(id, aggregate.version.into(), None)?;
if let Some(seq_events) = seq_events {
for seq_event in seq_events {
aggregate.apply(seq_event.event);
debug_assert_eq!(Version::Number(seq_event.sequence), aggregate.version);
}
}
Ok(())
}
fn rehydrate<I>(&self, id: &I) -> EntityRefreshResult<A, E, Self>
where
I: AggregateId<A>,
{
let aggregate = self
.load_from_snapshot(id)
.map_err(EntityLoadError::SnapshotSource)?;
let missing = aggregate.is_none();
let mut aggregate = aggregate.unwrap_or_default();
self.refresh(id, &mut aggregate)
.map_err(EntityLoadError::EventSource)?;
if missing && aggregate.version == Version::Initial {
Ok(None)
} else {
Ok(Some(aggregate))
}
}
}
/// Result of `EntitySource::load_from_snapshot`: the hydrated aggregate if a
/// snapshot exists, or the snapshot source's error.
pub type EntityLoadSnapshotResult<A, L> =
Result<Option<HydratedAggregate<A>>, <L as SnapshotSource<A>>::Error>;
/// Result of `EntitySource::rehydrate`: the rebuilt aggregate (or `None` if
/// nothing was found), or an `EntityLoadError` wrapping the event source's or
/// snapshot source's error.
pub type EntityRefreshResult<A, E, L> = Result<
Option<HydratedAggregate<A>>,
EntityLoadError<<L as EventSource<A, E>>::Error, <L as SnapshotSource<A>>::Error>,
>;
/// Result of `EntitySink::apply_events_and_persist`: unit on success, or an
/// `EntityPersistError` wrapping the event sink's or snapshot sink's error.
pub type EntityPersistResult<A, E, M, L> =
Result<(), EntityPersistError<<L as EventSink<A, E, M>>::Error, <L as SnapshotSink<A>>::Error>>;
/// Result of `EntitySink::exec_and_persist`: the updated aggregate, or an
/// `EntityExecAndPersistError` parameterized by the sink `L`'s error types.
pub type EntityExecAndPersistResult<A, C, M, L> = Result<
HydratedAggregate<A>,
EntityExecAndPersistError<
A,
C,
<L as EventSink<A, ProducedEvent<A, C>, M>>::Error,
<L as SnapshotSink<A>>::Error,
>,
>;
/// Result of `EntityStore::load_or_default_exec_and_persist`: the updated
/// aggregate, or an `EntityError` parameterized by the store `L`'s load and
/// persist error types.
pub type EntityResult<A, C, M, L> = Result<
HydratedAggregate<A>,
EntityError<
<L as EventSource<A, ProducedEvent<A, C>>>::Error,
<L as SnapshotSource<A>>::Error,
A,
C,
<L as EventSink<A, ProducedEvent<A, C>, M>>::Error,
<L as SnapshotSink<A>>::Error,
>,
>;
/// Result of `EntityStore::load_exec_and_persist`: like `EntityResult`, but
/// the success value is `None` when no existing aggregate was found.
pub type EntityOptionResult<A, C, M, L> = Result<
Option<HydratedAggregate<A>>,
EntityError<
<L as EventSource<A, ProducedEvent<A, C>>>::Error,
<L as SnapshotSource<A>>::Error,
A,
C,
<L as EventSink<A, ProducedEvent<A, C>, M>>::Error,
<L as SnapshotSink<A>>::Error,
>,
>;
// Blanket implementation: every type that is both an event source and a
// snapshot source gets the `EntitySource` default methods for free.
impl<A, E, T> EntitySource<A, E> for T
where
A: Aggregate,
E: AggregateEvent<A>,
T: EventSource<A, E> + SnapshotSource<A>,
{
}
/// Write side of entity storage: anything that can accept both events and
/// snapshots for an aggregate can persist the outcome of a command.
pub trait EntitySink<A, E, M>: EventSink<A, E, M> + SnapshotSink<A>
where
    A: Aggregate,
    E: AggregateEvent<A>,
{
    /// Appends `events` to the event sink, applies them to `aggregate`, then
    /// persists a snapshot and records the snapshot version that comes back.
    ///
    /// The append is guarded by an expected-version precondition built from
    /// `expected_version`. If the append fails, `aggregate` is left untouched.
    /// If the snapshot step fails, the events have already been appended and
    /// applied to `aggregate`.
    fn apply_events_and_persist<I, Es>(
        &self,
        id: &I,
        aggregate: &mut HydratedAggregate<A>,
        events: Es,
        expected_version: Version,
        metadata: M,
    ) -> EntityPersistResult<A, E, M, Self>
    where
        I: AggregateId<A>,
        Es: Events<E>,
    {
        let guard = Some(Precondition::ExpectedVersion(expected_version));
        self.append_events(id, events.as_ref(), guard, metadata)
            .map_err(EntityPersistError::EventSink)?;

        // Only mutate the in-memory aggregate once the events are durably appended.
        aggregate.apply_events(events);

        let current_version = aggregate.version();
        let previous_snapshot = aggregate.snapshot_version();
        let persisted_version = self
            .persist_snapshot(id, aggregate.state(), current_version, previous_snapshot)
            .map_err(EntityPersistError::SnapshotSink)?;
        aggregate.set_snapshot_version(persisted_version);
        Ok(())
    }

    /// Executes `command` against `aggregate` (or a default aggregate when
    /// `None`) and persists the produced events.
    ///
    /// When a `precondition` is given it is verified against the version of
    /// the supplied aggregate (`None` if no aggregate was supplied) before the
    /// command runs. A rejected command yields `Exec` carrying the unchanged
    /// aggregate and the command's error.
    fn exec_and_persist<I, C>(
        &self,
        id: &I,
        aggregate: Option<HydratedAggregate<A>>,
        command: C,
        precondition: Option<Precondition>,
        metadata: M,
    ) -> EntityExecAndPersistResult<A, C, M, Self>
    where
        I: AggregateId<A>,
        C: AggregateCommand<A, Event = E>,
        C::Events: Events<E>,
    {
        if let Some(required) = precondition {
            let current = aggregate.as_ref().map(|existing| existing.version());
            required.verify(current)?;
        }

        let mut hydrated = aggregate.unwrap_or_default();
        // Capture the version before any events are applied; the append is
        // guarded against it.
        let expected_version = hydrated.version;

        let events = match hydrated.state.execute(command) {
            Ok(events) => events,
            Err(rejection) => {
                return Err(EntityExecAndPersistError::Exec(hydrated, rejection));
            }
        };

        self.apply_events_and_persist(id, &mut hydrated, events, expected_version, metadata)
            .map_err(EntityExecAndPersistError::Persist)?;
        Ok(hydrated)
    }
}
// Blanket implementation: every type that is both an event sink and a
// snapshot sink gets the `EntitySink` default methods for free.
impl<A, E, M, T> EntitySink<A, E, M> for T
where
A: Aggregate,
E: AggregateEvent<A>,
T: EventSink<A, E, M> + SnapshotSink<A>,
{
}
/// Combined read/write entity storage: load an aggregate, run a command on
/// it, and persist the result in one call.
pub trait EntityStore<A, E, M>: EntitySource<A, E> + EntitySink<A, E, M>
where
    A: Aggregate,
    E: AggregateEvent<A>,
{
    /// Rehydrates the aggregate for `id`, then executes `command` and persists
    /// the outcome. If nothing was found, the command runs against a default
    /// aggregate.
    fn load_or_default_exec_and_persist<I, C>(
        &self,
        id: &I,
        command: C,
        precondition: Option<Precondition>,
        metadata: M,
    ) -> EntityResult<A, C, M, Self>
    where
        I: AggregateId<A>,
        C: AggregateCommand<A, Event = E>,
        C::Events: Events<E>,
    {
        let loaded = self.rehydrate(id).map_err(EntityError::Load)?;
        // `?` converts the exec-and-persist error into `EntityError`.
        Ok(self.exec_and_persist(id, loaded, command, precondition, metadata)?)
    }

    /// Rehydrates the aggregate for `id` and, only if it exists, executes
    /// `command` and persists the outcome.
    ///
    /// Returns `Ok(None)` without running the command when nothing was found.
    fn load_exec_and_persist<I, C>(
        &self,
        id: &I,
        command: C,
        precondition: Option<Precondition>,
        metadata: M,
    ) -> EntityOptionResult<A, C, M, Self>
    where
        I: AggregateId<A>,
        C: AggregateCommand<A, Event = E>,
        C::Events: Events<E>,
    {
        match self.rehydrate(id).map_err(EntityError::Load)? {
            Some(existing) => {
                let updated =
                    self.exec_and_persist(id, Some(existing), command, precondition, metadata)?;
                Ok(Some(updated))
            }
            None => Ok(None),
        }
    }
}
// Blanket implementation: every type that is both an entity source and an
// entity sink gets the `EntityStore` default methods for free.
impl<A, E, M, T> EntityStore<A, E, M> for T
where
A: Aggregate,
E: AggregateEvent<A>,
T: EntitySource<A, E> + EntitySink<A, E, M>,
{
}
/// Combines a borrowed event source and a borrowed snapshot source into a
/// single value that implements both `EventSource` and `SnapshotSource`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompositeEntitySource<'e, 's, A, E, ES, SS>
where
A: Aggregate,
E: AggregateEvent<A>,
ES: EventSource<A, E> + 'e,
SS: SnapshotSource<A> + 's,
{
// Where events are read from.
event_source: &'e ES,
// Where snapshots are read from.
snapshot_source: &'s SS,
// `A` and `E` appear in no other field; this marks them as used.
_phantom: PhantomData<&'e (A, E)>,
}
/// The default composite source reads from the null event store and the null
/// snapshot store; swap either half with the `with_*` builder methods.
impl<A, E> Default
    for CompositeEntitySource<'static, 'static, A, E, NullEventStore<A, E>, NullSnapshotStore<A>>
where
    A: Aggregate,
    E: AggregateEvent<A>,
{
    fn default() -> Self {
        Self {
            event_source: &NullEventStore::DEFAULT,
            snapshot_source: &NullSnapshotStore::DEFAULT,
            _phantom: PhantomData,
        }
    }
}
impl<'e, 's, A, E, ES, SS> CompositeEntitySource<'e, 's, A, E, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSource<A, E> + 'e,
    SS: SnapshotSource<A> + 's,
{
    /// Returns a composite that reads events from `event_source` while
    /// keeping the current snapshot source.
    pub fn with_event_source<'new_e, NewES>(
        self,
        event_source: &'new_e NewES,
    ) -> CompositeEntitySource<'new_e, 's, A, E, NewES, SS>
    where
        NewES: EventSource<A, E> + 'new_e,
    {
        let CompositeEntitySource {
            snapshot_source, ..
        } = self;
        CompositeEntitySource {
            event_source,
            snapshot_source,
            _phantom: PhantomData,
        }
    }

    /// Returns a composite that reads snapshots from `snapshot_source` while
    /// keeping the current event source.
    pub fn with_snapshot_source<'new_s, NewSS>(
        self,
        snapshot_source: &'new_s NewSS,
    ) -> CompositeEntitySource<'e, 'new_s, A, E, ES, NewSS>
    where
        NewSS: SnapshotSource<A> + 'new_s,
    {
        let CompositeEntitySource { event_source, .. } = self;
        CompositeEntitySource {
            event_source,
            snapshot_source,
            _phantom: PhantomData,
        }
    }
}
/// Event reads are forwarded unchanged to the wrapped event source.
impl<'e, 's, A, E, ES, SS> EventSource<A, E> for CompositeEntitySource<'e, 's, A, E, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSource<A, E> + 'e,
    SS: SnapshotSource<A> + 's,
{
    type Error = ES::Error;
    type Events = ES::Events;

    fn read_events<I>(
        &self,
        id: &I,
        since: Since,
        max_count: Option<u64>,
    ) -> Result<Option<Self::Events>, Self::Error>
    where
        I: AggregateId<A>,
    {
        <ES as EventSource<A, E>>::read_events(self.event_source, id, since, max_count)
    }
}
/// Snapshot reads are forwarded unchanged to the wrapped snapshot source.
impl<'e, 's, A, E, ES, SS> SnapshotSource<A> for CompositeEntitySource<'e, 's, A, E, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSource<A, E> + 'e,
    SS: SnapshotSource<A> + 's,
{
    type Error = SS::Error;

    fn get_snapshot<I>(
        &self,
        id: &I,
    ) -> Result<Option<VersionedAggregate<A>>, <Self as SnapshotSource<A>>::Error>
    where
        I: AggregateId<A>,
    {
        <SS as SnapshotSource<A>>::get_snapshot(self.snapshot_source, id)
    }
}
/// Combines a borrowed event sink and a borrowed snapshot sink into a single
/// value that implements both `EventSink` and `SnapshotSink`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompositeEntitySink<'e, 's, A, E, M, ES, SS>
where
A: Aggregate,
E: AggregateEvent<A>,
ES: EventSink<A, E, M> + 'e,
SS: SnapshotSink<A> + 's,
{
// Where events are appended.
event_sink: &'e ES,
// Where snapshots are persisted.
snapshot_sink: &'s SS,
// `A`, `E` and `M` appear in no other field; this marks them as used.
_phantom: PhantomData<&'e (A, E, M)>,
}
/// The default composite sink writes to the null event store and the null
/// snapshot store; swap either half with the `with_*` builder methods.
impl<A, E, M> Default
    for CompositeEntitySink<'static, 'static, A, E, M, NullEventStore<A, E>, NullSnapshotStore<A>>
where
    A: Aggregate,
    E: AggregateEvent<A>,
{
    fn default() -> Self {
        Self {
            event_sink: &NullEventStore::DEFAULT,
            snapshot_sink: &NullSnapshotStore::DEFAULT,
            _phantom: PhantomData,
        }
    }
}
impl<'e, 's, A, E, M, ES, SS> CompositeEntitySink<'e, 's, A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSink<A, E, M> + 'e,
    SS: SnapshotSink<A> + 's,
{
    /// Returns a composite that appends events to `event_sink` while keeping
    /// the current snapshot sink.
    pub fn with_event_sink<'new_e, NewES>(
        self,
        event_sink: &'new_e NewES,
    ) -> CompositeEntitySink<'new_e, 's, A, E, M, NewES, SS>
    where
        NewES: EventSink<A, E, M> + 'new_e,
    {
        let CompositeEntitySink { snapshot_sink, .. } = self;
        CompositeEntitySink {
            event_sink,
            snapshot_sink,
            _phantom: PhantomData,
        }
    }

    /// Returns a composite that persists snapshots to `snapshot_sink` while
    /// keeping the current event sink.
    pub fn with_snapshot_sink<'new_s, NewSS>(
        self,
        snapshot_sink: &'new_s NewSS,
    ) -> CompositeEntitySink<'e, 'new_s, A, E, M, ES, NewSS>
    where
        NewSS: SnapshotSink<A> + 'new_s,
    {
        let CompositeEntitySink { event_sink, .. } = self;
        CompositeEntitySink {
            event_sink,
            snapshot_sink,
            _phantom: PhantomData,
        }
    }
}
/// Event appends are forwarded unchanged to the wrapped event sink.
impl<'e, 's, A, E, M, ES, SS> EventSink<A, E, M> for CompositeEntitySink<'e, 's, A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSink<A, E, M> + 'e,
    SS: SnapshotSink<A> + 's,
{
    type Error = ES::Error;

    fn append_events<I>(
        &self,
        id: &I,
        events: &[E],
        precondition: Option<Precondition>,
        metadata: M,
    ) -> Result<EventNumber, Self::Error>
    where
        I: AggregateId<A>,
    {
        <ES as EventSink<A, E, M>>::append_events(
            self.event_sink,
            id,
            events,
            precondition,
            metadata,
        )
    }
}
/// Snapshot writes are forwarded unchanged to the wrapped snapshot sink.
impl<'e, 's, A, E, M, ES, SS> SnapshotSink<A> for CompositeEntitySink<'e, 's, A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EventSink<A, E, M> + 'e,
    SS: SnapshotSink<A> + 's,
{
    type Error = SS::Error;

    fn persist_snapshot<I>(
        &self,
        id: &I,
        aggregate: &A,
        version: Version,
        last_snapshot_version: Option<Version>,
    ) -> Result<Version, Self::Error>
    where
        I: AggregateId<A>,
    {
        <SS as SnapshotSink<A>>::persist_snapshot(
            self.snapshot_sink,
            id,
            aggregate,
            version,
            last_snapshot_version,
        )
    }
}
/// Combines an owned entity source and an owned entity sink into a single
/// value that implements the event/snapshot source and sink traits by
/// forwarding reads to the source and writes to the sink.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompositeEntityStore<A, E, M, ES, SS>
where
A: Aggregate,
E: AggregateEvent<A>,
ES: EntitySource<A, E>,
SS: EntitySink<A, E, M>,
{
// Read half: serves events and snapshots.
entity_source: ES,
// Write half: accepts events and snapshots.
entity_sink: SS,
// `A`, `E` and `M` appear in no other field; this marks them as used.
// NOTE(review): a raw-pointer `PhantomData` makes this type neither `Send`
// nor `Sync` — confirm that is intended.
_phantom: PhantomData<*const (A, E, M)>,
}
/// The default composite store is assembled from the default composite
/// source and the default composite sink.
impl<A, E, M> Default
    for CompositeEntityStore<
        A,
        E,
        M,
        CompositeEntitySource<'static, 'static, A, E, NullEventStore<A, E>, NullSnapshotStore<A>>,
        CompositeEntitySink<'static, 'static, A, E, M, NullEventStore<A, E>, NullSnapshotStore<A>>,
    >
where
    A: Aggregate,
    E: AggregateEvent<A>,
{
    fn default() -> Self {
        // The field types are fixed by `Self`, so `Default::default()` picks
        // the composite source/sink defaults.
        Self {
            entity_source: Default::default(),
            entity_sink: Default::default(),
            _phantom: PhantomData,
        }
    }
}
impl<A, E, M, ES, SS> CompositeEntityStore<A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EntitySource<A, E>,
    SS: EntitySink<A, E, M>,
{
    /// Returns a store that reads from `entity_source` while keeping the
    /// current entity sink.
    pub fn with_entity_source<NewES>(
        self,
        entity_source: NewES,
    ) -> CompositeEntityStore<A, E, M, NewES, SS>
    where
        NewES: EntitySource<A, E>,
    {
        let CompositeEntityStore { entity_sink, .. } = self;
        CompositeEntityStore {
            entity_source,
            entity_sink,
            _phantom: PhantomData,
        }
    }

    /// Returns a store that writes to `entity_sink` while keeping the current
    /// entity source.
    pub fn with_entity_sink<NewSS>(
        self,
        entity_sink: NewSS,
    ) -> CompositeEntityStore<A, E, M, ES, NewSS>
    where
        NewSS: EntitySink<A, E, M>,
    {
        let CompositeEntityStore { entity_source, .. } = self;
        CompositeEntityStore {
            entity_source,
            entity_sink,
            _phantom: PhantomData,
        }
    }
}
/// Event reads are forwarded to the store's entity source.
impl<A, E, M, ES, SS> EventSource<A, E> for CompositeEntityStore<A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EntitySource<A, E>,
    SS: EntitySink<A, E, M>,
{
    type Error = <ES as EventSource<A, E>>::Error;
    type Events = <ES as EventSource<A, E>>::Events;

    fn read_events<I>(
        &self,
        id: &I,
        since: Since,
        max_count: Option<u64>,
    ) -> Result<Option<Self::Events>, Self::Error>
    where
        I: AggregateId<A>,
    {
        <ES as EventSource<A, E>>::read_events(&self.entity_source, id, since, max_count)
    }
}
/// Snapshot reads are forwarded to the store's entity source.
impl<A, E, M, ES, SS> SnapshotSource<A> for CompositeEntityStore<A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EntitySource<A, E>,
    SS: EntitySink<A, E, M>,
{
    type Error = <ES as SnapshotSource<A>>::Error;

    fn get_snapshot<I>(
        &self,
        id: &I,
    ) -> Result<Option<VersionedAggregate<A>>, <Self as SnapshotSource<A>>::Error>
    where
        I: AggregateId<A>,
    {
        <ES as SnapshotSource<A>>::get_snapshot(&self.entity_source, id)
    }
}
/// Event appends are forwarded to the store's entity sink.
impl<A, E, M, ES, SS> EventSink<A, E, M> for CompositeEntityStore<A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EntitySource<A, E>,
    SS: EntitySink<A, E, M>,
{
    type Error = <SS as EventSink<A, E, M>>::Error;

    fn append_events<I>(
        &self,
        id: &I,
        events: &[E],
        precondition: Option<Precondition>,
        metadata: M,
    ) -> Result<EventNumber, Self::Error>
    where
        I: AggregateId<A>,
    {
        <SS as EventSink<A, E, M>>::append_events(
            &self.entity_sink,
            id,
            events,
            precondition,
            metadata,
        )
    }
}
/// Snapshot writes are forwarded to the store's entity sink.
impl<A, E, M, ES, SS> SnapshotSink<A> for CompositeEntityStore<A, E, M, ES, SS>
where
    A: Aggregate,
    E: AggregateEvent<A>,
    ES: EntitySource<A, E>,
    SS: EntitySink<A, E, M>,
{
    type Error = <SS as SnapshotSink<A>>::Error;

    fn persist_snapshot<I>(
        &self,
        id: &I,
        aggregate: &A,
        version: Version,
        last_snapshot_version: Option<Version>,
    ) -> Result<Version, Self::Error>
    where
        I: AggregateId<A>,
    {
        <SS as SnapshotSink<A>>::persist_snapshot(
            &self.entity_sink,
            id,
            aggregate,
            version,
            last_snapshot_version,
        )
    }
}
/// Failure while loading an entity, tagged with which half of the load failed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EntityLoadError<EErr, SErr>
where
EErr: CqrsError,
SErr: CqrsError,
{
/// Reading events from the event source failed.
EventSource(EErr),
/// Reading the snapshot from the snapshot source failed.
SnapshotSource(SErr),
}
/// Human-readable rendering: a fixed prefix naming the failed step, followed
/// by the underlying error's own `Display` output.
impl<EErr, SErr> fmt::Display for EntityLoadError<EErr, SErr>
where
    EErr: CqrsError,
    SErr: CqrsError,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::EventSource(cause) => f.write_fmt(format_args!(
                "entity load error, problem loading events: {}",
                cause
            )),
            Self::SnapshotSource(cause) => f.write_fmt(format_args!(
                "entity load error, problem loading snapshot: {}",
                cause
            )),
        }
    }
}
/// Failure while persisting an entity, tagged with which half of the write failed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EntityPersistError<EErr, SErr>
where
EErr: CqrsError,
SErr: CqrsError,
{
/// Appending events to the event sink failed.
EventSink(EErr),
/// Persisting the snapshot failed; in `apply_events_and_persist` this step
/// runs only after the events were appended successfully.
SnapshotSink(SErr),
}
/// Human-readable rendering: a fixed prefix naming the failed step, followed
/// by the underlying error's own `Display` output.
impl<EErr, SErr> fmt::Display for EntityPersistError<EErr, SErr>
where
    EErr: CqrsError,
    SErr: CqrsError,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::EventSink(cause) => f.write_fmt(format_args!(
                "entity persist error, problem persisting events: {}",
                cause
            )),
            // The message states that the events themselves were persisted.
            Self::SnapshotSink(cause) => f.write_fmt(format_args!(
                "entity persist error, problem persisting snapshot (events successfully \
                 persisted): {}",
                cause
            )),
        }
    }
}
/// Failure of `EntitySink::exec_and_persist`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EntityExecAndPersistError<A, C, PEErr, PSErr>
where
A: Aggregate,
C: AggregateCommand<A>,
PEErr: CqrsError,
PSErr: CqrsError,
{
/// The supplied precondition did not hold; the command was not executed.
PreconditionFailed(Precondition),
/// The command was rejected; carries the aggregate it was executed
/// against and the command's error.
Exec(HydratedAggregate<A>, C::Error),
/// The command succeeded but persisting its events or snapshot failed.
Persist(EntityPersistError<PEErr, PSErr>),
}
/// Human-readable rendering. Persist failures reuse the wrapped error's own
/// `Display` output (with the caller's formatter); the other variants get a
/// fixed prefix.
impl<A, C, PEErr, PSErr> fmt::Display for EntityExecAndPersistError<A, C, PEErr, PSErr>
where
    A: Aggregate,
    C: AggregateCommand<A>,
    PEErr: CqrsError,
    PSErr: CqrsError,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PreconditionFailed(precondition) => f.write_fmt(format_args!(
                "entity exec error, precondition failed: {}",
                precondition
            )),
            // The aggregate carried alongside the error is not rendered.
            Self::Exec(_, rejection) => f.write_fmt(format_args!(
                "entity exec error, command was rejected: {}",
                rejection
            )),
            Self::Persist(failure) => fmt::Display::fmt(failure, f),
        }
    }
}
/// Lets `?` turn a failed `Precondition` into `PreconditionFailed`.
impl<A, C, PEErr, PSErr> From<Precondition> for EntityExecAndPersistError<A, C, PEErr, PSErr>
where
    A: Aggregate,
    C: AggregateCommand<A>,
    PEErr: CqrsError,
    PSErr: CqrsError,
{
    fn from(p: Precondition) -> Self {
        Self::PreconditionFailed(p)
    }
}
/// Failure of a full load-execute-persist cycle on an `EntityStore`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EntityError<LEErr, LSErr, A, C, PEErr, PSErr>
where
A: Aggregate,
C: AggregateCommand<A>,
LEErr: CqrsError,
LSErr: CqrsError,
PEErr: CqrsError,
PSErr: CqrsError,
{
/// Rehydrating the aggregate failed.
Load(EntityLoadError<LEErr, LSErr>),
/// The supplied precondition did not hold.
PreconditionFailed(Precondition),
/// The command was rejected; carries the aggregate it was executed
/// against and the command's error.
Exec(HydratedAggregate<A>, C::Error),
/// Persisting the produced events or the snapshot failed.
Persist(EntityPersistError<PEErr, PSErr>),
}
/// Human-readable rendering. Load and persist failures reuse the wrapped
/// error's own `Display` output (with the caller's formatter); the other
/// variants get a fixed prefix.
impl<LEErr, LSErr, A, C, PEErr, PSErr> fmt::Display
    for EntityError<LEErr, LSErr, A, C, PEErr, PSErr>
where
    A: Aggregate,
    C: AggregateCommand<A>,
    LEErr: CqrsError,
    LSErr: CqrsError,
    PEErr: CqrsError,
    PSErr: CqrsError,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Load(failure) => fmt::Display::fmt(failure, f),
            Self::PreconditionFailed(precondition) => f.write_fmt(format_args!(
                "entity error, precondition failed: {}",
                precondition
            )),
            // The aggregate carried alongside the error is not rendered.
            Self::Exec(_, rejection) => f.write_fmt(format_args!(
                "entity error, command was rejected: {}",
                rejection
            )),
            Self::Persist(failure) => fmt::Display::fmt(failure, f),
        }
    }
}
/// Lets `?` turn a failed `Precondition` into `PreconditionFailed`.
impl<LEErr, LSErr, A, C, PEErr, PSErr> From<Precondition>
    for EntityError<LEErr, LSErr, A, C, PEErr, PSErr>
where
    A: Aggregate,
    C: AggregateCommand<A>,
    LEErr: CqrsError,
    LSErr: CqrsError,
    PEErr: CqrsError,
    PSErr: CqrsError,
{
    fn from(p: Precondition) -> Self {
        Self::PreconditionFailed(p)
    }
}
/// Widens an exec-and-persist error into the full `EntityError`, mapping each
/// variant onto the variant of the same name.
impl<LEErr, LSErr, A, C, PEErr, PSErr> From<EntityExecAndPersistError<A, C, PEErr, PSErr>>
    for EntityError<LEErr, LSErr, A, C, PEErr, PSErr>
where
    A: Aggregate,
    C: AggregateCommand<A>,
    LEErr: CqrsError,
    LSErr: CqrsError,
    PEErr: CqrsError,
    PSErr: CqrsError,
{
    fn from(p: EntityExecAndPersistError<A, C, PEErr, PSErr>) -> Self {
        match p {
            EntityExecAndPersistError::PreconditionFailed(precondition) => {
                Self::PreconditionFailed(precondition)
            }
            EntityExecAndPersistError::Exec(aggregate, rejection) => {
                Self::Exec(aggregate, rejection)
            }
            EntityExecAndPersistError::Persist(failure) => Self::Persist(failure),
        }
    }
}
// Construction smoke tests: each test only checks that the composite types can
// be assembled through their builder methods (i.e. that the generics line up);
// no reads or writes are performed.
#[cfg(test)]
mod tests {
use super::*;
use crate::{memory::StateStore, testing::*};
// Builds a composite source from a null event store and an in-memory state store.
#[test]
fn can_construct_composite_entity_source() {
let null = NullEventStore::<TestAggregate, TestEvent>::default();
let memory = StateStore::<TestAggregate>::default();
let _source = CompositeEntitySource::default()
.with_event_source(&null)
.with_snapshot_source(&memory);
}
// Builds a composite sink; the explicit type annotation pins the metadata
// type, which appears in none of the builder arguments.
#[test]
fn can_construct_composite_entity_sink() {
let null = NullEventStore::<TestAggregate, TestEvent>::default();
let memory = StateStore::<TestAggregate>::default();
let _sink: CompositeEntitySink<
TestAggregate,
TestEvent,
TestMetadata,
NullEventStore<TestAggregate, TestEvent>,
StateStore<TestAggregate>,
> = CompositeEntitySink::default()
.with_event_sink(&null)
.with_snapshot_sink(&memory);
}
// Builds a composite store out of a composite source and a composite sink.
#[test]
fn can_construct_composite_entity_store() {
let null = NullEventStore::<TestAggregate, TestEvent>::default();
let memory = StateStore::<TestAggregate>::default();
let source = CompositeEntitySource::default()
.with_event_source(&null)
.with_snapshot_source(&memory);
let sink: CompositeEntitySink<
TestAggregate,
TestEvent,
TestMetadata,
NullEventStore<TestAggregate, TestEvent>,
StateStore<TestAggregate>,
> = CompositeEntitySink::default()
.with_event_sink(&null)
.with_snapshot_sink(&memory);
let _store = CompositeEntityStore::default()
.with_entity_source(source)
.with_entity_sink(sink);
}
}