//! Event-sourced counter example.
//!
//! Defines a `CounterState` projection that is rebuilt from `INCREMENTED` /
//! `DECREMENTED` events, and a `main` that appends three events to a store,
//! projects the state, and prints the raw event log.

// Lint allowances for this example. `print_stdout` is needed because `main`
// reports its results with `println!`.
// NOTE(review): no wildcard match arm or truncating cast is visible in this
// file — confirm whether the other two allowances are still required.
#![allow(
clippy::print_stdout,
clippy::wildcard_enum_match_arm,
clippy::cast_possible_truncation
)]
use batpak::prelude::*;
use serde::{Deserialize, Serialize};
/// Event kind appended when the counter is increased.
// NOTE(review): the two `custom(..)` arguments are presumably a
// (category, type) pair — confirm against the batpak `EventKind` docs.
const INCREMENTED: EventKind = EventKind::custom(1, 1);
/// Event kind appended when the counter is decreased. Differs from
/// `INCREMENTED` only in the second `custom(..)` argument.
const DECREMENTED: EventKind = EventKind::custom(1, 2);
/// Payload stored with both `INCREMENTED` and `DECREMENTED` events.
///
/// Despite the name, the same struct is used for decrements: `main` appends a
/// `DECREMENTED` event carrying a negative `amount`.
#[derive(Serialize, Deserialize)]
struct IncrementedBy {
/// Signed delta that is added to the counter value.
amount: i64,
/// Free-form explanation of why the counter changed.
reason: String,
}
/// Projection of a counter, rebuilt by folding its events through the
/// `EventSourced` implementation.
///
/// `Default` yields the all-zero state that replay starts from.
#[derive(Debug, Default, Serialize, Deserialize)]
struct CounterState {
/// Running sum of every applied event's `amount`.
value: i64,
/// Number of applied events classified as increments (see `apply_event`
/// for the exact rule).
total_increments: u32,
/// Number of applied events classified as decrements (see `apply_event`
/// for the exact rule).
total_decrements: u32,
}
impl EventSourced for CounterState {
type Input = batpak::prelude::ValueInput;
fn from_events(events: &[Event<serde_json::Value>]) -> Option<Self> {
if events.is_empty() {
return None;
}
let mut state = Self::default();
for event in events {
state.apply_event(event);
}
Some(state)
}
fn apply_event(&mut self, event: &Event<serde_json::Value>) {
let kind = event.header.event_kind;
if kind == INCREMENTED || kind == DECREMENTED {
if let Ok(payload) = serde_json::from_value::<IncrementedBy>(event.payload.clone()) {
self.value += payload.amount;
if payload.amount > 0 {
self.total_increments += 1;
} else {
self.total_decrements += 1;
}
}
}
}
fn relevant_event_kinds() -> &'static [EventKind] {
&[INCREMENTED, DECREMENTED]
}
}
/// Runs the counter demo end to end.
///
/// Opens a store in a temporary directory, appends three counter events,
/// projects them into a [`CounterState`], prints the projected state, the raw
/// event stream, and an ancestor walk from the last event, then closes the
/// store. Any store, coordinate, or I/O error is propagated to the caller.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Entity key shared by the coordinate, the projection, and the stream read.
    const ENTITY: &str = "counter:hits";

    let scratch = tempfile::tempdir()?;
    let store = Store::open(StoreConfig::new(scratch.path()))?;
    let coord = Coordinate::new(ENTITY, "example")?;

    println!("Writing events...\n");
    // (kind, signed delta, reason) for each event, appended in this order.
    let deltas = [
        (INCREMENTED, 1, "page view"),
        (INCREMENTED, 5, "bulk import"),
        (DECREMENTED, -2, "cleanup"),
    ];
    for (kind, amount, reason) in deltas {
        let payload = IncrementedBy {
            amount,
            reason: reason.into(),
        };
        store.append(&coord, kind, &payload)?;
    }

    let projected: Option<CounterState> =
        store.project::<CounterState>(ENTITY, &Freshness::Consistent)?;
    if let Some(counter) = projected {
        println!("Counter state (reconstructed from {} events):", 3);
        println!(" value: {}", counter.value);
        println!(" total_increments: {}", counter.total_increments);
        println!(" total_decrements: {}", counter.total_decrements);
    } else {
        println!("No events found!");
    }

    println!("\nRaw event log:");
    let entries = store.stream(ENTITY);
    for record in &entries {
        // The stream entry carries only metadata; fetch the stored event to
        // print its payload.
        let fetched = store.get(record.event_id)?;
        println!(
            " seq={} kind={} payload={}",
            record.clock, record.kind, fetched.event.payload
        );
    }

    if let Some(newest) = entries.last() {
        println!("\nAncestor walk from last event:");
        // NOTE(review): the second argument is presumably a cap on how many
        // ancestors are returned — confirm against the batpak docs.
        let lineage = store.walk_ancestors(newest.event_id, 10);
        for (idx, ancestor) in lineage.iter().enumerate() {
            println!(
                " {}: kind={} payload={}",
                idx, ancestor.event.header.event_kind, ancestor.event.payload
            );
        }
    }

    store.close()?;
    println!("\nDone. The event log told us the count is 4, and we know exactly why.");
    Ok(())
}