Struct cqrs_es::CqrsFramework
source · pub struct CqrsFramework<A, ES>where
A: Aggregate,
ES: EventStore<A>,{ /* private fields */ }
Expand description
This is the base framework for applying commands to produce events.
In Domain Driven Design we require that
changes are made only after loading the entire Aggregate
in order to ensure that the full
context is understood.
With event-sourcing this means:
- Loading all previous events for the aggregate instance.
- Applying these events, in order, to a new
Aggregate
in order to reach the correct state. - Using the recreated
Aggregate
to handle an inboundCommand
producing events or an error (seehandle
method in this trait). - Persisting any generated events or roll back in the event of an error.
To manage these tasks we use a CqrsFramework
.
Implementations§
source§impl<A, ES> CqrsFramework<A, ES>where
A: Aggregate,
ES: EventStore<A>,
impl<A, ES> CqrsFramework<A, ES>where
A: Aggregate,
ES: EventStore<A>,
sourcepub fn new(
store: ES,
queries: Vec<Box<dyn Query<A>>>,
service: A::Services
) -> Selfwhere
A: Aggregate,
ES: EventStore<A>,
pub fn new(
store: ES,
queries: Vec<Box<dyn Query<A>>>,
service: A::Services
) -> Selfwhere
A: Aggregate,
ES: EventStore<A>,
Creates new framework for dispatching commands using the provided elements.
Takes an implementation of an EventStore
, a vector of queries and a set of services
to be used within the command handler.
For a simple in-memory EventStore
suitable for experimentation or testing see
MemStore.
use cqrs_es::CqrsFramework;
use cqrs_es::mem_store::MemStore;
let store = MemStore::<MyAggregate>::default();
let queries = vec![];
let service = MyService::default();
let cqrs = CqrsFramework::new(store, queries, service);
For production uses a persistent event store using a backing database is needed, such as in the available persistence crates:
sourcepub fn append_query(self, query: Box<dyn Query<A>>) -> Selfwhere
A: Aggregate,
ES: EventStore<A>,
pub fn append_query(self, query: Box<dyn Query<A>>) -> Selfwhere
A: Aggregate,
ES: EventStore<A>,
Appends an additional query to the framework.
use cqrs_es::CqrsFramework;
use cqrs_es::mem_store::MemStore;
let store = MemStore::<MyAggregate>::default();
let queries = vec![];
let service = MyService::default();
let cqrs = CqrsFramework::new(store, queries, service)
.append_query(Box::new(MyQuery::default()));
sourcepub async fn execute(
&self,
aggregate_id: &str,
command: A::Command
) -> Result<(), AggregateError<A::Error>>
pub async fn execute( &self, aggregate_id: &str, command: A::Command ) -> Result<(), AggregateError<A::Error>>
This applies a command to an aggregate. Executing a command in this way is the only way to make changes to the state of an aggregate in CQRS.
An error while processing will result in no events committed and
an AggregateError
being returned.
If successful the events produced will be persisted in the backing EventStore
before being applied to any configured QueryProcessor
s.
type MyFramework = CqrsFramework<MyAggregate,MemStore<MyAggregate>>;
async fn do_something(cqrs: MyFramework) -> Result<(),AggregateError<MyUserError>> {
let command = MyCommands::DoSomething;
cqrs.execute("agg-id-F39A0C", command).await
}
sourcepub async fn execute_with_metadata(
&self,
aggregate_id: &str,
command: A::Command,
metadata: HashMap<String, String>
) -> Result<(), AggregateError<A::Error>>
pub async fn execute_with_metadata( &self, aggregate_id: &str, command: A::Command, metadata: HashMap<String, String> ) -> Result<(), AggregateError<A::Error>>
This applies a command to an aggregate. Executing a command in this way is the only way to make changes to the state of an aggregate in CQRS.
A Hashmap<String,String>
is supplied with any contextual information that should be
associated with this change. This metadata will be attached to any produced events and is
meant to assist in debugging and auditing. Common information might include:
- time of commit
- user making the change
- application version
An error while processing will result in no events committed and
an AggregateError
being returned.
If successful the events produced will be persisted in the backing EventStore
before being applied to any configured QueryProcessor
s.
type MyFramework = CqrsFramework<MyAggregate,MemStore<MyAggregate>>;
async fn do_something(cqrs: MyFramework) -> Result<(),AggregateError<MyUserError>> {
let command = MyCommands::DoSomething;
let metadata = HashMap::from([("time".to_string(), chrono::Utc::now().to_rfc3339())]);
cqrs.execute_with_metadata("agg-id-F39A0C", command, metadata).await
}