use std::time::Duration;
use mire::{Aggregate, EventData, EventHandler, EventStore, HandledEvent, ProjectionRunner};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Events that can be recorded for a counter.
///
/// Serde serializes the enum with an internal `type` tag, and the `mire`
/// attribute sets the entity name to `counter`.
// NOTE(review): presumably the entity name has to agree with
// `Counter::stream_category()` — confirm against the mire documentation.
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "counter")]
enum CounterEvent {
/// Adds `by` to the counter total when applied.
Incremented { by: i64 },
}
/// Aggregate state for a single counter; starts at zero via `Default`.
#[derive(Debug, Default)]
struct Counter {
/// Sum of the `by` values of every `Incremented` event applied so far.
total: i64,
}
impl Aggregate for Counter {
    type Event = CounterEvent;

    /// Returns the stream category string for this aggregate (`"counter"`).
    fn stream_category() -> &'static str {
        "counter"
    }

    /// Folds a single event into the running total.
    fn apply(&mut self, event: &CounterEvent) {
        // `CounterEvent` has exactly one variant, so this pattern is irrefutable;
        // adding a variant turns it into a compile error, just like a `match`.
        let CounterEvent::Incremented { by: delta } = event;
        self.total += *delta;
    }
}
/// Event handler that maintains the `counter_totals` read-model table.
struct CounterTotalsProjection {
/// Postgres pool the handler executes its upsert against.
db: PgPool,
}
impl EventHandler for CounterTotalsProjection {
    type Aggregate = Counter;

    /// Applies one counter event to the `counter_totals` table.
    ///
    /// Inserts a row `(stream_id, by)` for a stream seen for the first time;
    /// otherwise adds `by` to the stored total for that stream.
    ///
    /// # Errors
    /// Returns the database error if the upsert fails.
    // NOTE(review): the upsert adds unconditionally, so handling the same event
    // twice would double-count — confirm the runner's delivery guarantees.
    async fn handle(&self, event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
        // `by` is `Copy`, so this does not move out of `event`.
        let increment = match event.event {
            CounterEvent::Incremented { by } => by,
        };

        let upsert = sqlx::query(
            "INSERT INTO counter_totals (stream_id, total) VALUES ($1, $2)
ON CONFLICT (stream_id) DO UPDATE SET total = counter_totals.total + EXCLUDED.total",
        )
        .bind(event.stream_id())
        .bind(increment);

        upsert.execute(&self.db).await?;
        Ok(())
    }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://mire:mire@localhost:5432/mire".to_string());
let pool = PgPool::connect(&database_url).await?;
let store = EventStore::new(pool.clone());
store.migrate().await?;
sqlx::raw_sql(
"CREATE TABLE IF NOT EXISTS counter_totals (
stream_id TEXT PRIMARY KEY,
total BIGINT NOT NULL
)",
)
.execute(&pool)
.await?;
let runner = ProjectionRunner::builder(store.clone())
.poll_interval(Duration::from_millis(50))
.subscribe(
"counter-totals-projection",
CounterTotalsProjection { db: pool.clone() },
)
.build();
let token = CancellationToken::new();
let handle = {
let token = token.clone();
tokio::spawn(async move { runner.run(token).await })
};
let mut stream_ids = Vec::new();
for _ in 0..2 {
let id = Uuid::new_v4().to_string();
let mut counter = store.load_or_default::<Counter>(&id).await?;
counter.record(CounterEvent::Incremented { by: 3 });
counter.record(CounterEvent::Incremented { by: 7 });
store.save(&mut counter).await?;
stream_ids.push(counter.stream_id.clone());
}
for stream_id in &stream_ids {
loop {
let total: Option<i64> =
sqlx::query_scalar("SELECT total FROM counter_totals WHERE stream_id = $1")
.bind(stream_id)
.fetch_optional(&pool)
.await?;
if total == Some(10) {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
token.cancel();
handle.await??;
println!("read model caught up for {} streams:", stream_ids.len());
for stream_id in &stream_ids {
let total: i64 =
sqlx::query_scalar("SELECT total FROM counter_totals WHERE stream_id = $1")
.bind(stream_id)
.fetch_one(&pool)
.await?;
println!(" {stream_id} = {total}");
}
println!(
"projection lag: {}",
store.projection_lag("counter-totals-projection").await?
);
Ok(())
}