use std::collections::HashMap;
/// One recorded event belonging to an aggregate.
///
/// All fields are public, so values can be built by hand; the field notes
/// below describe what `SimpleEventStore::append` puts in them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleEvent {
    /// Store-wide sequence number (the store starts counting at 1).
    pub id: u64,
    /// Identifier of the aggregate this event belongs to.
    pub aggregate_id: String,
    /// Name of the event kind; the event bus uses it as the routing key.
    pub event_type: String,
    /// Opaque event body. Nothing in this file interprets it.
    pub payload: String,
    /// Position of the event within its aggregate (first event is 1).
    pub version: u64,
    /// Seconds since the Unix epoch at append time, or 0 if the system
    /// clock reports a time before the epoch.
    pub timestamp: u64,
}
/// In-memory, append-only event log with a per-aggregate version counter.
pub struct SimpleEventStore {
    /// Every appended event, in append order.
    events: Vec<SimpleEvent>,
    /// Id that the next appended event will receive.
    next_id: u64,
    /// Highest version handed out so far, keyed by aggregate id.
    versions: HashMap<String, u64>,
}

impl SimpleEventStore {
    /// Creates an empty store; the first appended event gets id `1`.
    pub fn new() -> Self {
        Self {
            events: Vec::new(),
            next_id: 1,
            versions: HashMap::new(),
        }
    }

    /// Appends a new event for `aggregate_id` and returns a copy of it.
    ///
    /// The event receives the next store-wide id and the next version of its
    /// aggregate (versions start at 1 and increase by one per append). The
    /// timestamp is the current wall-clock time in whole seconds since the
    /// Unix epoch, or 0 when the system clock is set before the epoch.
    pub fn append(
        &mut self,
        aggregate_id: impl Into<String>,
        event_type: impl Into<String>,
        payload: impl Into<String>,
    ) -> SimpleEvent {
        use std::time::{SystemTime, UNIX_EPOCH};

        let aggregate_id = aggregate_id.into();

        // Bump the counter inside its own scope so the mutable borrow of the
        // map ends here and only the plain number escapes.
        let version = {
            let counter = self.versions.entry(aggregate_id.clone()).or_default();
            *counter += 1;
            *counter
        };

        let event = SimpleEvent {
            id: self.next_id,
            // The owned id is not needed after this point, so it is moved
            // into the event rather than cloned a second time.
            aggregate_id,
            event_type: event_type.into(),
            payload: payload.into(),
            version,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|elapsed| elapsed.as_secs())
                .unwrap_or(0),
        };

        self.next_id += 1;
        // One copy is kept in the log, the other goes back to the caller.
        self.events.push(event.clone());
        event
    }

    /// Returns copies of all events of `aggregate_id`, in append order.
    pub fn load_aggregate(&self, aggregate_id: &str) -> Vec<SimpleEvent> {
        self.events
            .iter()
            .filter(|event| event.aggregate_id == aggregate_id)
            .cloned()
            .collect()
    }

    /// Returns copies of the events of `aggregate_id` whose version is at
    /// least `from_version` (inclusive), in append order.
    pub fn load_from_version(&self, aggregate_id: &str, from_version: u64) -> Vec<SimpleEvent> {
        self.events
            .iter()
            .filter(|event| event.aggregate_id == aggregate_id && event.version >= from_version)
            .cloned()
            .collect()
    }

    /// Returns a copy of the whole log, in append order.
    pub fn load_all_events(&self) -> Vec<SimpleEvent> {
        self.events.clone()
    }

    /// Number of events in the store.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// `true` when no event has been appended yet.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Latest version issued for `aggregate_id`, or 0 if it has no events.
    pub fn current_version(&self, aggregate_id: &str) -> u64 {
        self.versions.get(aggregate_id).copied().unwrap_or(0)
    }
}

