use crate::command::CommandContext;
use serde::{Serialize, de::DeserializeOwned};
use uuid::Uuid;
/// A domain aggregate: a state type that validates commands into domain
/// events and evolves by applying those events.
///
/// Implementors must be `Default`, `Clone`, serde-serializable and
/// deserializable, and `Send + Sync + 'static`.
pub trait Aggregate:
Default + Clone + Serialize + DeserializeOwned + Send + Sync + 'static
{
/// Static name identifying this aggregate type.
///
/// NOTE(review): presumably used to namespace stored streams; confirm
/// against the callers, which are not visible in this file.
const AGGREGATE_TYPE: &'static str;
/// Command type accepted by [`Aggregate::handle`].
type Command: Send + 'static;
/// Event type emitted by [`Aggregate::handle`] and consumed by
/// [`Aggregate::apply`].
///
/// The conversion helpers in this file expect it to serialize as a JSON
/// object with a string `type` field and an optional `data` field, i.e. a
/// serde adjacently tagged enum (`#[serde(tag = "type", content = "data")]`).
type DomainEvent: Serialize + DeserializeOwned + Send + Sync + Clone + 'static;
/// Error returned by [`Aggregate::handle`] when a command is rejected.
type Error: std::error::Error + Send + Sync + 'static;
/// Decides which events `cmd` produces given the current state.
///
/// Takes `&self`, so the state itself is not modified here; the returned
/// events describe the change. Returns `Err` when the command is rejected.
fn handle(&self, cmd: Self::Command) -> Result<Vec<Self::DomainEvent>, Self::Error>;
/// Consumes the current state and returns the state after `event`.
///
/// The signature has no error channel, so implementations are expected to
/// handle every event they can receive.
fn apply(self, event: &Self::DomainEvent) -> Self;
}
/// Folds a single stored [`eventfold::Event`] into the aggregate state.
///
/// The stored event is re-wrapped into the adjacently tagged JSON shape
/// (`{"type": ..., "data": ...}`) and deserialized as `A::DomainEvent`. The
/// `data` key is omitted when the stored payload is JSON `null`, which is how
/// fieldless variants round-trip.
///
/// If deserialization fails for any reason (unknown event type, malformed
/// payload), the event is skipped and `state` is returned unchanged.
fn reduce<A: Aggregate>(state: A, event: &eventfold::Event) -> A {
    let envelope = match &event.data {
        // Fieldless variants carry no payload: emit only the tag.
        serde_json::Value::Null => serde_json::json!({ "type": event.event_type }),
        payload => serde_json::json!({
            "type": event.event_type,
            "data": payload,
        }),
    };

    if let Ok(domain_event) = serde_json::from_value::<A::DomainEvent>(envelope) {
        state.apply(&domain_event)
    } else {
        // Not decodable as a domain event of this aggregate: leave state untouched.
        state
    }
}
/// Returns the reduce function for aggregate `A` as an [`eventfold::ReduceFn`].
///
/// The returned function deserializes each stored event into
/// `A::DomainEvent` and applies it via [`Aggregate::apply`]; events that do
/// not deserialize are skipped and leave the state unchanged.
pub fn reducer<A: Aggregate>() -> eventfold::ReduceFn<A> {
reduce::<A>
}
pub fn to_eventfold_event<A: Aggregate>(
domain_event: &A::DomainEvent,
ctx: &CommandContext,
) -> serde_json::Result<eventfold::Event> {
let value = serde_json::to_value(domain_event)?;
let obj = value
.as_object()
.expect("adjacently tagged enum must serialize to a JSON object");
let event_type = obj["type"]
.as_str()
.expect("adjacently tagged enum must have a string 'type' field");
let data = obj.get("data").cloned().unwrap_or(serde_json::Value::Null);
let mut event = eventfold::Event::new(event_type, data).with_id(Uuid::new_v4().to_string());
if let Some(ref actor) = ctx.actor {
event = event.with_actor(actor);
}
let mut meta_map = match ctx.metadata {
Some(serde_json::Value::Object(ref map)) => map.clone(),
_ => serde_json::Map::new(),
};
if let Some(ref cid) = ctx.correlation_id {
meta_map.insert(
"correlation_id".to_string(),
serde_json::Value::String(cid.clone()),
);
}
if let Some(ref device_id) = ctx.source_device {
meta_map.insert(
"source_device".to_string(),
serde_json::Value::String(device_id.clone()),
);
}
if !meta_map.is_empty() {
event = event.with_meta(serde_json::Value::Object(meta_map));
}
Ok(event)
}
/// Minimal counter aggregate used as a fixture by tests.
///
/// Declared `pub(crate)` so test modules elsewhere in the crate can reuse it.
#[cfg(test)]
pub(crate) mod test_fixtures {
    use super::Aggregate;
    use serde::{Deserialize, Serialize};

    /// Aggregate state: a single unsigned counter starting at zero.
    #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
    pub(crate) struct Counter {
        pub value: u64,
    }

