Struct cqrs_es::CqrsFramework

pub struct CqrsFramework<A, ES>
where A: Aggregate, ES: EventStore<A>,
{ /* private fields */ }

This is the base framework for applying commands to produce events.

In Domain Driven Design we require that changes are made only after loading the entire Aggregate in order to ensure that the full context is understood. With event-sourcing this means:

  1. Loading all previous events for the aggregate instance.
  2. Applying these events, in order, to a new Aggregate in order to reach the correct state.
  3. Using the recreated Aggregate to handle an inbound Command, producing events or an error (see the handle method of the Aggregate trait).
  4. Persisting any generated events, or rolling back if an error occurs.

To manage these tasks we use a CqrsFramework.
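
The four steps above can be sketched with the standard library alone. This is a minimal, synchronous illustration and not the cqrs_es implementation: Account, Event, Command, Store and the free function execute are all hypothetical names, and the real Aggregate trait is async and takes a services argument.

```rust
use std::collections::HashMap;

// Hypothetical domain types for illustration; not part of cqrs_es.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Deposited(u64),
    Withdrew(u64),
}

enum Command {
    Deposit(u64),
    Withdraw(u64),
}

#[derive(Default)]
struct Account {
    balance: u64,
}

impl Account {
    // Fold one stored event into the aggregate state.
    fn apply(&mut self, event: &Event) {
        match event {
            Event::Deposited(n) => self.balance += n,
            Event::Withdrew(n) => self.balance -= n,
        }
    }

    // Validate a command against the current state, yielding events or an error.
    fn handle(&self, command: Command) -> Result<Vec<Event>, String> {
        match command {
            Command::Deposit(n) => Ok(vec![Event::Deposited(n)]),
            Command::Withdraw(n) if n <= self.balance => Ok(vec![Event::Withdrew(n)]),
            Command::Withdraw(_) => Err("insufficient funds".to_string()),
        }
    }
}

// A toy in-memory event log keyed by aggregate id.
#[derive(Default)]
struct Store {
    events: HashMap<String, Vec<Event>>,
}

fn execute(store: &mut Store, aggregate_id: &str, command: Command) -> Result<(), String> {
    // Step 1: load all previous events for this aggregate instance.
    let history = store.events.get(aggregate_id).cloned().unwrap_or_default();
    // Step 2: replay them, in order, onto a fresh aggregate.
    let mut account = Account::default();
    for event in &history {
        account.apply(event);
    }
    // Step 3: handle the command; on error, return before anything is written.
    let new_events = account.handle(command)?;
    // Step 4: persist the newly produced events.
    store
        .events
        .entry(aggregate_id.to_string())
        .or_default()
        .extend(new_events);
    Ok(())
}
```

In this sketch a rejected command leaves the log untouched because the early return in step 3 happens before the only write in step 4.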

Implementations

impl<A, ES> CqrsFramework<A, ES>
where A: Aggregate, ES: EventStore<A>,

pub fn new( store: ES, queries: Vec<Box<dyn Query<A>>>, service: A::Services ) -> Self
where A: Aggregate, ES: EventStore<A>,

Creates a new framework for dispatching commands using the provided elements. Takes an implementation of an EventStore, a vector of queries, and a set of services to be used within the command handler.

For a simple in-memory EventStore suitable for experimentation or testing see MemStore.

use cqrs_es::CqrsFramework;
use cqrs_es::mem_store::MemStore;

let store = MemStore::<MyAggregate>::default();
let queries = vec![];
let service = MyService::default();

let cqrs = CqrsFramework::new(store, queries, service);

For production use, a persistent event store with a backing database is needed, such as those provided by the available persistence crates.

pub fn append_query(self, query: Box<dyn Query<A>>) -> Self
where A: Aggregate, ES: EventStore<A>,

Appends an additional query to the framework.

use cqrs_es::CqrsFramework;
use cqrs_es::mem_store::MemStore;

let store = MemStore::<MyAggregate>::default();
let queries = vec![];
let service = MyService::default();

let cqrs = CqrsFramework::new(store, queries, service)
    .append_query(Box::new(MyQuery::default()));

pub async fn execute( &self, aggregate_id: &str, command: A::Command ) -> Result<(), AggregateError<A::Error>>

This applies a command to an aggregate. Executing a command is the only way to change the state of an aggregate in CQRS.

An error while processing will result in no events committed and an AggregateError being returned.

If successful, the events produced will be persisted in the backing EventStore before being applied to any configured queries (Query implementations).
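
The ordering described here (commit first, then notify queries, and do neither on error) can be illustrated with a small std-only sketch. View, CountingView and commit_and_dispatch are hypothetical stand-ins; the actual Query trait in cqrs_es is async and has a different signature.

```rust
// Hypothetical stand-in for a query; not the cqrs_es Query trait.
trait View {
    fn dispatch(&mut self, aggregate_id: &str, events: &[String]);
}

// A view that only counts how many events it has been shown.
#[derive(Default)]
struct CountingView {
    seen: usize,
}

impl View for CountingView {
    fn dispatch(&mut self, _aggregate_id: &str, events: &[String]) {
        self.seen += events.len();
    }
}

// `outcome` plays the role of the command handler's result.
fn commit_and_dispatch<V: View>(
    log: &mut Vec<String>,
    views: &mut [V],
    aggregate_id: &str,
    outcome: Result<Vec<String>, String>,
) -> Result<(), String> {
    // A handler error returns here: nothing is stored and no view is called.
    let events = outcome?;
    // Persist the events first...
    log.extend(events.iter().cloned());
    // ...then hand the committed events to each view.
    for view in views.iter_mut() {
        view.dispatch(aggregate_id, &events);
    }
    Ok(())
}
```

Dispatching only after the write means, in this sketch, that a view never observes an event that is absent from the log.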

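use cqrs_es::{AggregateError, CqrsFramework};
use cqrs_es::mem_store::MemStore;

type MyFramework = CqrsFramework<MyAggregate, MemStore<MyAggregate>>;

// MyAggregate, MyCommands and MyUserError are the application's own types.
async fn do_something(cqrs: MyFramework) -> Result<(), AggregateError<MyUserError>> {
    let command = MyCommands::DoSomething;

    // Loads the aggregate, handles the command and commits any resulting events.
    cqrs.execute("agg-id-F39A0C", command).await
}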

pub async fn execute_with_metadata( &self, aggregate_id: &str, command: A::Command, metadata: HashMap<String, String> ) -> Result<(), AggregateError<A::Error>>

This applies a command to an aggregate. Executing a command is the only way to change the state of an aggregate in CQRS.

A HashMap<String, String> is supplied with any contextual information that should be associated with this change. This metadata will be attached to any produced events and is meant to assist in debugging and auditing. Common information might include:

  • time of commit
  • user making the change
  • application version
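
A metadata map carrying the three items listed above might be assembled as follows. This is a sketch: build_metadata is a hypothetical helper, the key names "time", "user" and "version" are illustrative rather than required by cqrs_es, and the time is recorded as seconds since the Unix epoch to stay within the standard library.

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper; key names are illustrative, not mandated by cqrs_es.
fn build_metadata(user: &str, app_version: &str) -> HashMap<String, String> {
    // Seconds since the Unix epoch; falls back to 0 if the clock is before the epoch.
    let committed_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    HashMap::from([
        ("time".to_string(), committed_at.to_string()),
        ("user".to_string(), user.to_string()),
        ("version".to_string(), app_version.to_string()),
    ])
}
```

The resulting map has the HashMap<String, String> type that execute_with_metadata expects for its metadata argument.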

An error while processing will result in no events committed and an AggregateError being returned.

If successful, the events produced will be persisted in the backing EventStore before being applied to any configured queries (Query implementations).

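use std::collections::HashMap;
use cqrs_es::{AggregateError, CqrsFramework};
use cqrs_es::mem_store::MemStore;

type MyFramework = CqrsFramework<MyAggregate, MemStore<MyAggregate>>;

async fn do_something(cqrs: MyFramework) -> Result<(), AggregateError<MyUserError>> {
    let command = MyCommands::DoSomething;
    // Record the commit time; this line requires the external chrono crate.
    let metadata = HashMap::from([("time".to_string(), chrono::Utc::now().to_rfc3339())]);

    cqrs.execute_with_metadata("agg-id-F39A0C", command, metadata).await
}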

Auto Trait Implementations

impl<A, ES> !RefUnwindSafe for CqrsFramework<A, ES>

impl<A, ES> Send for CqrsFramework<A, ES>

impl<A, ES> Sync for CqrsFramework<A, ES>

impl<A, ES> Unpin for CqrsFramework<A, ES>
where ES: Unpin, <A as Aggregate>::Services: Unpin,

impl<A, ES> !UnwindSafe for CqrsFramework<A, ES>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.