use mire::{Aggregate, EventData, EventStore, Snapshot, StreamQuery};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, EventData)]
#[serde(tag = "type")]
#[mire(entity = "tally")]
enum TallyEvent {
Added { amount: i64 },
}
#[derive(Debug, Default, Serialize, Deserialize)]
struct Tally {
sum: i64,
count: i64,
}
impl Aggregate for Tally {
type Event = TallyEvent;
fn stream_category() -> &'static str {
"tally"
}
fn apply(&mut self, event: &TallyEvent) {
match event {
TallyEvent::Added { amount } => {
self.sum += amount;
self.count += 1;
}
}
}
}
impl Snapshot for Tally {
const SNAPSHOT_VERSION: i32 = 1;
const SNAPSHOT_FREQUENCY: i64 = 5;
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://mire:mire@localhost:5432/mire".to_string());
let pool = PgPool::connect(&database_url).await?;
let store = EventStore::new(pool);
store.migrate().await?;
let id = Uuid::new_v4().to_string();
let mut tally = store.load_or_default::<Tally>(&id).await?;
for n in 1..=12 {
tally.record(TallyEvent::Added { amount: n });
store.save_snapshotting(&mut tally).await?;
}
println!(
"wrote {} events; state: sum={} count={}",
tally.version, tally.state.sum, tally.state.count
);
let snapshots = store
.read_stream(
&format!("{}-snapshot", tally.stream_id),
StreamQuery::default(),
)
.await?;
println!("snapshot stream holds {} snapshot(s):", snapshots.len());
for snap in &snapshots {
println!(
" reflects version {} (snapshot_version {})",
snap.data["version"], snap.data["snapshot_version"]
);
}
let loaded = store.load_snapshotted::<Tally>(&id).await?.expect("exists");
println!(
"loaded via snapshot: sum={} count={} version={}",
loaded.state.sum, loaded.state.count, loaded.version
);
assert_eq!(loaded.state.sum, (1i64..=12).sum::<i64>());
assert_eq!(loaded.version, 12);
println!("done");
Ok(())
}