use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use mire::{Aggregate, EventData, EventHandler, EventStore, HandledEvent, ProjectionRunner};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Domain events recorded for an order.
///
/// Serde's `tag = "type"` attribute selects the internally tagged
/// representation, so the variant name is carried in a `type` field of the
/// serialized payload.
// NOTE(review): `#[mire(entity = "order")]` presumably ties these events to the
// same "order" name returned by `Order::stream_category` — confirm against the
// mire derive documentation.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "order")]
enum OrderEvent {
/// An order was placed for `amount`; `Order::apply` adds this value to the
/// aggregate's running total. The unit is not specified in this file.
Placed { amount: i64 },
}
/// Aggregate state for an order, rebuilt by applying `OrderEvent`s.
///
/// `Default` yields an order with `amount == 0`.
#[derive(Default)]
struct Order {
/// Running sum of the `amount` carried by every applied `Placed` event.
amount: i64,
}
impl Aggregate for Order {
    type Event = OrderEvent;

    /// Name reported to mire as this aggregate's stream category.
    fn stream_category() -> &'static str {
        "order"
    }

    /// Folds a single event into the aggregate: a `Placed` event adds its
    /// `amount` to the running total.
    fn apply(&mut self, event: &OrderEvent) {
        // `OrderEvent` has exactly one variant, so the pattern is irrefutable;
        // adding a variant later turns this line into a compile error.
        let OrderEvent::Placed { amount } = event;
        self.amount += *amount;
    }
}
/// Event handler that prints every event it receives and counts how many it
/// has handled.
struct PrintingHandler {
/// Label printed at the start of each log line to identify the replica.
worker_label: String,
/// Incremented once per handled event; shared through the `Arc` so the code
/// that built the handler can read the count.
counter: Arc<AtomicU64>,
}
impl EventHandler for PrintingHandler {
    type Aggregate = Order;

    /// Prints one line for the received event and bumps the handled-event
    /// counter. Always returns `Ok(())`.
    async fn handle(&self, event: HandledEvent<OrderEvent>) -> anyhow::Result<()> {
        // Matching on the field in place copies `amount` out without moving
        // `event`, which is still needed for `stream_id()` below.
        let amount = match event.event {
            OrderEvent::Placed { amount } => amount,
        };

        // `fetch_add` returns the value before the increment; `n` is the
        // 1-based ordinal of this event for this handler.
        let previous = self.counter.fetch_add(1, Ordering::Relaxed);
        let n = previous + 1;

        let label = &self.worker_label;
        let stream = event.stream_id();
        println!(" [{}] handled event #{n} (amount={amount}) on {}", label, stream);

        Ok(())
    }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://mire:mire@localhost:5434/mire".to_string());
let pool = PgPool::connect(&url).await?;
let store = EventStore::new(pool.clone());
store.migrate().await?;
let suffix = Uuid::new_v4().simple().to_string();
let subscription_id = format!("orders-projection-{suffix}");
let make_runner = |label: &str| {
let counter = Arc::new(AtomicU64::new(0));
let runner = ProjectionRunner::builder(store.clone())
.worker_id(format!("replica-{label}"))
.poll_interval(Duration::from_millis(50))
.lease_ttl(Duration::from_secs(3))
.lease_recheck_interval(Duration::from_millis(500))
.subscribe(
subscription_id.clone(),
PrintingHandler {
worker_label: format!("replica-{label}"),
counter: counter.clone(),
},
)
.build();
(runner, counter)
};
let (runner_a, count_a) = make_runner("A");
let (runner_b, count_b) = make_runner("B");
let cancel_a = CancellationToken::new();
let cancel_b = CancellationToken::new();
let mut handle_a = Some({
let cancel = cancel_a.clone();
tokio::spawn(async move { runner_a.run(cancel).await })
});
let mut handle_b = Some({
let cancel = cancel_b.clone();
tokio::spawn(async move { runner_b.run(cancel).await })
});
let lease_pool = store.pool();
let leader_id = loop {
if let Some(s) = mire::lease::status(lease_pool, &subscription_id).await?
&& s.leased_until > chrono::Utc::now()
{
break s.worker_id;
}
tokio::time::sleep(Duration::from_millis(50)).await;
};
println!("\n--- after startup ---");
println!("lease holder: {leader_id}");
println!("\n--- appending 5 events; leader processes ---");
for i in 0..5 {
let id = Uuid::new_v4().to_string();
let mut order = store.load_or_default::<Order>(&id).await?;
order.record(OrderEvent::Placed {
amount: 100 * (i + 1),
});
store.save(&mut order).await?;
}
tokio::time::sleep(Duration::from_millis(800)).await;
let before_a = count_a.load(Ordering::Relaxed);
let before_b = count_b.load(Ordering::Relaxed);
println!(" A processed {before_a} events");
println!(" B processed {before_b} events");
println!("\n--- killing the current leader ({leader_id}); follower should take over ---");
if leader_id == "replica-A" {
cancel_a.cancel();
if let Some(h) = handle_a.take() {
let _ = h.await;
}
} else {
cancel_b.cancel();
if let Some(h) = handle_b.take() {
let _ = h.await;
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
let status_after = mire::lease::status(lease_pool, &subscription_id).await?;
if let Some(s) = &status_after {
println!(
"lease holder after failover: {} (fence_token={})",
s.worker_id, s.fence_token,
);
}
println!("\n--- appending 5 more events; new leader processes ---");
for i in 5..10 {
let id = Uuid::new_v4().to_string();
let mut order = store.load_or_default::<Order>(&id).await?;
order.record(OrderEvent::Placed {
amount: 100 * (i + 1),
});
store.save(&mut order).await?;
}
tokio::time::sleep(Duration::from_millis(800)).await;
let after_a = count_a.load(Ordering::Relaxed);
let after_b = count_b.load(Ordering::Relaxed);
println!(
" A processed {after_a} events total (Δ {})",
after_a - before_a
);
println!(
" B processed {after_b} events total (Δ {})",
after_b - before_b
);
cancel_a.cancel();
cancel_b.cancel();
if let Some(h) = handle_a {
let _ = h.await;
}
if let Some(h) = handle_b {
let _ = h.await;
}
println!("\n--- summary ---");
println!(
" total events processed: A={after_a}, B={after_b} (sum {})",
after_a + after_b
);
println!(
" 10 events were appended; in this clean handoff each was delivered \
once to the current leader. Under a mid-batch pause+failover, \
delivery is at-least-once — idempotent handlers absorb the \
re-delivery."
);
Ok(())
}