impl Default for SimpleEventStore {
    /// Same as [`SimpleEventStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Iterator over an owned list of events that can be narrowed to a single
/// aggregate and/or a single event type.
pub struct EventStreamIter {
    /// Events to walk through, in the order they were supplied.
    events: Vec<SimpleEvent>,
    /// Index of the next event to examine.
    position: usize,
    /// When set, only events with this aggregate id are yielded.
    filter_aggregate: Option<String>,
    /// When set, only events with this event type are yielded.
    filter_type: Option<String>,
}

impl EventStreamIter {
    /// Wraps `events` in an unfiltered iterator positioned at the start.
    pub fn new(events: Vec<SimpleEvent>) -> Self {
        Self {
            events,
            position: 0,
            filter_aggregate: None,
            filter_type: None,
        }
    }

    /// Restricts the stream to events of `aggregate_id`, replacing any
    /// aggregate filter set earlier.
    pub fn for_aggregate(self, aggregate_id: impl Into<String>) -> Self {
        Self {
            filter_aggregate: Some(aggregate_id.into()),
            ..self
        }
    }

    /// Restricts the stream to events of `event_type`, replacing any type
    /// filter set earlier.
    pub fn for_type(self, event_type: impl Into<String>) -> Self {
        Self {
            filter_type: Some(event_type.into()),
            ..self
        }
    }

    /// Returns `true` when `event` passes every configured filter; a filter
    /// that is not set accepts everything.
    fn matches(&self, event: &SimpleEvent) -> bool {
        let aggregate_ok = match self.filter_aggregate.as_deref() {
            Some(wanted) => event.aggregate_id == wanted,
            None => true,
        };
        let type_ok = match self.filter_type.as_deref() {
            Some(wanted) => event.event_type == wanted,
            None => true,
        };
        aggregate_ok && type_ok
    }
}

impl Iterator for EventStreamIter {
    type Item = SimpleEvent;

    /// Advances past non-matching events and yields a copy of the next
    /// matching one, or `None` once the list is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Running off the end of the list finishes the iteration.
            let candidate = self.events.get(self.position)?;
            self.position += 1;
            if self.matches(candidate) {
                return Some(candidate.clone());
            }
        }
    }
}
/// Saved state of one aggregate, tagged with a version number.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleSnapshot {
    /// Identifier of the aggregate the snapshot belongs to; the snapshot
    /// store uses it as the lookup key.
    pub aggregate_id: String,
    /// Opaque serialized state. Nothing in this file interprets it.
    pub state: String,
    /// Version number recorded with the snapshot.
    pub version: u64,
}
/// In-memory store that keeps at most one snapshot per aggregate.
pub struct SimpleSnapshotStore {
    /// Current snapshot for each aggregate id.
    snapshots: HashMap<String, SimpleSnapshot>,
}

impl SimpleSnapshotStore {
    /// Creates a store with no snapshots.
    pub fn new() -> Self {
        let snapshots = HashMap::new();
        Self { snapshots }
    }

    /// Stores `snapshot` under its aggregate id, replacing whatever snapshot
    /// that aggregate had before.
    pub fn save(&mut self, snapshot: SimpleSnapshot) {
        let key = snapshot.aggregate_id.clone();
        self.snapshots.insert(key, snapshot);
    }

    /// Borrows the snapshot of `aggregate_id`, if one is stored.
    pub fn load_snapshot(&self, aggregate_id: &str) -> Option<&SimpleSnapshot> {
        self.snapshots.get(aggregate_id)
    }

    /// Removes the snapshot of `aggregate_id`; returns whether one existed.
    pub fn delete(&mut self, aggregate_id: &str) -> bool {
        let removed = self.snapshots.remove(aggregate_id);
        removed.is_some()
    }

    /// Number of aggregates that currently have a snapshot.
    pub fn len(&self) -> usize {
        self.snapshots.len()
    }

    /// `true` when no snapshot is stored.
    pub fn is_empty(&self) -> bool {
        self.snapshots.is_empty()
    }
}

