use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::actor::{ActorConfig, AggregateHandle, spawn_actor_with_config};
use crate::aggregate::Aggregate;
use crate::process_manager::{
AggregateDispatcher, ProcessManagerCatchUp, ProcessManagerReport, ProcessManagerRunner,
TypedDispatcher, append_dead_letter,
};
use crate::projection::{Projection, ProjectionRunner};
use crate::storage::StreamLayout;
/// Cache of live actor handles keyed by `(aggregate_type, instance_id)`.
/// Values are type-erased `AggregateHandle<A>`s, recovered via `downcast_ref`.
type HandleCache = HashMap<(String, String), Box<dyn Any + Send + Sync>>;
/// Registered projection runners keyed by projection name. Values are
/// type-erased `Arc<Mutex<ProjectionRunner<P>>>`s, recovered via `downcast_ref`.
type ProjectionMap = HashMap<String, Box<dyn Any + Send + Sync>>;
/// Process-manager runners, each behind its own mutex.
type ProcessManagerList = Vec<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>;
/// Command dispatchers keyed by the aggregate type they target.
type DispatcherMap = HashMap<String, Box<dyn AggregateDispatcher>>;
/// Type-erased projection catch-up hooks, run after events are injected.
type ProjectionCatchUpList = Vec<std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>>;
/// Object-safe hook for advancing a projection without knowing its
/// concrete `Projection` type.
trait ProjectionCatchUpFn: Send + Sync {
    /// Applies any not-yet-seen events to the projection.
    fn catch_up(&mut self) -> io::Result<()>;
}
/// Adapter that drives a shared `ProjectionRunner` through the type-erased
/// [`ProjectionCatchUpFn`] trait.
struct SharedProjectionCatchUp<P: Projection> {
    // Same runner that is stored (type-erased) in `AggregateStore::projections`.
    inner: Arc<std::sync::Mutex<ProjectionRunner<P>>>,
}
impl<P: Projection> ProjectionCatchUpFn for SharedProjectionCatchUp<P> {
    /// Locks the shared runner and applies any unseen events; a poisoned
    /// lock is surfaced as an `io::Error`.
    fn catch_up(&mut self) -> io::Result<()> {
        match self.inner.lock() {
            Ok(mut runner) => runner.catch_up(),
            Err(poisoned) => Err(io::Error::other(poisoned.to_string())),
        }
    }
}
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
/// Options for [`AggregateStore::inject_event`].
#[derive(Debug, Clone, Default)]
pub struct InjectOptions {
    /// When true, run all registered process managers after the event is
    /// appended. Defaults to false.
    pub run_process_managers: bool,
}
/// Handle to an on-disk event store. Cloning is cheap: all shared state
/// lives behind `Arc`s, so clones observe the same caches and registries.
#[derive(Clone)]
pub struct AggregateStore {
    // On-disk directory layout for streams and metadata.
    layout: StreamLayout,
    // Live actor handles; async lock because lookups happen on the runtime.
    cache: Arc<RwLock<HandleCache>>,
    // Registered projections (type-erased) and their catch-up hooks.
    projections: Arc<std::sync::RwLock<ProjectionMap>>,
    projection_catch_ups: Arc<std::sync::RwLock<ProjectionCatchUpList>>,
    process_managers: Arc<std::sync::RwLock<ProcessManagerList>>,
    // Immutable after construction, hence no lock.
    dispatchers: Arc<DispatcherMap>,
    // Event ids already injected; used to dedup `inject_event` (in-memory only).
    seen_ids: Arc<std::sync::Mutex<HashSet<String>>>,
    idle_timeout: Duration,
}
impl std::fmt::Debug for AggregateStore {
    /// Compact debug form: only the base directory is shown, since the
    /// remaining fields are locks and type-erased registries.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("AggregateStore");
        dbg.field("base_dir", &self.layout.base_dir());
        dbg.finish()
    }
}
impl AggregateStore {
    /// Opens a store rooted at `base_dir` with nothing registered — no
    /// projections, process managers, or dispatchers. Use
    /// [`AggregateStore::builder`] to register those before opening.
    ///
    /// # Errors
    /// Propagates I/O errors from creating the metadata directory.
    pub async fn open(base_dir: impl AsRef<Path>) -> io::Result<Self> {
        let layout = StreamLayout::new(base_dir.as_ref());
        let meta_dir = layout.meta_dir();
        // Filesystem work runs off the async runtime.
        tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
            .await
            .map_err(io::Error::other)??;
        Ok(Self {
            layout,
            cache: Arc::new(RwLock::new(HashMap::new())),
            projections: Arc::new(std::sync::RwLock::new(HashMap::new())),
            projection_catch_ups: Arc::new(std::sync::RwLock::new(Vec::new())),
            process_managers: Arc::new(std::sync::RwLock::new(Vec::new())),
            dispatchers: Arc::new(HashMap::new()),
            seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
        })
    }
    /// Returns a handle to the actor for aggregate `A` / `id`, spawning a
    /// new actor (and caching its handle) if no live one is cached.
    pub async fn get<A: Aggregate>(&self, id: &str) -> io::Result<AggregateHandle<A>> {
        let key = (A::AGGREGATE_TYPE.to_owned(), id.to_owned());
        // Fast path: reuse a cached handle whose actor is still running.
        {
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                return Ok(handle.clone());
            }
        }
        // Evict any stale (dead or wrongly-typed) entry before respawning.
        {
            let mut cache = self.cache.write().await;
            cache.remove(&key);
        }
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;
        tracing::debug!(
            aggregate_type = A::AGGREGATE_TYPE,
            instance_id = %id,
            "spawning actor"
        );
        let config = ActorConfig {
            idle_timeout: self.idle_timeout,
        };
        // NOTE(review): two concurrent `get`s for the same id can both reach
        // this point and each spawn an actor; the later insert wins the cache
        // slot. Confirm the actor layer tolerates a briefly duplicated actor.
        let handle = spawn_actor_with_config::<A>(&stream_dir, config)?;
        let mut cache = self.cache.write().await;
        cache.insert(key, Box::new(handle.clone()));
        Ok(handle)
    }
    /// Lists the instance ids that have a stream for aggregate type `A`.
    pub async fn list<A: Aggregate>(&self) -> io::Result<Vec<String>> {
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        tokio::task::spawn_blocking(move || layout.list_streams(&agg_type))
            .await
            .map_err(io::Error::other)?
    }
    /// Starts a builder for a store rooted at `base_dir`; registrations
    /// (projections, process managers, dispatchers) happen on the builder.
    pub fn builder(base_dir: impl AsRef<Path>) -> AggregateStoreBuilder {
        AggregateStoreBuilder {
            base_dir: base_dir.as_ref().to_owned(),
            projection_factories: Vec::new(),
            process_manager_factories: Vec::new(),
            dispatcher_factories: Vec::new(),
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
        }
    }
    /// Catches projection `P` up to the latest events and returns a clone of
    /// its current state.
    ///
    /// # Errors
    /// `NotFound` if `P` was never registered; other errors on poisoned
    /// locks, a type-erasure mismatch, or catch-up I/O failure.
    pub fn projection<P: Projection>(&self) -> io::Result<P> {
        let projections = self
            .projections
            .read()
            .map_err(|e| io::Error::other(e.to_string()))?;
        let runner_any = projections.get(P::NAME).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("projection '{}' not registered", P::NAME),
            )
        })?;
        // Recover the concrete runner from the type-erased registry entry.
        let runner_arc = runner_any
            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
        let mut runner = runner_arc
            .lock()
            .map_err(|e| io::Error::other(e.to_string()))?;
        runner.catch_up()?;
        Ok(runner.state().clone())
    }
    /// Discards projection `P`'s accumulated state and replays it from the
    /// beginning of its streams.
    ///
    /// # Errors
    /// Same error cases as [`AggregateStore::projection`].
    pub fn rebuild_projection<P: Projection>(&self) -> io::Result<()> {
        // Lookup mirrors `projection` above: read registry, downcast, lock.
        let projections = self
            .projections
            .read()
            .map_err(|e| io::Error::other(e.to_string()))?;
        let runner_any = projections.get(P::NAME).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("projection '{}' not registered", P::NAME),
            )
        })?;
        let runner_arc = runner_any
            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
        let mut runner = runner_arc
            .lock()
            .map_err(|e| io::Error::other(e.to_string()))?;
        runner.rebuild()
    }
    /// Runs every registered process manager once: gathers the commands each
    /// emits for new events, dispatches them through the matching aggregate
    /// dispatcher, dead-letters failures, then persists each manager.
    ///
    /// Returns counts of dispatched and dead-lettered commands.
    pub async fn run_process_managers(&self) -> io::Result<ProcessManagerReport> {
        let mut all_work: Vec<(Vec<crate::command::CommandEnvelope>, std::path::PathBuf)> =
            Vec::new();
        // Phase 1: collect work while holding the std locks, releasing them
        // before any `.await` below (std guards must not be held across await).
        {
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let mut pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                let envelopes = pm.catch_up()?;
                let dead_letter_path = pm.dead_letter_path();
                all_work.push((envelopes, dead_letter_path));
            }
        }
        let mut report = ProcessManagerReport::default();
        // Phase 2: dispatch each command; failures and unknown target types
        // are appended to the owning manager's dead-letter file.
        for (envelopes, dead_letter_path) in &all_work {
            for envelope in envelopes {
                let agg_type = &envelope.aggregate_type;
                match self.dispatchers.get(agg_type) {
                    Some(dispatcher) => match dispatcher.dispatch(self, envelope.clone()).await {
                        Ok(()) => {
                            tracing::info!(
                                target_type = %agg_type,
                                target_id = %envelope.instance_id,
                                "dispatching command"
                            );
                            report.dispatched += 1;
                        }
                        Err(e) => {
                            tracing::error!(
                                aggregate_type = %agg_type,
                                instance_id = %envelope.instance_id,
                                error = %e,
                                "process manager dispatch failed, dead-lettering"
                            );
                            append_dead_letter(dead_letter_path, envelope.clone(), &e.to_string())?;
                            report.dead_lettered += 1;
                        }
                    },
                    None => {
                        let err_msg = format!("unknown aggregate type: {agg_type}");
                        tracing::error!(
                            aggregate_type = %agg_type,
                            "no dispatcher registered, dead-lettering"
                        );
                        append_dead_letter(dead_letter_path, envelope.clone(), &err_msg)?;
                        report.dead_lettered += 1;
                    }
                }
            }
        }
        // Phase 3: persist only after all dispatch attempts, so an error that
        // aborts this method re-delivers work on the next run (at-least-once)
        // rather than dropping it.
        {
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                pm.save()?;
            }
        }
        Ok(report)
    }
    /// Returns the on-disk layout this store was opened with.
    pub fn layout(&self) -> &StreamLayout {
        &self.layout
    }
    /// Lists `(aggregate_type, instance_id)` pairs for every stream, or only
    /// for `aggregate_type` when one is supplied.
    pub async fn list_streams(
        &self,
        aggregate_type: Option<&str>,
    ) -> io::Result<Vec<(String, String)>> {
        let layout = self.layout.clone();
        match aggregate_type {
            Some(agg_type) => {
                let agg_type = agg_type.to_owned();
                tokio::task::spawn_blocking(move || {
                    let ids = layout.list_streams(&agg_type)?;
                    Ok(ids.into_iter().map(|id| (agg_type.clone(), id)).collect())
                })
                .await
                .map_err(io::Error::other)?
            }
            // No filter: walk every aggregate type and flatten the pairs.
            None => tokio::task::spawn_blocking(move || {
                let types = layout.list_aggregate_types()?;
                let mut pairs = Vec::new();
                for agg_type in types {
                    let ids = layout.list_streams(&agg_type)?;
                    pairs.extend(ids.into_iter().map(|id| (agg_type.clone(), id)));
                }
                Ok(pairs)
            })
            .await
            .map_err(io::Error::other)?,
        }
    }
    /// Reads the raw event log of one stream from offset 0.
    ///
    /// # Errors
    /// `NotFound` if the stream directory does not exist. A directory that
    /// exists but has no log yet yields an empty `Vec` instead of an error.
    pub async fn read_events(
        &self,
        aggregate_type: &str,
        instance_id: &str,
    ) -> io::Result<Vec<eventfold::Event>> {
        let layout = self.layout.clone();
        let agg_type = aggregate_type.to_owned();
        let inst_id = instance_id.to_owned();
        tokio::task::spawn_blocking(move || {
            let stream_dir = layout.stream_dir(&agg_type, &inst_id);
            if !stream_dir.is_dir() {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("stream directory not found: {}", stream_dir.display()),
                ));
            }
            let reader = eventfold::EventReader::new(&stream_dir);
            // A missing log file inside an existing stream dir means
            // "no events yet", not an error.
            let iter = match reader.read_from(0) {
                Ok(iter) => iter,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
                Err(e) => return Err(e),
            };
            let mut events = Vec::new();
            for result in iter {
                let (event, _next_offset, _line_hash) = result?;
                events.push(event);
            }
            Ok(events)
        })
        .await
        .map_err(io::Error::other)?
    }
    /// Appends an externally-produced event to the stream for aggregate `A`,
    /// creating the stream if needed.
    ///
    /// Events carrying an `id` are deduplicated against ids this store
    /// instance has already seen (in-memory only; not persisted). If a live
    /// actor exists for the target instance the event is routed through it so
    /// its in-memory state stays current; otherwise the event is appended to
    /// the log directly. Registered projections are caught up afterwards, and
    /// process managers run as well when `opts.run_process_managers` is set.
    pub async fn inject_event<A: Aggregate>(
        &self,
        instance_id: &str,
        event: eventfold::Event,
        opts: InjectOptions,
    ) -> io::Result<()> {
        let event_id = event.id.clone();
        // Dedup check; the id is only recorded after a successful write below.
        // NOTE(review): concurrent injects of the same id can both pass this
        // check and write twice — confirm single-caller usage or tighten.
        if let Some(ref id) = event_id {
            let seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            if seen.contains(id) {
                return Ok(());
            }
        }
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = instance_id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;
        let key = (A::AGGREGATE_TYPE.to_owned(), instance_id.to_owned());
        // Prefer routing through a live actor so its in-memory state applies
        // the event instead of going stale.
        let injected_via_actor = {
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                handle.inject_via_actor(event.clone()).await?;
                true
            } else {
                false
            }
        };
        if !injected_via_actor {
            let ev = event;
            tokio::task::spawn_blocking(move || {
                let mut writer = eventfold::EventWriter::open(&stream_dir)?;
                writer.append(&ev).map(|_| ())
            })
            .await
            .map_err(io::Error::other)??;
        }
        // Record the id only after the event has been appended.
        if let Some(id) = event_id {
            let mut seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            seen.insert(id);
        }
        // Bring every registered projection up to date with the new event.
        {
            let catch_ups = self
                .projection_catch_ups
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for catch_up_mutex in catch_ups.iter() {
                let mut catch_up = catch_up_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                catch_up.catch_up()?;
            }
        }
        if opts.run_process_managers {
            self.run_process_managers().await?;
        }
        Ok(())
    }
}
/// Deferred projection constructor: given the store layout, builds the
/// type-erased runner handle plus the catch-up hook that drives it.
type ProjectionFactory = Box<
    dyn FnOnce(
        StreamLayout,
    ) -> io::Result<(
        Box<dyn Any + Send + Sync>,
        std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>,
    )>,