    /// Commands accepted by [`Counter`].
    #[derive(Debug, Clone, PartialEq)]
    pub(crate) enum CounterCommand {
        /// Increase the counter by one.
        Increment,
        /// Decrease the counter by one; rejected when the counter is zero.
        Decrement,
        /// Increase the counter by the given amount.
        Add(u64),
    }

    /// Events emitted by [`Counter`], serialized as an adjacently tagged enum
    /// (`{"type": ..., "data": ...}`).
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    #[serde(tag = "type", content = "data")]
    pub(crate) enum CounterEvent {
        Incremented,
        Decremented,
        Added { amount: u64 },
    }

    /// Reasons a [`CounterCommand`] can be rejected.
    #[derive(Debug, thiserror::Error)]
    pub(crate) enum CounterError {
        #[error("cannot decrement: counter is already zero")]
        AlreadyZero,
    }

    impl Aggregate for Counter {
        const AGGREGATE_TYPE: &'static str = "counter";
        type Command = CounterCommand;
        type DomainEvent = CounterEvent;
        type Error = CounterError;

        /// Maps each command to the single event it produces, rejecting a
        /// decrement when the counter is already zero.
        fn handle(&self, cmd: Self::Command) -> Result<Vec<Self::DomainEvent>, Self::Error> {
            match cmd {
                CounterCommand::Increment => Ok(vec![CounterEvent::Incremented]),
                CounterCommand::Decrement => {
                    if self.value == 0 {
                        return Err(CounterError::AlreadyZero);
                    }
                    Ok(vec![CounterEvent::Decremented])
                }
                CounterCommand::Add(n) => Ok(vec![CounterEvent::Added { amount: n }]),
            }
        }

        /// Applies one event to the counter.
        ///
        /// `apply` has no error channel and may be fed events that never went
        /// through `handle` (e.g. replayed directly through the reducer), so
        /// the arithmetic saturates at `0` / `u64::MAX` instead of panicking
        /// in debug builds or wrapping in release builds.
        fn apply(mut self, event: &Self::DomainEvent) -> Self {
            self.value = match event {
                CounterEvent::Incremented => self.value.saturating_add(1),
                CounterEvent::Decremented => self.value.saturating_sub(1),
                CounterEvent::Added { amount } => self.value.saturating_add(*amount),
            };
            self
        }
    }
}
/// Unit tests for the `Aggregate` contract (via the `Counter` fixture) and
/// for the eventfold conversion helpers.
#[cfg(test)]
mod tests {
    use super::test_fixtures::{Counter, CounterCommand, CounterError, CounterEvent};
    use super::{reducer, to_eventfold_event, Aggregate};
    use crate::command::CommandContext;

    /// Converts a counter event under `ctx`, panicking if conversion fails.
    fn convert(domain_event: &CounterEvent, ctx: &CommandContext) -> eventfold::Event {
        to_eventfold_event::<Counter>(domain_event, ctx).unwrap()
    }

    /// Runs the counter reducer once over `stored`, starting from the default state.
    fn fold_from_default(stored: &eventfold::Event) -> Counter {
        reducer::<Counter>()(Counter::default(), stored)
    }

    // --- Aggregate::handle -------------------------------------------------

    #[test]
    fn handle_increment() {
        let emitted = Counter::default()
            .handle(CounterCommand::Increment)
            .unwrap();
        assert_eq!(emitted, vec![CounterEvent::Incremented]);
    }

    #[test]
    fn handle_decrement_nonzero() {
        let emitted = Counter { value: 5 }
            .handle(CounterCommand::Decrement)
            .unwrap();
        assert_eq!(emitted, vec![CounterEvent::Decremented]);
    }

