1use std::collections::HashMap;
10use std::io;
11use std::path::PathBuf;
12
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16use crate::command::CommandEnvelope;
17use crate::error::DispatchError;
18use crate::projection::CursorPosition;
19use crate::storage::StreamLayout;
20
/// A long-lived policy component that observes events from one or more
/// aggregate streams and reacts by producing follow-up commands.
///
/// Implementors must be `Default` (fresh state on first run or rebuild) and
/// serde-serializable so state can be checkpointed to disk next to the
/// per-stream cursors.
pub trait ProcessManager: Default + Serialize + DeserializeOwned + Send + Sync + 'static {
    /// Unique name; used as the on-disk checkpoint directory name.
    const NAME: &'static str;

    /// Aggregate type names whose event streams this process manager follows.
    fn subscriptions(&self) -> &'static [&'static str];

    /// Handle one event from a subscribed stream, optionally producing
    /// commands to dispatch. May mutate internal state.
    fn react(
        &mut self,
        aggregate_type: &str,
        stream_id: &str,
        event: &eventfold::Event,
    ) -> Vec<CommandEnvelope>;
}
60
/// Durable snapshot of a process manager: its user state plus the read
/// position for every `(aggregate_type, stream_id)` pair it has consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessManagerCheckpoint<PM> {
    // The process manager's own serialized state.
    state: PM,
    // Tuple keys are not valid JSON object keys, so the `cursor_map` serde
    // adapter flattens them to "agg/stream" strings on disk.
    #[serde(with = "cursor_map")]
    cursors: HashMap<(String, String), CursorPosition>,
}
74
75impl<PM: Default> Default for ProcessManagerCheckpoint<PM> {
76 fn default() -> Self {
77 Self {
78 state: PM::default(),
79 cursors: HashMap::new(),
80 }
81 }
82}
83
84mod cursor_map {
90 use super::*;
91 use serde::ser::SerializeMap;
92 use serde::{Deserializer, Serializer};
93
94 const SEP: char = '/';
95
96 pub fn serialize<S>(
97 map: &HashMap<(String, String), CursorPosition>,
98 serializer: S,
99 ) -> Result<S::Ok, S::Error>
100 where
101 S: Serializer,
102 {
103 let mut ser_map = serializer.serialize_map(Some(map.len()))?;
104 for ((agg, id), cursor) in map {
105 let key = format!("{agg}{SEP}{id}");
106 ser_map.serialize_entry(&key, cursor)?;
107 }
108 ser_map.end()
109 }
110
111 pub fn deserialize<'de, D>(
112 deserializer: D,
113 ) -> Result<HashMap<(String, String), CursorPosition>, D::Error>
114 where
115 D: Deserializer<'de>,
116 {
117 let raw: HashMap<String, CursorPosition> = HashMap::deserialize(deserializer)?;
118 raw.into_iter()
119 .map(|(key, cursor)| {
120 let (agg, id) = key.split_once(SEP).ok_or_else(|| {
121 serde::de::Error::custom(format!("cursor key missing '{SEP}' separator: {key}"))
122 })?;
123 Ok(((agg.to_string(), id.to_string()), cursor))
124 })
125 .collect()
126 }
127}
128
129fn save_pm_checkpoint<PM: ProcessManager>(
136 dir: &std::path::Path,
137 checkpoint: &ProcessManagerCheckpoint<PM>,
138) -> io::Result<()> {
139 std::fs::create_dir_all(dir)?;
140 let path = dir.join("checkpoint.json");
141 let tmp_path = dir.join("checkpoint.json.tmp");
142 let json = serde_json::to_string_pretty(checkpoint).map_err(io::Error::other)?;
143 std::fs::write(&tmp_path, json)?;
144 std::fs::rename(&tmp_path, &path)?;
145 Ok(())
146}
147
148fn load_pm_checkpoint<PM: ProcessManager>(
152 dir: &std::path::Path,
153) -> io::Result<Option<ProcessManagerCheckpoint<PM>>> {
154 let path = dir.join("checkpoint.json");
155 match std::fs::read_to_string(&path) {
156 Ok(content) => match serde_json::from_str(&content) {
157 Ok(checkpoint) => Ok(Some(checkpoint)),
158 Err(e) => {
159 tracing::warn!(
160 path = %path.display(),
161 error = %e,
162 "corrupt process manager checkpoint, will rebuild"
163 );
164 Ok(None)
165 }
166 },
167 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
168 Err(e) => Err(e),
169 }
170}
171
/// Remove the on-disk checkpoint file, treating "already gone" as success.
#[allow(dead_code)] fn delete_pm_checkpoint(dir: &std::path::Path) -> io::Result<()> {
    let path = dir.join("checkpoint.json");
    if let Err(e) = std::fs::remove_file(&path) {
        // Idempotent: a missing file means there is nothing to delete.
        if e.kind() != io::ErrorKind::NotFound {
            return Err(e);
        }
    }
    Ok(())
}
182
/// Drives a single [`ProcessManager`] over the event store: holds its
/// checkpoint (state + cursors), knows where the streams live, and where the
/// checkpoint is persisted.
pub(crate) struct ProcessManagerRunner<PM: ProcessManager> {
    // In-memory copy of the durable checkpoint (state + per-stream cursors).
    checkpoint: ProcessManagerCheckpoint<PM>,
    // Directory layout used to enumerate and locate aggregate event streams.
    layout: StreamLayout,
    // Directory holding checkpoint.json and dead_letters.jsonl for this PM.
    checkpoint_dir: PathBuf,
}
198
impl<PM: ProcessManager> ProcessManagerRunner<PM> {
    /// Create a runner, restoring any checkpoint previously saved under
    /// `<process_managers_dir>/<PM::NAME>`; starts fresh if none exists or
    /// the saved file was corrupt.
    pub(crate) fn new(layout: StreamLayout) -> io::Result<Self> {
        let checkpoint_dir = layout.process_managers_dir().join(PM::NAME);
        let checkpoint = load_pm_checkpoint::<PM>(&checkpoint_dir)?.unwrap_or_default();
        Ok(Self {
            checkpoint,
            layout,
            checkpoint_dir,
        })
    }