>;
/// Deferred constructor for a process-manager runner.
type ProcessManagerFactory =
    Box<dyn FnOnce(StreamLayout) -> io::Result<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>>;
type DispatcherFactory = Box<dyn FnOnce() -> Box<dyn AggregateDispatcher>>;
/// Builder returned by [`AggregateStore::builder`]; collects registrations
/// that are materialized when `open` is awaited.
pub struct AggregateStoreBuilder {
    base_dir: PathBuf,
    // Registration order is preserved; projection catch-up hooks later run
    // in this order.
    projection_factories: Vec<(String, ProjectionFactory)>,
    process_manager_factories: Vec<(String, ProcessManagerFactory)>,
    dispatcher_factories: Vec<(String, DispatcherFactory)>,
    idle_timeout: Duration,
}
impl AggregateStoreBuilder {
pub fn projection<P: Projection>(mut self) -> Self {
self.projection_factories.push((
P::NAME.to_owned(),
Box::new(|layout| {
let runner = ProjectionRunner::<P>::new(layout)?;
let shared = Arc::new(std::sync::Mutex::new(runner));
let any_box: Box<dyn Any + Send + Sync> = Box::new(shared.clone());
let catch_up: std::sync::Mutex<Box<dyn ProjectionCatchUpFn>> =
std::sync::Mutex::new(Box::new(SharedProjectionCatchUp { inner: shared }));
Ok((any_box, catch_up))
}),
));
self
}
pub fn process_manager<PM>(mut self) -> Self
where
PM: crate::process_manager::ProcessManager,
{
self.process_manager_factories.push((
PM::NAME.to_owned(),
Box::new(|layout| {
let runner = ProcessManagerRunner::<PM>::new(layout)?;
Ok(std::sync::Mutex::new(
Box::new(runner) as Box<dyn ProcessManagerCatchUp>
))
}),
));
self
}
pub fn idle_timeout(mut self, timeout: Duration) -> Self {
self.idle_timeout = timeout;
self
}
pub fn aggregate_type<A>(mut self) -> Self
where
A: Aggregate,
A::Command: serde::de::DeserializeOwned,
{
self.dispatcher_factories.push((
A::AGGREGATE_TYPE.to_owned(),
Box::new(|| Box::new(TypedDispatcher::<A>::new()) as Box<dyn AggregateDispatcher>),
));
self
}
pub async fn open(self) -> io::Result<AggregateStore> {
let layout = StreamLayout::new(&self.base_dir);
let meta_dir = layout.meta_dir();
tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
.await
.map_err(io::Error::other)??;
let mut projections = HashMap::new();
let mut projection_catch_ups: ProjectionCatchUpList = Vec::new();
for (name, factory) in self.projection_factories {
let (any_runner, catch_up) = factory(layout.clone())?;
projections.insert(name, any_runner);
projection_catch_ups.push(catch_up);
}
let mut process_managers = Vec::new();
for (_name, factory) in self.process_manager_factories {
let runner = factory(layout.clone())?;
process_managers.push(runner);
}
let mut dispatchers: HashMap<String, Box<dyn AggregateDispatcher>> = HashMap::new();
for (name, factory) in self.dispatcher_factories {
dispatchers.insert(name, factory());
}
Ok(AggregateStore {
layout,
cache: Arc::new(RwLock::new(HashMap::new())),
projections: Arc::new(std::sync::RwLock::new(projections)),
projection_catch_ups: Arc::new(std::sync::RwLock::new(projection_catch_ups)),
process_managers: Arc::new(std::sync::RwLock::new(process_managers)),
dispatchers: Arc::new(dispatchers),
seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
idle_timeout: self.idle_timeout,
})
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tempfile::TempDir;
use super::*;
use crate::aggregate::test_fixtures::{Counter, CounterCommand};
use crate::command::CommandContext;
#[tokio::test]
async fn full_roundtrip() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
let ctx = CommandContext::default();
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("first increment should succeed");
handle
.execute(CounterCommand::Increment, ctx)
.await
.expect("second increment should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 2);
}
#[tokio::test]
async fn list_empty_initially() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let ids = store.list::<Counter>().await.expect("list should succeed");
assert!(ids.is_empty());
}
#[tokio::test]
async fn list_after_commands() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let ctx = CommandContext::default();
let h1 = store
.get::<Counter>("c-1")
.await
.expect("get c-1 should succeed");
h1.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("c-1 increment should succeed");
let h2 = store
.get::<Counter>("c-2")
.await
.expect("get c-2 should succeed");
h2.execute(CounterCommand::Increment, ctx)
.await
.expect("c-2 increment should succeed");
let mut ids = store.list::<Counter>().await.expect("list should succeed");
ids.sort();
assert_eq!(ids, vec!["c-1", "c-2"]);
}
#[tokio::test]
async fn same_id_returns_shared_handle() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let h1 = store
.get::<Counter>("c-1")
.await
.expect("first get should succeed");
let h2 = store
.get::<Counter>("c-1")
.await
.expect("second get should succeed");
h1.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("increment via h1 should succeed");
let state = h2.state().await.expect("state via h2 should succeed");
assert_eq!(state.value, 1);
}
#[tokio::test]
async fn state_survives_store_reopen() {
let tmp = TempDir::new().expect("failed to create temp dir");
{
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
let ctx = CommandContext::default();
for _ in 0..3 {
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("increment should succeed");
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
let store = AggregateStore::open(tmp.path())
.await
.expect("reopen should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get after reopen should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 3);
}
#[tokio::test]
async fn two_aggregate_types_coexist() {
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct Toggle {
pub on: bool,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "data")]
enum ToggleEvent {
Toggled,
}
#[derive(Debug, thiserror::Error)]
enum ToggleError {}
impl Aggregate for Toggle {
const AGGREGATE_TYPE: &'static str = "toggle";
type Command = ();
type DomainEvent = ToggleEvent;
type Error = ToggleError;
fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
Ok(vec![ToggleEvent::Toggled])
}
fn apply(mut self, _event: &ToggleEvent) -> Self {
self.on = !self.on;
self
}
}
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let counter_handle = store
.get::<Counter>("c-1")
.await
.expect("get counter should succeed");
counter_handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("counter increment should succeed");
let toggle_handle = store
.get::<Toggle>("t-1")
.await
.expect("get toggle should succeed");
toggle_handle
.execute((), CommandContext::default())
.await
.expect("toggle should succeed");
let counter_state = counter_handle
.state()
.await
.expect("counter state should succeed");
assert_eq!(counter_state.value, 1);
let toggle_state = toggle_handle
.state()
.await
.expect("toggle state should succeed");
assert!(toggle_state.on);
let counter_ids = store
.list::<Counter>()
.await
.expect("list counters should succeed");
assert_eq!(counter_ids, vec!["c-1"]);
let toggle_ids = store
.list::<Toggle>()
.await
.expect("list toggles should succeed");
assert_eq!(toggle_ids, vec!["t-1"]);
}
use crate::projection::test_fixtures::EventCounter;
async fn increment(store: &AggregateStore, id: &str) {
let handle = store.get::<Counter>(id).await.expect("get should succeed");
handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("increment should succeed");
}
#[tokio::test]
async fn builder_with_projection_roundtrip() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.projection::<EventCounter>()
.open()
.await
.expect("builder open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
let ctx = CommandContext::default();
for _ in 0..3 {
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("increment should succeed");
}
let counter = store
.projection::<EventCounter>()
.expect("projection query should succeed");
assert_eq!(counter.count, 3);
}
#[tokio::test]
async fn projection_sees_multiple_instances() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.projection::<EventCounter>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-2").await;
let counter = store
.projection::<EventCounter>()
.expect("projection query should succeed");
assert_eq!(counter.count, 2);
}
#[tokio::test]
async fn projection_persists_across_restart() {
let tmp = TempDir::new().expect("failed to create temp dir");
{
let store = AggregateStore::builder(tmp.path())
.projection::<EventCounter>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-1").await;
increment(&store, "c-2").await;
let counter = store
.projection::<EventCounter>()
.expect("projection query should succeed");
assert_eq!(counter.count, 3);
}
tokio::time::sleep(Duration::from_millis(50)).await;
let store = AggregateStore::builder(tmp.path())
.projection::<EventCounter>()
.open()
.await
.expect("reopen should succeed");
let counter = store
.projection::<EventCounter>()
.expect("projection query after reopen should succeed");
assert_eq!(counter.count, 3);
}
#[tokio::test]
async fn projection_without_registration_returns_error() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let result = store.projection::<EventCounter>();
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::NotFound);
}
#[tokio::test]
async fn open_convenience_still_works() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("increment should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 1);
}
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct Receiver {
pub received_count: u64,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "data")]
enum ReceiverCommand {
Accept,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "data")]
enum ReceiverEvent {
Accepted,
}
#[derive(Debug, thiserror::Error)]
enum ReceiverError {}
impl Aggregate for Receiver {
const AGGREGATE_TYPE: &'static str = "receiver";
type Command = ReceiverCommand;
type DomainEvent = ReceiverEvent;
type Error = ReceiverError;
fn handle(&self, _cmd: ReceiverCommand) -> Result<Vec<ReceiverEvent>, ReceiverError> {
Ok(vec![ReceiverEvent::Accepted])
}
fn apply(mut self, _event: &ReceiverEvent) -> Self {
self.received_count += 1;
self
}
}
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct ForwardSaga {
pub forwarded: u64,
}
impl crate::process_manager::ProcessManager for ForwardSaga {
const NAME: &'static str = "forward-saga";
fn subscriptions(&self) -> &'static [&'static str] {
&["counter"]
}
fn react(
&mut self,
_aggregate_type: &str,
stream_id: &str,
_event: &eventfold::Event,
) -> Vec<crate::command::CommandEnvelope> {
self.forwarded += 1;
vec![crate::command::CommandEnvelope {
aggregate_type: "receiver".to_string(),
instance_id: stream_id.to_string(),
command: serde_json::json!({"type": "Accept"}),
context: CommandContext::default(),
}]
}
}
#[tokio::test]
async fn end_to_end_process_manager_dispatch() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.process_manager::<ForwardSaga>()
.aggregate_type::<Receiver>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-1").await;
let report = store
.run_process_managers()
.await
.expect("run_process_managers should succeed");
assert_eq!(report.dispatched, 2);
assert_eq!(report.dead_lettered, 0);
let receiver_handle = store
.get::<Receiver>("c-1")
.await
.expect("get receiver should succeed");
let receiver_state = receiver_handle
.state()
.await
.expect("receiver state should succeed");
assert_eq!(receiver_state.received_count, 2);
}
#[tokio::test]
async fn process_manager_dead_letters_unknown_type() {
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct BadTargetSaga {
seen: u64,
}
impl crate::process_manager::ProcessManager for BadTargetSaga {
const NAME: &'static str = "bad-target-saga";
fn subscriptions(&self) -> &'static [&'static str] {
&["counter"]
}
fn react(
&mut self,
_aggregate_type: &str,
_stream_id: &str,
_event: &eventfold::Event,
) -> Vec<crate::command::CommandEnvelope> {
self.seen += 1;
vec![crate::command::CommandEnvelope {
aggregate_type: "nonexistent".to_string(),
instance_id: "x".to_string(),
command: serde_json::json!({}),
context: CommandContext::default(),
}]
}
}
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.process_manager::<BadTargetSaga>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
let report = store
.run_process_managers()
.await
.expect("run_process_managers should succeed");
assert_eq!(report.dispatched, 0);
assert_eq!(report.dead_lettered, 1);
let dl_path = tmp
.path()
.join("process_managers/bad-target-saga/dead_letters.jsonl");
let contents = std::fs::read_to_string(&dl_path).expect("dead-letter file should exist");
let entry: serde_json::Value =
serde_json::from_str(contents.trim()).expect("dead-letter entry should be valid JSON");
assert!(
entry["error"]
.as_str()
.expect("error field should be a string")
.contains("nonexistent")
);
}
#[tokio::test]
async fn run_process_managers_idempotent() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.process_manager::<ForwardSaga>()
.aggregate_type::<Receiver>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
let first = store
.run_process_managers()
.await
.expect("first run should succeed");
assert_eq!(first.dispatched, 1);
let second = store
.run_process_managers()
.await
.expect("second run should succeed");
assert_eq!(second.dispatched, 0);
assert_eq!(second.dead_lettered, 0);
}
#[tokio::test]
async fn process_manager_recovers_after_restart() {
let tmp = TempDir::new().expect("failed to create temp dir");
{
let store = AggregateStore::builder(tmp.path())
.process_manager::<ForwardSaga>()
.aggregate_type::<Receiver>()
.open()
.await
.expect("builder open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-2").await;
let report = store
.run_process_managers()
.await
.expect("run should succeed");
assert_eq!(report.dispatched, 2);
}
tokio::time::sleep(Duration::from_millis(50)).await;
let store = AggregateStore::builder(tmp.path())
.process_manager::<ForwardSaga>()
.aggregate_type::<Receiver>()
.open()
.await
.expect("reopen should succeed");
increment(&store, "c-1").await;
let report = store
.run_process_managers()
.await
.expect("run after restart should succeed");
assert_eq!(report.dispatched, 1);
assert_eq!(report.dead_lettered, 0);
}
#[tokio::test]
async fn idle_actor_evicted_and_respawned() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.idle_timeout(Duration::from_millis(200))
.open()
.await
.expect("builder open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("increment should succeed");
tokio::time::sleep(Duration::from_millis(400)).await;
assert!(
!handle.is_alive(),
"actor should be dead after idle timeout"
);
let handle2 = store
.get::<Counter>("c-1")
.await
.expect("get after eviction should succeed");
let state = handle2.state().await.expect("state should succeed");
assert_eq!(state.value, 1, "state should reflect persisted events");
}
#[tokio::test]
async fn rapid_commands_keep_actor_alive() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.idle_timeout(Duration::from_millis(300))
.open()
.await
.expect("builder open should succeed");
let handle = store
.get::<Counter>("c-1")
.await
.expect("get should succeed");
let ctx = CommandContext::default();
for _ in 0..5 {
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("execute should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
handle.is_alive(),
"actor should remain alive during activity"
);
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 5);
}
fn incremented_event() -> eventfold::Event {
eventfold::Event::new("Incremented", serde_json::Value::Null)
}
#[tokio::test]
async fn inject_event_appends_to_stream() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
store
.inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
.await
.expect("inject_event should succeed");
let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
assert_eq!(
contents.lines().count(),
1,
"should have exactly one event line"
);
}
#[tokio::test]
async fn inject_event_projections_reflect_event() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.projection::<EventCounter>()
.open()
.await
.expect("builder open should succeed");
store
.inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
.await
.expect("inject_event should succeed");
let counter = store
.projection::<EventCounter>()
.expect("projection query should succeed");
assert_eq!(counter.count, 1);
}
#[tokio::test]
async fn inject_event_dedup_by_id() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let event = incremented_event().with_id("ev-1".to_string());
store
.inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
.await
.expect("first inject should succeed");
store
.inject_event::<Counter>("c-1", event, InjectOptions::default())
.await
.expect("second inject should succeed (no-op)");
let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
assert_eq!(
contents.lines().count(),
1,
"dedup should prevent second write"
);
}
#[tokio::test]
async fn inject_event_no_dedup_for_none_id() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
let event = incremented_event();
assert!(event.id.is_none(), "precondition: id is None");
store
.inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
.await
.expect("first inject should succeed");
store
.inject_event::<Counter>("c-1", event, InjectOptions::default())
.await
.expect("second inject should succeed");
let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
assert_eq!(contents.lines().count(), 2, "both events should be written");
}
#[tokio::test]
async fn inject_options_default_does_not_run_process_managers() {
let opts = InjectOptions::default();
assert!(!opts.run_process_managers);
}
#[tokio::test]
async fn inject_event_with_process_managers() {
let tmp = TempDir::new().expect("failed to create temp dir");
let store = AggregateStore::builder(tmp.path())
.process_manager::<ForwardSaga>()
.aggregate_type::<Receiver>()
.open()
.await
.expect("builder open should succeed");
store
.inject_event::<Counter>(
"c-1",
incremented_event(),
InjectOptions {
run_process_managers: true,
},
)
.await
.expect("inject_event should succeed");
let receiver_handle = store
.get::<Receiver>("c-1")
.await
.expect("get receiver should succeed");
let receiver_state = receiver_handle
.state()
.await
.expect("receiver state should succeed");
assert_eq!(
receiver_state.received_count, 1,
"process manager should have dispatched"
);
}
#[tokio::test]
async fn inject_event_with_live_actor() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Spawn the actor first so the injection targets a live instance rather
    // than a cold stream on disk.
    let counter = store
        .get::<Counter>("c-1")
        .await
        .expect("get should succeed");
    assert!(counter.is_alive(), "actor should be alive");
    store
        .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
        .await
        .expect("inject_event with live actor should succeed");
    let snapshot = counter.state().await.expect("state should succeed");
    assert_eq!(snapshot.value, 1, "actor should see the injected event");
}
#[tokio::test]
async fn inject_event_creates_new_stream() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Injecting into an id that has never been seen should materialize the
    // stream directory on disk.
    let opts = InjectOptions::default();
    store
        .inject_event::<Counter>("new-instance", incremented_event(), opts)
        .await
        .expect("inject_event should create stream");
    assert!(
        tmp.path().join("streams/counter/new-instance").is_dir(),
        "stream directory should exist"
    );
    // A freshly spawned actor must replay the injected event from the stream.
    let replayed = store
        .get::<Counter>("new-instance")
        .await
        .expect("get should succeed after inject")
        .state()
        .await
        .expect("state should succeed");
    assert_eq!(replayed.value, 1, "actor should replay the injected event");
}
/// Minimal second aggregate type for tests that need more than one
/// aggregate type in the store (e.g. `list_streams` filtering).
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct Toggle {
// Current state; starts `false` via `Default` and flips on each event.
pub on: bool,
}
/// Domain events for the `Toggle` test aggregate.
/// Adjacently tagged so each event serializes as `{"type": ..., "data": ...}`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "data")]
enum ToggleEvent {
// Flips the `on` flag; carries no payload.
Toggled,
}
/// Error type for `Toggle`; uninhabited because `handle` never fails.
#[derive(Debug, thiserror::Error)]
enum ToggleError {}
impl Aggregate for Toggle {
    const AGGREGATE_TYPE: &'static str = "toggle";
    type Command = ();
    type DomainEvent = ToggleEvent;
    type Error = ToggleError;

