use eventcore::{
Command, CommandError, CommandLogic, Event, NewEvents, ProjectionConfig, RetryPolicy, execute,
run_projection,
};
use eventcore_memory::InMemoryEventStore;
use eventcore_testing::EventCollector;
use nutype::nutype;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
enum WithdrawError {
#[error(
"insufficient funds for account {account_id}: balance={balance}, attempted_withdrawal={attempted}"
)]
InsufficientFunds {
account_id: String,
balance: u16,
attempted: u16,
},
}
impl From<WithdrawError> for CommandError {
fn from(e: WithdrawError) -> Self {
CommandError::BusinessRuleViolation(Box::new(e))
}
}
#[nutype(
validate(greater = 0),
derive(Debug, Clone, Copy, PartialEq, Eq, Into, Serialize, Deserialize)
)]
struct MoneyAmount(u16);
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
enum BankAccountEvent {
MoneyDeposited {
account_id: eventcore::StreamId,
amount: MoneyAmount,
},
MoneyWithdrawn {
account_id: eventcore::StreamId,
amount: MoneyAmount,
},
}
impl Event for BankAccountEvent {
fn stream_id(&self) -> &eventcore::StreamId {
match self {
BankAccountEvent::MoneyDeposited { account_id, .. }
| BankAccountEvent::MoneyWithdrawn { account_id, .. } => account_id,
}
}
fn event_type_name() -> &'static str {
"BankAccountEvent"
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct AccountBalance {
cents: u16,
}
impl AccountBalance {
fn deposit(mut self, amount: MoneyAmount) -> Self {
self.cents = self.cents.saturating_add(amount.into());
self
}
fn withdraw(mut self, amount: MoneyAmount) -> Self {
self.cents = self.cents.saturating_sub(amount.into());
self
}
fn has_sufficient_funds(&self, amount: MoneyAmount) -> bool {
self.cents >= amount.into()
}
fn balance_cents(&self) -> u16 {
self.cents
}
fn apply(self, event: &BankAccountEvent) -> Self {
match event {
BankAccountEvent::MoneyDeposited { amount, .. } => self.deposit(*amount),
BankAccountEvent::MoneyWithdrawn { amount, .. } => self.withdraw(*amount),
}
}
}
#[derive(Command)]
struct Deposit {
#[stream]
account_id: eventcore::StreamId,
amount: MoneyAmount,
}
impl CommandLogic for Deposit {
type Event = BankAccountEvent;
type State = ();
fn apply(&self, state: Self::State, _event: &Self::Event) -> Self::State {
state
}
fn handle(&self, _state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
Ok(vec![BankAccountEvent::MoneyDeposited {
account_id: self.account_id.clone(),
amount: self.amount,
}]
.into())
}
}
#[derive(Command)]
struct Withdraw {
#[stream]
account_id: eventcore::StreamId,
amount: MoneyAmount,
}
impl CommandLogic for Withdraw {
type Event = BankAccountEvent;
type State = AccountBalance;
fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State {
state.apply(event)
}
fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
if !state.has_sufficient_funds(self.amount) {
return Err(WithdrawError::InsufficientFunds {
account_id: self.account_id.as_ref().to_string(),
balance: state.balance_cents(),
attempted: self.amount.into(),
}
.into());
}
Ok(vec![BankAccountEvent::MoneyWithdrawn {
account_id: self.account_id.clone(),
amount: self.amount,
}]
.into())
}
}
fn test_account_id() -> eventcore::StreamId {
eventcore::StreamId::try_new(Uuid::now_v7().to_string()).expect("valid stream id")
}
fn test_amount(cents: u16) -> MoneyAmount {
MoneyAmount::try_new(cents).expect("valid amount")
}
#[tokio::test]
async fn deposit_command_emits_money_deposited_event() {
let store = InMemoryEventStore::new();
let account_id = test_account_id();
let amount = test_amount(100);
let command = Deposit {
account_id: account_id.clone(),
amount,
};
let _ = execute(&store, command, RetryPolicy::new())
.await
.expect("command execution to succeed");
let storage: Arc<Mutex<Vec<BankAccountEvent>>> = Arc::new(Mutex::new(Vec::new()));
let collector = EventCollector::new(storage.clone());
run_projection(collector, &store, ProjectionConfig::default())
.await
.expect("projection to complete");
let events = storage.lock().unwrap();
assert_eq!(events.len(), 1, "expected exactly one event");
match &events[0] {
BankAccountEvent::MoneyDeposited {
account_id: event_account_id,
amount: event_amount,
} => {
assert_eq!(
event_account_id, &account_id,
"event should reference correct account"
);
assert_eq!(event_amount, &amount, "event should have correct amount");
}
_ => panic!("expected MoneyDeposited event"),
}
}
#[tokio::test]
async fn insufficient_funds_returns_business_rule_violation() {
let store = InMemoryEventStore::new();
let account_id = test_account_id();
let initial_amount = test_amount(50);
let seed_deposit = Deposit {
account_id: account_id.clone(),
amount: initial_amount,
};
let _ = execute(&store, seed_deposit, RetryPolicy::new())
.await
.expect("initial deposit to succeed");
let withdrawal_amount = test_amount(100);
let withdraw = Withdraw {
account_id: account_id.clone(),
amount: withdrawal_amount,
};
let error = match execute(&store, withdraw, RetryPolicy::new()).await {
Ok(_) => panic!("expected business rule violation but command succeeded"),
Err(error) => error,
};
let message = match &error {
CommandError::BusinessRuleViolation(err) => err.to_string(),
_ => panic!("expected BusinessRuleViolation error, got: {:?}", error),
};
assert!(
message.contains(account_id.as_ref()),
"error should include account id"
);
assert!(
message.contains("balance=50"),
"error should include current balance"
);
assert!(
message.contains("attempted_withdrawal=100"),
"error should include attempted withdrawal amount"
);
let storage: Arc<Mutex<Vec<BankAccountEvent>>> = Arc::new(Mutex::new(Vec::new()));
let collector = EventCollector::new(storage.clone());
run_projection(collector, &store, ProjectionConfig::default())
.await
.expect("projection to complete");
let events = storage.lock().unwrap();
assert_eq!(events.len(), 1, "failure should not append new events");
}