    /// Feed every event past the stored cursors through `PM::react`,
    /// advancing the in-memory cursors, and return all produced commands.
    ///
    /// Cursors are only persisted when `save()` is called, so a crash before
    /// `save` replays the same events next time (at-least-once delivery).
    pub(crate) fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
        let _span = tracing::debug_span!("pm_catchup", pm_name = PM::NAME,).entered();

        let mut envelopes = Vec::new();

        for &aggregate_type in self.checkpoint.state.subscriptions() {
            let instance_ids = self.layout.list_streams(aggregate_type)?;
            for instance_id in &instance_ids {
                let stream_dir = self.layout.stream_dir(aggregate_type, instance_id);
                let reader = eventfold::EventReader::new(&stream_dir);
                let key = (aggregate_type.to_owned(), instance_id.clone());
                // Resume from the last recorded offset; 0 means "from the start".
                let offset = self
                    .checkpoint
                    .cursors
                    .get(&key)
                    .map(|c| c.offset)
                    .unwrap_or(0);
                let iter = match reader.read_from(offset) {
                    Ok(iter) => iter,
                    // Stream listed but its log file is missing: nothing to read.
                    Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
                    Err(e) => return Err(e),
                };
                for result in iter {
                    let (event, next_offset, line_hash) = result?;
                    let produced = self
                        .checkpoint
                        .state
                        .react(aggregate_type, instance_id, &event);
                    envelopes.extend(produced);
                    // Advance the cursor after each event so a mid-stream
                    // error still leaves an accurate in-memory position.
                    self.checkpoint.cursors.insert(
                        key.clone(),
                        CursorPosition {
                            offset: next_offset,
                            hash: line_hash,
                        },
                    );
                }
            }
        }