    #[test]
    fn handle_decrement_at_zero() {
        let outcome = Counter::default().handle(CounterCommand::Decrement);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, CounterError::AlreadyZero),
            "expected AlreadyZero, got: {err}"
        );
    }

    #[test]
    fn handle_add() {
        let emitted = Counter::default().handle(CounterCommand::Add(5)).unwrap();
        assert_eq!(emitted, vec![CounterEvent::Added { amount: 5 }]);
    }

    // --- Aggregate::apply --------------------------------------------------

    #[test]
    fn apply_incremented() {
        let after = Counter::default().apply(&CounterEvent::Incremented);
        assert_eq!(after.value, 1);
    }

    #[test]
    fn apply_decremented() {
        let after = Counter { value: 3 }.apply(&CounterEvent::Decremented);
        assert_eq!(after.value, 2);
    }

    #[test]
    fn apply_added() {
        let after = Counter::default().apply(&CounterEvent::Added { amount: 5 });
        assert_eq!(after.value, 5);
    }

    #[test]
    fn handle_then_apply_roundtrip() {
        let emitted = Counter::default()
            .handle(CounterCommand::Increment)
            .unwrap();
        let mut final_state = Counter::default();
        for event in &emitted {
            final_state = final_state.apply(event);
        }
        assert_eq!(final_state.value, 1);
    }

    // --- reducer / to_eventfold_event round trips --------------------------

    #[test]
    fn reducer_roundtrip_increment() {
        let stored = convert(&CounterEvent::Incremented, &CommandContext::default());
        assert_eq!(fold_from_default(&stored).value, 1);
    }

    #[test]
    fn reducer_roundtrip_added() {
        let stored = convert(
            &CounterEvent::Added { amount: 5 },
            &CommandContext::default(),
        );
        assert_eq!(fold_from_default(&stored).value, 5);
    }

    #[test]
    fn reducer_unknown_event_skipped() {
        let stored = eventfold::Event::new("UnknownType", serde_json::json!({}));
        assert_eq!(fold_from_default(&stored).value, 0);
    }

    #[test]
    fn context_propagates_actor() {
        let ctx = CommandContext::default().with_actor("user-1");
        let stored = convert(&CounterEvent::Incremented, &ctx);
        assert_eq!(stored.actor, Some("user-1".into()));
    }

    #[test]
    fn context_propagates_correlation_id() {
        let ctx = CommandContext::default().with_correlation_id("req-abc");
        let stored = convert(&CounterEvent::Incremented, &ctx);
        let meta = stored.meta.expect("meta should be present");
        assert_eq!(meta["correlation_id"], "req-abc");
    }

    #[test]
    fn fieldless_variant_roundtrip() {
        let stored = convert(&CounterEvent::Incremented, &CommandContext::default());
        assert_eq!(stored.event_type, "Incremented");
        assert!(stored.data.is_null());
        assert_eq!(fold_from_default(&stored).value, 1);
    }

    // --- event ids ---------------------------------------------------------

    #[test]
    fn to_eventfold_event_assigns_uuid_v4_id() {
        let stored = convert(&CounterEvent::Incremented, &CommandContext::default());
        let id = stored.id.expect("event.id should be Some");
        let parsed = match uuid::Uuid::parse_str(&id) {
            Ok(parsed) => parsed,
            Err(e) => panic!("event id '{id}' is not a valid UUID: {e}"),
        };
        assert_eq!(
            parsed.get_version(),
            Some(uuid::Version::Random),
            "event id '{id}' is not UUID v4 (version = {:?})",
            parsed.get_version()
        );
        // The id must be stored in canonical hyphenated form.
        assert_eq!(id, parsed.as_hyphenated().to_string());
    }

    #[test]
    fn to_eventfold_event_produces_distinct_ids() {
        let ctx = CommandContext::default();
        let first = convert(&CounterEvent::Incremented, &ctx);
        let second = convert(&CounterEvent::Incremented, &ctx);
        assert_ne!(
            first.id, second.id,
            "successive events must have distinct ids"
        );
    }

    // --- metadata propagation ----------------------------------------------

    #[test]
    fn context_propagates_source_device() {
        let ctx = CommandContext::default().with_source_device("device-xyz");
        let stored = convert(&CounterEvent::Incremented, &ctx);
        let meta = stored.meta.expect("meta should be present");
        assert_eq!(meta["source_device"], "device-xyz");
    }

    #[test]
    fn context_propagates_source_device_and_correlation_id() {
        let ctx = CommandContext::default()
            .with_correlation_id("req-abc")
            .with_source_device("device-xyz");
        let stored = convert(&CounterEvent::Incremented, &ctx);
        let meta = stored.meta.expect("meta should be present");
        assert_eq!(meta["correlation_id"], "req-abc");
        assert_eq!(meta["source_device"], "device-xyz");
    }

    #[test]
    fn context_merges_source_device_with_existing_metadata() {
        let ctx = CommandContext::default()
            .with_metadata(serde_json::json!({"foo": "bar", "level": 3}))
            .with_source_device("device-xyz");
        let stored = convert(&CounterEvent::Incremented, &ctx);
        let meta = stored.meta.expect("meta should be present");
        assert_eq!(meta["foo"], "bar");
        assert_eq!(meta["level"], 3);
        assert_eq!(meta["source_device"], "device-xyz");
    }

    #[test]
    fn all_none_context_produces_no_meta() {
        let ctx = CommandContext::default();
        assert!(ctx.source_device.is_none());
        assert!(ctx.correlation_id.is_none());
        assert!(ctx.metadata.is_none());
        let stored = convert(&CounterEvent::Incremented, &ctx);
        assert!(
            stored.meta.is_none(),
            "meta should be None when all context fields are None, got: {:?}",
            stored.meta
        );
    }

    // --- events without ids ------------------------------------------------

    #[test]
    fn reducer_applies_event_with_no_id() {
        // A known event type without an id is still applied.
        let event_with_id_none = eventfold::Event::new("Incremented", serde_json::Value::Null);
        assert!(event_with_id_none.id.is_none(), "precondition: id is None");
        assert_eq!(fold_from_default(&event_with_id_none).value, 1);

        // An unknown event type without an id is still skipped.
        let unknown_event = eventfold::Event::new("SomeFutureEvent", serde_json::json!({}));
        assert!(unknown_event.id.is_none(), "precondition: id is None");
        assert_eq!(fold_from_default(&unknown_event).value, 0);
    }
}