impl Default for SimpleSnapshotStore {
    /// Same as [`SimpleSnapshotStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Shared, thread-safe callback that is invoked with each published event.
pub type SimpleEventHandler = std::sync::Arc<dyn Fn(&SimpleEvent) + Send + Sync>;

/// Synchronous publish/subscribe hub that routes events by `event_type`.
pub struct SimpleEventBus {
    /// Handlers registered for one specific event type, in subscription order.
    subscriptions: HashMap<String, Vec<SimpleEventHandler>>,
    /// Handlers registered under `"*"`; they receive every event.
    wildcard: Vec<SimpleEventHandler>,
}

impl SimpleEventBus {
    /// Creates a bus with no subscribers.
    pub fn new() -> Self {
        Self {
            subscriptions: HashMap::new(),
            wildcard: Vec::new(),
        }
    }

    /// Registers `handler` for `event_type`.
    ///
    /// The literal type `"*"` subscribes the handler to every event.
    pub fn subscribe(&mut self, event_type: impl Into<String>, handler: SimpleEventHandler) {
        let key = event_type.into();
        // Pick the list the handler belongs to, then push once.
        let bucket = if key == "*" {
            &mut self.wildcard
        } else {
            self.subscriptions.entry(key).or_default()
        };
        bucket.push(handler);
    }

    /// Calls every interested handler with `event`, on the caller's thread.
    ///
    /// Handlers subscribed to the event's exact type run first, followed by
    /// the wildcard handlers; each group runs in subscription order.
    pub fn publish(&self, event: &SimpleEvent) {
        let typed = self
            .subscriptions
            .get(&event.event_type)
            .into_iter()
            .flatten();
        for handler in typed.chain(self.wildcard.iter()) {
            handler(event);
        }
    }

    /// Total number of type-specific handlers; wildcard handlers are not
    /// included.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.values().map(Vec::len).sum()
    }

    /// Number of wildcard (`"*"`) handlers.
    pub fn wildcard_count(&self) -> usize {
        self.wildcard.len()
    }
}

impl Default for SimpleEventBus {
    /// Same as [`SimpleEventBus::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Folds the events of a [`SimpleEventStore`] into caller-defined state and
/// keeps a running count of how many events it has fed to handlers.
pub struct ProjectionRunner {
    /// Caller-chosen label for this projection.
    pub name: String,
    /// Number of events passed to a handler across all runs so far.
    processed: u64,
}

impl ProjectionRunner {
    /// Creates a runner called `name` with a processed count of zero.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            processed: 0,
        }
    }

    /// Folds every event in `store`, in append order, into a state value.
    ///
    /// Starting from `initial`, `handler` is called once per event with the
    /// current state and a reference to the event, and returns the next
    /// state. The final state is returned.
    ///
    /// Events are visited by reference; the log is not copied.
    pub fn run<S, F>(&mut self, store: &SimpleEventStore, initial: S, handler: F) -> S
    where
        F: FnMut(S, &SimpleEvent) -> S,
    {
        self.fold_events(store.events.iter(), initial, handler)
    }

    /// Like [`ProjectionRunner::run`], but only visits the events whose
    /// aggregate id equals `aggregate_id`.
    ///
    /// Returns `initial` unchanged when the aggregate has no events.
    pub fn run_for_aggregate<S, F>(
        &mut self,
        store: &SimpleEventStore,
        aggregate_id: &str,
        initial: S,
        handler: F,
    ) -> S
    where
        F: FnMut(S, &SimpleEvent) -> S,
    {
        let matching = store
            .events
            .iter()
            .filter(|event| event.aggregate_id == aggregate_id);
        self.fold_events(matching, initial, handler)
    }

    /// Number of events handed to handlers over the lifetime of this runner.
    pub fn processed_count(&self) -> u64 {
        self.processed
    }

    /// Shared fold loop: threads the state through `handler` for each event
    /// and counts every event after its handler call returns.
    fn fold_events<'a, S, F, I>(&mut self, events: I, initial: S, mut handler: F) -> S
    where
        I: Iterator<Item = &'a SimpleEvent>,
        F: FnMut(S, &SimpleEvent) -> S,
    {
        let mut state = initial;
        for event in events {
            state = handler(state, event);
            self.processed += 1;
        }
        state
    }
}