    /// Every command unconditionally produces a single `Toggled` event.
    fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
        Ok(vec![ToggleEvent::Toggled])
    }

    /// Applying any event inverts the flag.
    fn apply(self, _event: &ToggleEvent) -> Self {
        Self { on: !self.on }
    }
}
/// Test helper: sends one toggle command to the `Toggle` aggregate with the
/// given id, panicking on any failure.
async fn toggle(store: &AggregateStore, id: &str) {
    store
        .get::<Toggle>(id)
        .await
        .expect("get toggle should succeed")
        .execute((), CommandContext::default())
        .await
        .expect("toggle should succeed");
}
#[tokio::test]
async fn list_streams_none_returns_all_sorted() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Two counters plus one toggle, spanning two aggregate types.
    increment(&store, "c-1").await;
    increment(&store, "c-2").await;
    toggle(&store, "t-1").await;
    let listed = store
        .list_streams(None)
        .await
        .expect("list_streams(None) should succeed");
    // With no filter, all (type, id) pairs come back in sorted order.
    let expected = vec![
        ("counter".to_string(), "c-1".to_string()),
        ("counter".to_string(), "c-2".to_string()),
        ("toggle".to_string(), "t-1".to_string()),
    ];
    assert_eq!(listed, expected);
}
#[tokio::test]
async fn list_streams_some_filters_by_type() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Mix two aggregate types so the filter has something to exclude.
    increment(&store, "c-1").await;
    increment(&store, "c-2").await;
    toggle(&store, "t-1").await;
    let listed = store
        .list_streams(Some("counter"))
        .await
        .expect("list_streams(Some) should succeed");
    // Only the counter streams survive the filter; the toggle is dropped.
    let expected = vec![
        ("counter".to_string(), "c-1".to_string()),
        ("counter".to_string(), "c-2".to_string()),
    ];
    assert_eq!(listed, expected);
}
#[tokio::test]
async fn list_streams_none_empty_store() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Listing a store with no streams yields an empty result, not an error.
    let listed = store
        .list_streams(None)
        .await
        .expect("list_streams(None) on empty store should succeed");
    assert!(listed.is_empty());
}
#[tokio::test]
async fn list_streams_some_nonexistent_type() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Filtering on an unknown aggregate type is not an error — it simply
    // matches nothing.
    let listed = store
        .list_streams(Some("nonexistent"))
        .await
        .expect("list_streams(Some(nonexistent)) should succeed");
    assert!(listed.is_empty());
}
#[tokio::test]
async fn read_events_returns_all_events() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Append two events to the same stream.
    for _ in 0..2 {
        increment(&store, "c-1").await;
    }
    let events = store
        .read_events("counter", "c-1")
        .await
        .expect("read_events should succeed");
    assert_eq!(events.len(), 2);
    // Every stored event should be the increment we issued.
    assert!(events.iter().all(|e| e.event_type == "Incremented"));
}
#[tokio::test]
async fn read_events_empty_stream_returns_empty_vec() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // Spawning the actor creates the stream without writing any events.
    let _handle = store
        .get::<Counter>("c-1")
        .await
        .expect("get should succeed");
    let events = store
        .read_events("counter", "c-1")
        .await
        .expect("read_events on empty stream should succeed");
    assert!(events.is_empty());
}
#[tokio::test]
async fn read_events_nonexistent_stream_returns_not_found() {
    let tmp = TempDir::new().expect("failed to create temp dir");
    let store = AggregateStore::open(tmp.path())
        .await
        .expect("open should succeed");
    // A stream that was never created maps to io::ErrorKind::NotFound.
    let outcome = store.read_events("nonexistent", "x").await;
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::NotFound);
}
}