        Ok(envelopes)
    }

    /// Persist the current checkpoint (state + cursors) atomically.
    pub(crate) fn save(&self) -> io::Result<()> {
        save_pm_checkpoint::<PM>(&self.checkpoint_dir, &self.checkpoint)
    }

    /// Read-only access to the process manager's current state.
    #[allow(dead_code)] pub(crate) fn state(&self) -> &PM {
        &self.checkpoint.state
    }

    /// Discard the on-disk checkpoint and in-memory state, then replay every
    /// subscribed stream from the beginning.
    #[allow(dead_code)] pub(crate) fn rebuild(&mut self) -> io::Result<Vec<CommandEnvelope>> {
        delete_pm_checkpoint(&self.checkpoint_dir)?;
        self.checkpoint = ProcessManagerCheckpoint::default();
        self.catch_up()
    }

    /// Path of the JSONL file where undeliverable commands are recorded.
    pub(crate) fn dead_letter_path(&self) -> PathBuf {
        self.checkpoint_dir.join("dead_letters.jsonl")
    }

    /// The process manager's registered name (`PM::NAME`).
    #[allow(dead_code)] pub(crate) fn name(&self) -> &str {
        PM::NAME
    }
}
316
/// Object-safe facade over [`ProcessManagerRunner`], allowing runners with
/// different `PM` type parameters to be handled uniformly behind `dyn`.
pub(crate) trait ProcessManagerCatchUp: Send + Sync {
    /// See `ProcessManagerRunner::catch_up`.
    fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>>;

    /// See `ProcessManagerRunner::save`.
    fn save(&self) -> io::Result<()>;

    /// See `ProcessManagerRunner::name`.
    #[allow(dead_code)] fn name(&self) -> &str;

    /// See `ProcessManagerRunner::dead_letter_path`.
    fn dead_letter_path(&self) -> PathBuf;
}
337
338impl<PM: ProcessManager> ProcessManagerCatchUp for ProcessManagerRunner<PM> {
339 fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
340 self.catch_up()
341 }
342
343 fn save(&self) -> io::Result<()> {
344 self.save()
345 }
346
347 fn name(&self) -> &str {
348 self.name()
349 }
350
351 fn dead_letter_path(&self) -> PathBuf {
352 self.dead_letter_path()
353 }
354}
355
/// Type-erased command dispatch: routes a [`CommandEnvelope`] to a concrete
/// aggregate implementation. Returns a boxed future so the trait remains
/// object-safe despite the async body.
pub(crate) trait AggregateDispatcher: Send + Sync {
    /// Deserialize and execute `envelope` against `store`.
    fn dispatch<'a>(
        &'a self,
        store: &'a crate::store::AggregateStore,
        envelope: CommandEnvelope,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>;
}
375
/// [`AggregateDispatcher`] implementation bound to a concrete aggregate type
/// `A`. Zero-sized: it carries only the type, no data.
pub(crate) struct TypedDispatcher<A> {
    // `A` is used purely at the type level.
    _marker: std::marker::PhantomData<A>,
}
384
385impl<A> TypedDispatcher<A> {
386 pub(crate) fn new() -> Self {
387 Self {
388 _marker: std::marker::PhantomData,
389 }
390 }
391}
392
impl<A> AggregateDispatcher for TypedDispatcher<A>
where
    A: crate::aggregate::Aggregate,
    A::Command: DeserializeOwned,
{
    /// Decode the envelope's JSON payload as `A::Command`, load the target
    /// aggregate instance, and execute the command against it.
    fn dispatch<'a>(
        &'a self,
        store: &'a crate::store::AggregateStore,
        envelope: CommandEnvelope,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>
    {
        Box::pin(async move {
            // A payload that doesn't match `A::Command` is a dispatch error,
            // not a panic — the caller decides what to do (e.g. dead-letter).
            let cmd: A::Command =
                serde_json::from_value(envelope.command).map_err(DispatchError::Deserialization)?;

            let handle = store
                .get::<A>(&envelope.instance_id)
                .await
                .map_err(DispatchError::Io)?;

            handle
                .execute(cmd, envelope.context)
                .await
                .map_err(|e| DispatchError::Execution(Box::new(e)))?;

            Ok(())
        })
    }
}
423
/// One line of the dead-letter JSONL file: the undeliverable command, why it
/// failed, and when it was recorded.
#[derive(Debug, Serialize, Deserialize)]
struct DeadLetterEntry {
    // The command that could not be dispatched.
    envelope: CommandEnvelope,
    // Human-readable description of the dispatch failure.
    error: String,
    // Seconds since the Unix epoch when the entry was appended.
    ts: u64,
}
436
437pub(crate) fn append_dead_letter(
445 path: &std::path::Path,
446 envelope: CommandEnvelope,
447 error: &str,
448) -> io::Result<()> {
449 use std::io::Write;
450 let ts = std::time::SystemTime::UNIX_EPOCH
451 .elapsed()
452 .expect("system clock is before Unix epoch")
453 .as_secs();
454 let entry = DeadLetterEntry {
455 envelope,
456 error: error.to_string(),
457 ts,
458 };
459 let json = serde_json::to_string(&entry).map_err(io::Error::other)?;
460 if let Some(parent) = path.parent() {
463 std::fs::create_dir_all(parent)?;
464 }
465 let mut file = std::fs::OpenOptions::new()
466 .create(true)
467 .append(true)
468 .open(path)?;
469 writeln!(file, "{json}")?;
470 Ok(())
471}
472
/// Summary of one process-manager dispatch pass.
#[derive(Debug, Clone, Default)]
pub struct ProcessManagerReport {
    /// Number of commands successfully dispatched.
    pub dispatched: usize,
    /// Number of commands routed to the dead-letter file.
    pub dead_lettered: usize,
}
481
#[cfg(test)]
pub(crate) mod test_fixtures {
    use super::*;
    use crate::command::CommandContext;

