use mire::{Aggregate, EventData, EventStore};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
/// Domain events that make up the history of a single bank account.
///
/// Serialized by serde with an internal `"type"` tag (see the `serde` attribute below).
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "account")]
enum AccountEvent {
    /// The account was opened for `owner`.
    Opened { owner: String },
    /// `amount` was added to the balance.
    Deposited { amount: i64 },
    /// `amount` was taken out of the balance.
    Withdrawn { amount: i64 },
    /// The account was closed.
    Closed,
}
/// In-memory state of an account, rebuilt by folding `AccountEvent`s.
///
/// `Default` yields the "never opened" state: not open, empty owner, zero balance.
#[derive(Debug, Default)]
struct Account {
    /// `true` between an `Opened` and a `Closed` event.
    open: bool,
    /// Owner name taken from the `Opened` event.
    owner: String,
    /// Running balance: deposits minus withdrawals.
    balance: i64,
}
/// Makes `Account` an aggregate whose state is folded from `AccountEvent`s.
impl Aggregate for Account {
    type Event = AccountEvent;

    /// Returns the stream category string for this aggregate, `"account"`.
    fn stream_category() -> &'static str {
        "account"
    }

    /// Folds one event into the current state.
    ///
    /// No business rules are checked here; the command functions (`open`,
    /// `deposit`, `withdraw`, `close`) validate before an event is recorded.
    fn apply(&mut self, event: &AccountEvent) {
        match *event {
            AccountEvent::Opened { ref owner } => {
                self.owner = owner.clone();
                self.open = true;
            }
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrawn { amount } => self.balance -= amount,
            AccountEvent::Closed => self.open = false,
        }
    }
}
/// Command: open the account for `owner`.
///
/// On success an `AccountEvent::Opened` carrying an owned copy of `owner` is
/// recorded on the aggregate root.
///
/// # Errors
///
/// Fails, without recording anything, when the account is already open or
/// when `owner` is empty or whitespace-only.
fn open(account: &mut mire::AggregateRoot<Account>, owner: &str) -> anyhow::Result<()> {
    anyhow::ensure!(!account.state.open, "account is already open");
    anyhow::ensure!(!owner.trim().is_empty(), "owner cannot be empty");

    // The event stores `owner` exactly as given (untrimmed).
    let opened = AccountEvent::Opened {
        owner: owner.to_owned(),
    };
    account.record(opened);
    Ok(())
}
/// Command: deposit `amount` into an open account.
///
/// On success an `AccountEvent::Deposited` is recorded on the aggregate root.
///
/// # Errors
///
/// Fails, without recording anything, when:
/// - the account is not open,
/// - `amount` is zero or negative,
/// - adding `amount` to the current balance would overflow `i64`.
fn deposit(account: &mut mire::AggregateRoot<Account>, amount: i64) -> anyhow::Result<()> {
    if !account.state.open {
        anyhow::bail!("cannot deposit into a closed account");
    }
    if amount <= 0 {
        anyhow::bail!("deposit amount must be positive, got {amount}");
    }
    // `Account::apply` adds with a plain `+=`, which panics in debug builds and
    // silently wraps in release builds on overflow. Reject the command here so
    // such an event is never recorded.
    // NOTE(review): assumes `account.state.balance` already reflects any pending
    // (unsaved) events, as `withdraw`'s funds check also relies on - confirm.
    if account.state.balance.checked_add(amount).is_none() {
        anyhow::bail!(
            "deposit of {amount} would overflow the balance ({})",
            account.state.balance
        );
    }
    account.record(AccountEvent::Deposited { amount });
    Ok(())
}
/// Command: withdraw `amount` from an open account.
///
/// On success an `AccountEvent::Withdrawn` is recorded on the aggregate root.
///
/// # Errors
///
/// Fails, without recording anything, when the account is not open, when
/// `amount` is zero or negative, or when the balance is smaller than `amount`.
fn withdraw(account: &mut mire::AggregateRoot<Account>, amount: i64) -> anyhow::Result<()> {
    anyhow::ensure!(account.state.open, "cannot withdraw from a closed account");
    anyhow::ensure!(
        amount > 0,
        "withdrawal amount must be positive, got {amount}"
    );

    // Overdrafts are not allowed: the balance must cover the full amount.
    let available = account.state.balance;
    anyhow::ensure!(
        available >= amount,
        "insufficient funds: balance {}, tried to withdraw {amount}",
        available
    );

    account.record(AccountEvent::Withdrawn { amount });
    Ok(())
}
/// Command: close an open account that has been emptied.
///
/// On success an `AccountEvent::Closed` is recorded on the aggregate root.
///
/// # Errors
///
/// Fails, without recording anything, when the account is not open or when
/// its balance is anything other than zero.
fn close(account: &mut mire::AggregateRoot<Account>) -> anyhow::Result<()> {
    anyhow::ensure!(account.state.open, "account is already closed");

    let remaining = account.state.balance;
    anyhow::ensure!(
        remaining == 0,
        "cannot close account with non-zero balance ({})",
        remaining
    );

    account.record(AccountEvent::Closed);
    Ok(())
}
/// End-to-end walkthrough of the account aggregate against a Postgres-backed
/// event store.
///
/// Steps: connect and migrate, open a new account and move some money, save,
/// reload, show that invalid commands are rejected without recording events,
/// then empty and close the account and verify the replayed state.
///
/// # Errors
///
/// Returns an error when the database connection, migration, load or save
/// fails, when a command expected to succeed is rejected, or when a saved
/// account cannot be found on reload.
///
/// # Panics
///
/// Panics if one of the final consistency assertions does not hold.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Fall back to a local development database when DATABASE_URL is not usable.
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    // Fresh random id per run, so each run works on its own account.
    let id = Uuid::new_v4().to_string();
    let mut account = store.load_or_default::<Account>(&id).await?;
    open(&mut account, "Ada Lovelace")?;
    deposit(&mut account, 100)?;
    deposit(&mut account, 50)?;
    withdraw(&mut account, 30)?;
    store.save(&mut account).await?;
    println!(
        "balance after deposits/withdrawal: {}",
        account.state.balance
    );

    // Reload from the store. A missing account is reported as an error rather
    // than a panic, since `main` already returns `anyhow::Result`.
    let mut account = store
        .load::<Account>(&id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("account {id} not found after it was saved"))?;

    // Each of these commands violates a rule and must be rejected.
    if let Err(e) = withdraw(&mut account, 1_000_000) {
        println!("withdraw 1_000_000 rejected: {e}");
    }
    if let Err(e) = deposit(&mut account, -5) {
        println!("deposit -5 rejected: {e}");
    }
    if let Err(e) = close(&mut account) {
        println!("close with non-zero balance rejected: {e}");
    }
    assert!(
        !account.has_pending(),
        "no events should have been recorded for rejected commands"
    );

    // 100 + 50 - 30 = 120 remains; withdraw it all so the account can be closed.
    withdraw(&mut account, 120)?;
    close(&mut account)?;
    store.save(&mut account).await?;
    println!("account closed; final balance: {}", account.state.balance);

    // Replay once more and check the committed history: 4 events from the first
    // save plus 2 from the second.
    let final_view = store
        .load::<Account>(&id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("account {id} not found after it was closed"))?;
    assert_eq!(final_view.state.balance, 0);
    assert!(!final_view.state.open);
    assert_eq!(final_view.version, 6);
    println!("replay confirms: 6 events committed, balance 0, account closed");
    Ok(())
}