use std::time::Instant;
use mire::{Aggregate, Event, EventData, EventStore, ExpectedVersion};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
/// Events that can be recorded on a ledger stream.
///
/// Serialized through serde with the variant name stored under a `"type"` tag.
// NOTE(review): `#[mire(entity = "ledger")]` presumably associates these events
// with the "ledger" entity inside the mire store — confirm against the mire
// derive documentation.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "ledger")]
enum LedgerEvent {
/// An amount was posted; applying it adds `amount` to the ledger balance.
Posted { amount: i64 },
}
/// Ledger aggregate state, built up by applying `LedgerEvent`s one at a time.
///
/// The derived `Default` gives a ledger with a zero balance.
#[derive(Debug, Default)]
struct Ledger {
/// Running sum of every `Posted` amount applied so far.
balance: i64,
}
// Makes `Ledger` usable as an aggregate whose history is a sequence of
// `LedgerEvent`s.
impl Aggregate for Ledger {
    type Event = LedgerEvent;

    /// Category name shared by every ledger stream.
    fn stream_category() -> &'static str {
        "ledger"
    }

    /// Folds one event into the running balance.
    fn apply(&mut self, event: &LedgerEvent) {
        // `LedgerEvent` has a single variant, so this destructure is
        // irrefutable; adding a variant later turns it into a compile error.
        let LedgerEvent::Posted { amount } = event;
        self.balance += *amount;
    }
}
/// Times three ways of writing the same number of `LedgerEvent::Posted`
/// events, then checks that all three streams end in the same state.
///
/// * baseline — load the aggregate, record one event, save; repeated per event.
/// * Recipe A — load once, `record_many`, save once.
/// * Recipe B — build the `Event`s up front and `append` them to the stream
///   directly, without going through the aggregate.
///
/// The database is taken from `DATABASE_URL`, falling back to a local default,
/// and `store.migrate()` is run before any events are written.
///
/// # Errors
/// Propagates any error from connecting, migrating, loading, saving or
/// appending.
///
/// # Panics
/// Panics if a stream written during this run cannot be loaded back, or if its
/// final balance or version differs from the number of events written.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    let events_per_recipe = 200;
    println!("ingesting {events_per_recipe} events per recipe\n");

    // Baseline: a full load/record/save round per event.
    let id_baseline = Uuid::new_v4().to_string();
    let start = Instant::now();
    for _ in 0..events_per_recipe {
        let mut ledger = store.load_or_default::<Ledger>(&id_baseline).await?;
        ledger.record(LedgerEvent::Posted { amount: 1 });
        store.save(&mut ledger).await?;
    }
    let baseline = start.elapsed();
    println!(" baseline (one save per event): {baseline:.2?}");

    // Recipe A: load once, record the whole batch, save once.
    let id_a = Uuid::new_v4().to_string();
    let start = Instant::now();
    let mut ledger = store.load_or_default::<Ledger>(&id_a).await?;
    ledger.record_many((0..events_per_recipe).map(|_| LedgerEvent::Posted { amount: 1 }));
    store.save(&mut ledger).await?;
    let recipe_a = start.elapsed();
    println!(
        " Recipe A (record_many + one save): {recipe_a:.2?} ({:.1}× faster)",
        baseline.as_secs_f64() / recipe_a.as_secs_f64()
    );

    // Recipe B: append pre-built events straight to the stream.
    // NOTE(review): the stream id is assembled by hand as `ledger-{id}`; the
    // verification below loads it via `load::<Ledger>(&id_b)`, which presumes
    // the store derives stream ids the same way — confirm against mire.
    let id_b = Uuid::new_v4().to_string();
    let stream_id = format!("ledger-{id_b}");
    let events: Vec<Event<LedgerEvent>> = (0..events_per_recipe)
        .map(|_| Event::new(LedgerEvent::Posted { amount: 1 }))
        .collect();
    // The timer starts after the events are built, so only the append is timed.
    let start = Instant::now();
    store
        .append::<LedgerEvent>(
            &stream_id,
            Ledger::stream_category(),
            ExpectedVersion::NoStream,
            &events,
        )
        .await?;
    let recipe_b = start.elapsed();
    println!(
        " Recipe B (append directly, no agg): {recipe_b:.2?} ({:.1}× faster)",
        baseline.as_secs_f64() / recipe_b.as_secs_f64()
    );

    // Verify every stream the same way; the label in each assertion message
    // identifies which recipe failed.
    for (label, id) in [
        ("baseline", &id_baseline),
        ("Recipe A", &id_a),
        ("Recipe B", &id_b),
    ] {
        let loaded = store
            .load::<Ledger>(id)
            .await?
            .expect("stream was written earlier in this run, so it must load");
        assert_eq!(
            loaded.state.balance, events_per_recipe,
            "{label}: unexpected final balance"
        );
        assert_eq!(
            loaded.version, events_per_recipe,
            "{label}: unexpected final version"
        );
    }
    println!(
        "\nall three streams ended at balance={events_per_recipe}, version={events_per_recipe} ✓"
    );

    println!(
        "\nUse Recipe A when each event passes through a command (you \
         want the aggregate's apply() to validate as you go). Use \
         Recipe B when you have a stream of pre-validated events and \
         only need durability."
    );
    Ok(())
}