    /// Minimal test process manager: counts every event it sees and echoes
    /// each one back as a command aimed at a "target" aggregate.
    #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
    pub(crate) struct EchoSaga {
        // Running total of events observed across all `react` calls.
        pub events_seen: u64,
    }

    impl ProcessManager for EchoSaga {
        const NAME: &'static str = "echo-saga";

        // Follows only the "counter" aggregate's streams.
        fn subscriptions(&self) -> &'static [&'static str] {
            &["counter"]
        }

        // Emits exactly one envelope per event, carrying the source event's
        // type and a correlation id derived from the running count (the
        // count is bumped BEFORE the id is formatted, so ids start at 1).
        fn react(
            &mut self,
            _aggregate_type: &str,
            stream_id: &str,
            event: &eventfold::Event,
        ) -> Vec<CommandEnvelope> {
            self.events_seen += 1;
            vec![CommandEnvelope {
                aggregate_type: "target".to_string(),
                instance_id: stream_id.to_string(),
                command: serde_json::json!({
                    "source_event_type": event.event_type,
                }),
                context: CommandContext::default()
                    .with_correlation_id(format!("echo-{}", self.events_seen)),
            }]
        }
    }
}
523
#[cfg(test)]
mod tests {
    use super::*;
    use test_fixtures::EchoSaga;

    use crate::aggregate::test_fixtures::{Counter, CounterCommand};
    use crate::command::CommandContext;
    use crate::store::AggregateStore;

    /// A placeholder "Incremented" event with a null payload.
    fn dummy_event() -> eventfold::Event {
        eventfold::Event::new("Incremented", serde_json::json!(null))
    }

    #[test]
    fn echo_saga_react_produces_envelope() {
        let mut saga = EchoSaga::default();
        let event = dummy_event();
        let envelopes = saga.react("counter", "c-1", &event);

        assert_eq!(envelopes.len(), 1);
        assert_eq!(envelopes[0].aggregate_type, "target");
        assert_eq!(envelopes[0].instance_id, "c-1");
        assert_eq!(envelopes[0].command["source_event_type"], "Incremented");
        assert_eq!(saga.events_seen, 1);
    }

    #[test]
    fn echo_saga_subscriptions() {
        let saga = EchoSaga::default();
        assert_eq!(saga.subscriptions(), &["counter"]);
    }

