use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use mire::{Aggregate, EventData, EventStore, EventStoreError};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
/// Events that can be recorded against a counter aggregate.
///
/// Serde serializes this enum with an internal `"type"` tag, per the
/// `#[serde(tag = "type")]` attribute below.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "counter")]
enum CounterEvent {
/// A deposit of `by` units; applying it adds `by` to the counter's total.
Deposited { by: i64 },
}
/// Aggregate state for the demo: a running sum of deposited amounts.
///
/// The derived `Default` starts `total` at zero.
#[derive(Debug, Default)]
struct Counter {
/// Sum of the `by` values of every `Deposited` event applied so far.
total: i64,
}
/// Makes [`Counter`] an aggregate whose state is rebuilt from [`CounterEvent`]s.
impl Aggregate for Counter {
    type Event = CounterEvent;

    /// Returns the category string `"counter"`.
    ///
    /// NOTE(review): presumably the store uses this to group counter streams;
    /// confirm against the `Aggregate` trait documentation.
    fn stream_category() -> &'static str {
        "counter"
    }

    /// Folds a single event into the state by adding its amount to `total`.
    fn apply(&mut self, event: &CounterEvent) {
        // `CounterEvent` has exactly one variant, so this destructuring is
        // irrefutable; adding a variant later turns it into a compile error.
        let CounterEvent::Deposited { by } = event;
        self.total += *by;
    }
}
/// Runs `command` against the counter identified by `id`, retrying on
/// optimistic-concurrency conflicts until the save succeeds.
///
/// Each attempt reloads the aggregate (or starts from the default state),
/// re-runs `command` on the fresh copy, and tries to save it. `command` may
/// therefore be invoked several times, which is why it is `FnMut`.
///
/// Every `EventStoreError::ConcurrencyConflict` returned by the save bumps
/// `conflicts` by one before the next attempt. There is no attempt limit and
/// no backoff between attempts.
///
/// # Errors
///
/// Returns immediately, without retrying, when loading fails, when `command`
/// itself fails, or when saving fails with anything other than a concurrency
/// conflict.
async fn with_retry<F>(
    store: &EventStore,
    id: &str,
    conflicts: &Arc<AtomicU64>,
    mut command: F,
) -> anyhow::Result<()>
where
    F: FnMut(&mut mire::AggregateRoot<Counter>) -> anyhow::Result<()>,
{
    loop {
        // Start every attempt from the latest persisted state.
        let mut root = store.load_or_default::<Counter>(id).await?;
        command(&mut root)?;

        match store.save(&mut root).await {
            Err(EventStoreError::ConcurrencyConflict { .. }) => {
                // Lost the race: count it and go around again.
                conflicts.fetch_add(1, Ordering::Relaxed);
            }
            // Success or a non-retryable failure: hand it straight back.
            outcome => return outcome.map_err(Into::into),
        }
    }
}
/// Demo entry point: concurrent tasks race to append deposits to one counter
/// stream, relying on optimistic-concurrency retries instead of locks.
///
/// Steps:
/// 1. Connect to the database named by `DATABASE_URL` (falling back to a
///    local default) and run the store's migrations.
/// 2. Spawn one task per label; each performs `writers_per_task` deposits of
///    `1` through [`with_retry`], printing progress after every tenth one.
/// 3. After all tasks finish, reload the counter, print the final balance,
///    stream version and number of observed conflicts, and check them.
///
/// # Errors
///
/// Returns an error if connecting, migrating, any deposit, joining a task, or
/// the final load fails, or if the stream cannot be found at the end.
///
/// # Panics
///
/// Panics if the final total or stream version differs from the number of
/// deposits issued.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    let url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
    let pool = PgPool::connect(&url).await?;
    let store = EventStore::new(pool);
    store.migrate().await?;

    let id = Uuid::new_v4().to_string();
    let conflicts = Arc::new(AtomicU64::new(0));

    // The label array is the single source of truth for how many tasks run,
    // so the expected totals below cannot drift from the spawn loop.
    let task_labels = ["A", "B"];
    let writers_per_task: i64 = 50;
    // Lossless cast: the label array only ever holds a handful of entries.
    let expected_total = writers_per_task * task_labels.len() as i64;

    let mut handles = Vec::with_capacity(task_labels.len());
    for task_label in task_labels {
        let store = store.clone();
        let id = id.clone();
        let conflicts = conflicts.clone();
        handles.push(tokio::spawn(async move {
            for i in 0..writers_per_task {
                with_retry(&store, &id, &conflicts, |agg| {
                    agg.record(CounterEvent::Deposited { by: 1 });
                    Ok(())
                })
                .await?;
                if i % 10 == 9 {
                    println!(" task {task_label} reached deposit #{}", i + 1);
                }
            }
            anyhow::Ok(())
        }));
    }

    // First `?` surfaces a join failure (panic/cancel), second the task's own error.
    for h in handles {
        h.await??;
    }

    // Report a missing stream as an error with context rather than a bare panic.
    let final_view = store.load::<Counter>(&id).await?.ok_or_else(|| {
        anyhow::anyhow!(
            "counter stream {} not found after {} deposits were reported as saved",
            id,
            expected_total
        )
    })?;
    let total_conflicts = conflicts.load(Ordering::Relaxed);

    println!("\nfinal balance: {}", final_view.state.total);
    println!("stream version (events committed): {}", final_view.version);
    println!("conflict retries observed: {total_conflicts}");

    assert_eq!(
        final_view.state.total, expected_total,
        "both tasks must succeed despite the contention"
    );
    assert_eq!(final_view.version, expected_total);

    if total_conflicts > 0 {
        println!(
            "\nThe contention produced {total_conflicts} retries; \
             each was a small cost paid by the loser, with no lock \
             held by the winner. Throughput would degrade with locking; \
             with OCC, the winning writer never pauses."
        );
    } else {
        println!(
            "\nNo conflicts this run (the scheduling happened to \
             serialise the two tasks). Run again or increase the load \
             to observe retries."
        );
    }
    Ok(())
}