use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::command::CommandEnvelope;
use crate::error::DispatchError;
use crate::projection::CursorPosition;
use crate::storage::StreamLayout;
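
/// A long-lived policy ("saga") that subscribes to the event streams of one
/// or more aggregate types and reacts by emitting commands for other
/// aggregates.
///
/// Implementations carry their own state, which is checkpointed to disk
/// together with per-stream cursors so events are not re-processed across
/// restarts. See `test_fixtures::EchoSaga` for a minimal example.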
pub trait ProcessManager: Default + Serialize + DeserializeOwned + Send + Sync + 'static {
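/// Unique name for this process manager; used as its checkpoint directory
/// name under the store's process-managers directory.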
const NAME: &'static str;
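/// The aggregate types whose event streams this process manager follows.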
fn subscriptions(&self) -> &'static [&'static str];
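/// React to a single event from a subscribed stream, updating internal state
/// and returning any commands that should be dispatched.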
fn react(
&mut self,
aggregate_type: &str,
stream_id: &str,
event: &eventfold::Event,
) -> Vec<CommandEnvelope>;
}
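
/// Persisted snapshot of a process manager: its serialized state plus the
/// cursor position reached in each `(aggregate_type, instance_id)` stream.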
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessManagerCheckpoint<PM> {
state: PM,
#[serde(with = "cursor_map")]
cursors: HashMap<(String, String), CursorPosition>,
}
impl<PM: Default> Default for ProcessManagerCheckpoint<PM> {
fn default() -> Self {
Self {
state: PM::default(),
cursors: HashMap::new(),
}
}
}
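
/// Serde adapter that flattens the `(aggregate_type, instance_id)` tuple keys
/// into `"aggregate_type/instance_id"` strings, since JSON object keys must
/// be strings.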
mod cursor_map {
use super::*;
use serde::ser::SerializeMap;
use serde::{Deserializer, Serializer};
const SEP: char = '/';
pub fn serialize<S>(
map: &HashMap<(String, String), CursorPosition>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut ser_map = serializer.serialize_map(Some(map.len()))?;
for ((agg, id), cursor) in map {
let key = format!("{agg}{SEP}{id}");
ser_map.serialize_entry(&key, cursor)?;
}
ser_map.end()
}
pub fn deserialize<'de, D>(
deserializer: D,
) -> Result<HashMap<(String, String), CursorPosition>, D::Error>
where
D: Deserializer<'de>,
{
let raw: HashMap<String, CursorPosition> = HashMap::deserialize(deserializer)?;
raw.into_iter()
.map(|(key, cursor)| {
let (agg, id) = key.split_once(SEP).ok_or_else(|| {
serde::de::Error::custom(format!("cursor key missing '{SEP}' separator: {key}"))
})?;
Ok(((agg.to_string(), id.to_string()), cursor))
})
.collect()
}
}
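
/// Write the checkpoint atomically: serialize to a temporary file, then
/// rename it over `checkpoint.json`.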
fn save_pm_checkpoint<PM: ProcessManager>(
dir: &std::path::Path,
checkpoint: &ProcessManagerCheckpoint<PM>,
) -> io::Result<()> {
std::fs::create_dir_all(dir)?;
let path = dir.join("checkpoint.json");
let tmp_path = dir.join("checkpoint.json.tmp");
let json = serde_json::to_string_pretty(checkpoint).map_err(io::Error::other)?;
std::fs::write(&tmp_path, json)?;
std::fs::rename(&tmp_path, &path)?;
Ok(())
}
fn load_pm_checkpoint<PM: ProcessManager>(
dir: &std::path::Path,
) -> io::Result<Option<ProcessManagerCheckpoint<PM>>> {
let path = dir.join("checkpoint.json");
match std::fs::read_to_string(&path) {
Ok(content) => match serde_json::from_str(&content) {
Ok(checkpoint) => Ok(Some(checkpoint)),
Err(e) => {
tracing::warn!(
path = %path.display(),
error = %e,
"corrupt process manager checkpoint, will rebuild"
);
Ok(None)
}
},
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e),
}
}
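
/// Remove the persisted checkpoint, treating a missing file as success.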
#[allow(dead_code)]
fn delete_pm_checkpoint(dir: &std::path::Path) -> io::Result<()> {
let path = dir.join("checkpoint.json");
match std::fs::remove_file(&path) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}
}
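
/// Drives a single process manager: restores its checkpoint, replays new
/// events from every subscribed stream, and persists the advanced cursors.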
pub(crate) struct ProcessManagerRunner<PM: ProcessManager> {
checkpoint: ProcessManagerCheckpoint<PM>,
layout: StreamLayout,
checkpoint_dir: PathBuf,
}
impl<PM: ProcessManager> ProcessManagerRunner<PM> {
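/// Create a runner for `PM`, restoring any previously saved checkpoint or
/// starting from the default state.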
pub(crate) fn new(layout: StreamLayout) -> io::Result<Self> {
let checkpoint_dir = layout.process_managers_dir().join(PM::NAME);
let checkpoint = load_pm_checkpoint::<PM>(&checkpoint_dir)?.unwrap_or_default();
Ok(Self {
checkpoint,
layout,
checkpoint_dir,
})
}
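/// Replay all events appended since the last saved cursor position across
/// every stream of every subscribed aggregate type, collecting the commands
/// the process manager produces. Cursors advance in memory; call `save` to
/// persist them.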
pub(crate) fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
let _span = tracing::debug_span!("pm_catchup", pm_name = PM::NAME).entered();
let mut envelopes = Vec::new();
for &aggregate_type in self.checkpoint.state.subscriptions() {
let instance_ids = self.layout.list_streams(aggregate_type)?;
for instance_id in &instance_ids {
let stream_dir = self.layout.stream_dir(aggregate_type, instance_id);
let reader = eventfold::EventReader::new(&stream_dir);
let key = (aggregate_type.to_owned(), instance_id.clone());
let offset = self
.checkpoint
.cursors
.get(&key)
.map(|c| c.offset)
.unwrap_or(0);
let iter = match reader.read_from(offset) {
Ok(iter) => iter,
Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
Err(e) => return Err(e),
};
for result in iter {
let (event, next_offset, line_hash) = result?;
let produced = self
.checkpoint
.state
.react(aggregate_type, instance_id, &event);
envelopes.extend(produced);
self.checkpoint.cursors.insert(
key.clone(),
CursorPosition {
offset: next_offset,
hash: line_hash,
},
);
}
}
}
Ok(envelopes)
}
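/// Persist the current state and cursors to disk.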
pub(crate) fn save(&self) -> io::Result<()> {
save_pm_checkpoint::<PM>(&self.checkpoint_dir, &self.checkpoint)
}
#[allow(dead_code)]
pub(crate) fn state(&self) -> &PM {
&self.checkpoint.state
}
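/// Discard the persisted checkpoint and replay every subscribed stream from
/// the beginning.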
#[allow(dead_code)]
pub(crate) fn rebuild(&mut self) -> io::Result<Vec<CommandEnvelope>> {
delete_pm_checkpoint(&self.checkpoint_dir)?;
self.checkpoint = ProcessManagerCheckpoint::default();
self.catch_up()
}
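/// Path of the JSONL file where commands that fail to dispatch are recorded.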
pub(crate) fn dead_letter_path(&self) -> PathBuf {
self.checkpoint_dir.join("dead_letters.jsonl")
}
#[allow(dead_code)]
pub(crate) fn name(&self) -> &str {
PM::NAME
}
}
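
/// Object-safe interface over `ProcessManagerRunner`, so runners for
/// different process manager types can be stored and driven uniformly.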
pub(crate) trait ProcessManagerCatchUp: Send + Sync {
fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>>;
fn save(&self) -> io::Result<()>;
#[allow(dead_code)]
fn name(&self) -> &str;
fn dead_letter_path(&self) -> PathBuf;
}
impl<PM: ProcessManager> ProcessManagerCatchUp for ProcessManagerRunner<PM> {
fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
self.catch_up()
}
fn save(&self) -> io::Result<()> {
self.save()
}
fn name(&self) -> &str {
self.name()
}
fn dead_letter_path(&self) -> PathBuf {
self.dead_letter_path()
}
}
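
/// Object-safe dispatcher that routes a `CommandEnvelope` to a concrete
/// aggregate type via the `AggregateStore`.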
pub(crate) trait AggregateDispatcher: Send + Sync {
fn dispatch<'a>(
&'a self,
store: &'a crate::store::AggregateStore,
envelope: CommandEnvelope,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>;
}
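
/// `AggregateDispatcher` for a specific aggregate type `A`: deserializes the
/// envelope's JSON payload into `A::Command` and executes it on the target
/// instance.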
pub(crate) struct TypedDispatcher<A> {
_marker: std::marker::PhantomData<A>,
}
impl<A> TypedDispatcher<A> {
pub(crate) fn new() -> Self {
Self {
_marker: std::marker::PhantomData,
}
}
}
impl<A> AggregateDispatcher for TypedDispatcher<A>
where
A: crate::aggregate::Aggregate,
A::Command: DeserializeOwned,
{
fn dispatch<'a>(
&'a self,
store: &'a crate::store::AggregateStore,
envelope: CommandEnvelope,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>
{
Box::pin(async move {
let cmd: A::Command =
serde_json::from_value(envelope.command).map_err(DispatchError::Deserialization)?;
let handle = store
.get::<A>(&envelope.instance_id)
.await
.map_err(DispatchError::Io)?;
handle
.execute(cmd, envelope.context)
.await
.map_err(|e| DispatchError::Execution(Box::new(e)))?;
Ok(())
})
}
}
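
/// A command that could not be dispatched, recorded alongside the error
/// message and a Unix timestamp in seconds.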
#[derive(Debug, Serialize, Deserialize)]
struct DeadLetterEntry {
envelope: CommandEnvelope,
error: String,
ts: u64,
}
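
/// Append a failed command to the dead-letter JSONL file, creating the file
/// and its parent directory if necessary.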
pub(crate) fn append_dead_letter(
path: &std::path::Path,
envelope: CommandEnvelope,
error: &str,
) -> io::Result<()> {
use std::io::Write;
let ts = std::time::SystemTime::UNIX_EPOCH
.elapsed()
.expect("system clock is before Unix epoch")
.as_secs();
let entry = DeadLetterEntry {
envelope,
error: error.to_string(),
ts,
};
let json = serde_json::to_string(&entry).map_err(io::Error::other)?;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
writeln!(file, "{json}")?;
Ok(())
}
#[derive(Debug, Clone, Default)]
pub struct ProcessManagerReport {
pub dispatched: usize,
pub dead_lettered: usize,
}
#[cfg(test)]
pub(crate) mod test_fixtures {
use super::*;
use crate::command::CommandContext;
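
/// Minimal test process manager: counts every event it sees and echoes each
/// one as a command aimed at the `target` aggregate type.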
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub(crate) struct EchoSaga {
pub events_seen: u64,
}
impl ProcessManager for EchoSaga {
const NAME: &'static str = "echo-saga";
fn subscriptions(&self) -> &'static [&'static str] {
&["counter"]
}
fn react(
&mut self,
_aggregate_type: &str,
stream_id: &str,
event: &eventfold::Event,
) -> Vec<CommandEnvelope> {
self.events_seen += 1;
vec![CommandEnvelope {
aggregate_type: "target".to_string(),
instance_id: stream_id.to_string(),
command: serde_json::json!({
"source_event_type": event.event_type,
}),
context: CommandContext::default()
.with_correlation_id(format!("echo-{}", self.events_seen)),
}]
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use test_fixtures::EchoSaga;
use crate::aggregate::test_fixtures::{Counter, CounterCommand};
use crate::command::CommandContext;
use crate::store::AggregateStore;
fn dummy_event() -> eventfold::Event {
eventfold::Event::new("Incremented", serde_json::json!(null))
}
#[test]
fn echo_saga_react_produces_envelope() {
let mut saga = EchoSaga::default();
let event = dummy_event();
let envelopes = saga.react("counter", "c-1", &event);
assert_eq!(envelopes.len(), 1);
assert_eq!(envelopes[0].aggregate_type, "target");
assert_eq!(envelopes[0].instance_id, "c-1");
assert_eq!(envelopes[0].command["source_event_type"], "Incremented");
assert_eq!(saga.events_seen, 1);
}
#[test]
fn echo_saga_subscriptions() {
let saga = EchoSaga::default();
assert_eq!(saga.subscriptions(), &["counter"]);
}
#[test]
fn checkpoint_serde_roundtrip() {
let mut checkpoint = ProcessManagerCheckpoint {
state: EchoSaga { events_seen: 3 },
cursors: HashMap::new(),
};
checkpoint.cursors.insert(
("counter".to_string(), "c-1".to_string()),
CursorPosition {
offset: 256,
hash: "abc123".to_string(),
},
);
let json = serde_json::to_string(&checkpoint).expect("serialization should succeed");
let deserialized: ProcessManagerCheckpoint<EchoSaga> =
serde_json::from_str(&json).expect("deserialization should succeed");
assert_eq!(deserialized.state, checkpoint.state);
assert_eq!(deserialized.cursors, checkpoint.cursors);
}
async fn increment(store: &AggregateStore, id: &str) {
let handle = store.get::<Counter>(id).await.expect("get should succeed");
handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("increment should succeed");
}
#[tokio::test]
async fn catch_up_produces_envelopes() {
let tmp = tempfile::tempdir().expect("failed to create tmpdir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-2").await;
let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
.expect("runner creation should succeed");
let envelopes = runner.catch_up().expect("catch_up should succeed");
assert_eq!(envelopes.len(), 2);
assert_eq!(runner.state().events_seen, 2);
}
#[tokio::test]
async fn cursors_advance_no_re_emit() {
let tmp = tempfile::tempdir().expect("failed to create tmpdir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
increment(&store, "c-1").await;
let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
.expect("runner creation should succeed");
let first = runner.catch_up().expect("first catch_up should succeed");
assert_eq!(first.len(), 1);
runner.save().expect("save should succeed");
let second = runner.catch_up().expect("second catch_up should succeed");
assert!(second.is_empty());
}
#[tokio::test]
async fn checkpoint_persists_and_restores() {
let tmp = tempfile::tempdir().expect("failed to create tmpdir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-1").await;
{
let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
.expect("runner creation should succeed");
let envelopes = runner.catch_up().expect("catch_up should succeed");
assert_eq!(envelopes.len(), 2);
runner.save().expect("save should succeed");
}
let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
.expect("runner reload should succeed");
assert_eq!(runner.state().events_seen, 2);
let envelopes = runner.catch_up().expect("catch_up should succeed");
assert!(envelopes.is_empty());
}
#[tokio::test]
async fn rebuild_replays_all_events() {
let tmp = tempfile::tempdir().expect("failed to create tmpdir");
let store = AggregateStore::open(tmp.path())
.await
.expect("open should succeed");
increment(&store, "c-1").await;
increment(&store, "c-2").await;
increment(&store, "c-2").await;
let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
.expect("runner creation should succeed");
let first = runner.catch_up().expect("catch_up should succeed");
assert_eq!(first.len(), 3);
runner.save().expect("save should succeed");
let rebuilt = runner.rebuild().expect("rebuild should succeed");
assert_eq!(rebuilt.len(), 3);
assert_eq!(runner.state().events_seen, 3);
}
#[test]
fn dead_letter_append_creates_readable_jsonl() {
let tmp = tempfile::tempdir().expect("failed to create tmpdir");
let path = tmp.path().join("dead_letters.jsonl");
let envelope = CommandEnvelope {
aggregate_type: "target".to_string(),
instance_id: "t-1".to_string(),
command: serde_json::json!({"action": "test"}),
context: CommandContext::default(),
};
append_dead_letter(&path, envelope, "test error").expect("append should succeed");
let contents = std::fs::read_to_string(&path).expect("read should succeed");
let entry: DeadLetterEntry =
serde_json::from_str(contents.trim()).expect("should be valid JSON");
assert_eq!(entry.error, "test error");
assert_eq!(entry.envelope.aggregate_type, "target");
}
}