    // Round-trips the checkpoint (including the "agg/stream" cursor-key
    // flattening done by the `cursor_map` adapter) through serde_json.
    #[test]
    fn checkpoint_serde_roundtrip() {
        let mut checkpoint = ProcessManagerCheckpoint {
            state: EchoSaga { events_seen: 3 },
            cursors: HashMap::new(),
        };
        checkpoint.cursors.insert(
            ("counter".to_string(), "c-1".to_string()),
            CursorPosition {
                offset: 256,
                hash: "abc123".to_string(),
            },
        );

        let json = serde_json::to_string(&checkpoint).expect("serialization should succeed");
        let deserialized: ProcessManagerCheckpoint<EchoSaga> =
            serde_json::from_str(&json).expect("deserialization should succeed");

        assert_eq!(deserialized.state, checkpoint.state);
        assert_eq!(deserialized.cursors, checkpoint.cursors);
    }

    /// Helper: append one Increment event to the given counter stream.
    async fn increment(store: &AggregateStore, id: &str) {
        let handle = store.get::<Counter>(id).await.expect("get should succeed");
        handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("increment should succeed");
    }

    #[tokio::test]
    async fn catch_up_produces_envelopes() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;
        increment(&store, "c-2").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let envelopes = runner.catch_up().expect("catch_up should succeed");

        // One envelope per event, across both streams.
        assert_eq!(envelopes.len(), 2);
        assert_eq!(runner.state().events_seen, 2);
    }

    // After save(), a second catch_up starts from the stored cursors and
    // must not re-emit already-processed events.
    #[tokio::test]
    async fn cursors_advance_no_re_emit() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let first = runner.catch_up().expect("first catch_up should succeed");
        assert_eq!(first.len(), 1);
        runner.save().expect("save should succeed");

        let second = runner.catch_up().expect("second catch_up should succeed");
        assert!(second.is_empty());
    }

    // A new runner built over the same layout must restore state and cursors
    // from the saved checkpoint file.
    #[tokio::test]
    async fn checkpoint_persists_and_restores() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;
        increment(&store, "c-1").await;

        // First runner consumes both events and persists its checkpoint.
        {
            let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
                .expect("runner creation should succeed");
            let envelopes = runner.catch_up().expect("catch_up should succeed");
            assert_eq!(envelopes.len(), 2);
            runner.save().expect("save should succeed");
        }

        // Second runner restores from disk: state intact, nothing to replay.
        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner reload should succeed");
        assert_eq!(runner.state().events_seen, 2);
        let envelopes = runner.catch_up().expect("catch_up should succeed");
        assert!(envelopes.is_empty());
    }

    // rebuild() drops the checkpoint and replays every event from scratch.
    #[tokio::test]
    async fn rebuild_replays_all_events() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;
        increment(&store, "c-2").await;
        increment(&store, "c-2").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let first = runner.catch_up().expect("catch_up should succeed");
        assert_eq!(first.len(), 3);
        runner.save().expect("save should succeed");

        let rebuilt = runner.rebuild().expect("rebuild should succeed");
        assert_eq!(rebuilt.len(), 3);
        assert_eq!(runner.state().events_seen, 3);
    }

    // The dead-letter file must contain one parseable JSON object per line.
    #[test]
    fn dead_letter_append_creates_readable_jsonl() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let path = tmp.path().join("dead_letters.jsonl");

        let envelope = CommandEnvelope {
            aggregate_type: "target".to_string(),
            instance_id: "t-1".to_string(),
            command: serde_json::json!({"action": "test"}),
            context: CommandContext::default(),
        };

        append_dead_letter(&path, envelope, "test error").expect("append should succeed");

        let contents = std::fs::read_to_string(&path).expect("read should succeed");
        let entry: DeadLetterEntry =
            serde_json::from_str(contents.trim()).expect("should be valid JSON");
        assert_eq!(entry.error, "test error");
        assert_eq!(entry.envelope.aggregate_type, "target");
    }
}