use mire::{Aggregate, EventData, EventStore};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "account")]
enum AccountEvent {
Opened { initial: i64 },
Debited { amount: i64 },
Credited { amount: i64 },
}
#[derive(Debug, Default)]
struct Account {
balance: i64,
}
impl Aggregate for Account {
type Event = AccountEvent;
fn stream_category() -> &'static str {
"account"
}
fn apply(&mut self, event: &AccountEvent) {
match event {
AccountEvent::Opened { initial } => self.balance = *initial,
AccountEvent::Debited { amount } => self.balance -= amount,
AccountEvent::Credited { amount } => self.balance += amount,
}
}
}
async fn open_account(store: &EventStore, id: &str, initial: i64) -> anyhow::Result<()> {
let mut account = store.load_or_default::<Account>(id).await?;
account.record(AccountEvent::Opened { initial });
store.save(&mut account).await?;
Ok(())
}
async fn transfer(
store: &EventStore,
from_id: &str,
to_id: &str,
amount: i64,
) -> anyhow::Result<()> {
if amount <= 0 {
anyhow::bail!("transfer amount must be positive");
}
let mut scope = store.begin_transaction().await?;
let (first, second) = if from_id <= to_id {
(from_id, to_id)
} else {
(to_id, from_id)
};
let _ = scope.load_for_update::<Account>(first).await?;
let _ = scope.load_for_update::<Account>(second).await?;
let mut from = scope
.load::<Account>(from_id)
.await?
.ok_or_else(|| anyhow::anyhow!("source account {from_id} does not exist"))?;
let mut to = scope
.load::<Account>(to_id)
.await?
.ok_or_else(|| anyhow::anyhow!("destination account {to_id} does not exist"))?;
if from.state.balance < amount {
scope.rollback().await?;
anyhow::bail!(
"insufficient funds in {from_id}: balance {}, transfer {amount}",
from.state.balance
);
}
from.record(AccountEvent::Debited { amount });
to.record(AccountEvent::Credited { amount });
scope.save(&mut from).await?;
scope.save(&mut to).await?;
scope.commit().await?;
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
let pool = PgPool::connect(&url).await?;
let store = EventStore::new(pool);
store.migrate().await?;
let alice = Uuid::new_v4().to_string();
let bob = Uuid::new_v4().to_string();
open_account(&store, &alice, 100).await?;
open_account(&store, &bob, 0).await?;
transfer(&store, &alice, &bob, 30).await?;
let alice_after = store.load::<Account>(&alice).await?.unwrap();
let bob_after = store.load::<Account>(&bob).await?.unwrap();
println!(
"after $30 transfer: Alice={}, Bob={} (sum = {} ✓)",
alice_after.state.balance,
bob_after.state.balance,
alice_after.state.balance + bob_after.state.balance
);
match transfer(&store, &alice, &bob, 1_000_000).await {
Err(e) => println!("rejected: {e}"),
Ok(()) => unreachable!(),
}
let alice_after = store.load::<Account>(&alice).await?.unwrap();
let bob_after = store.load::<Account>(&bob).await?.unwrap();
println!(
"after rejected transfer: Alice={}, Bob={} (unchanged ✓)",
alice_after.state.balance, bob_after.state.balance
);
println!(
"\nThe two aggregates moved together (commit) or not at all \
(rollback). The lock prevented another concurrent transfer \
from racing the read-decide-write."
);
Ok(())
}