Skip to main content

eventfold_es/
process_manager.rs

1//! Process managers: cross-aggregate workflow coordination.
2//!
3//! A process manager reacts to events from one or more aggregate streams and
4//! produces [`CommandEnvelope`]s that are dispatched to (potentially different)
5//! aggregates. They are structurally similar to projections -- they use
6//! checkpointed cursors for catch-up -- but produce side effects (commands)
7//! rather than read models.
8
9use std::collections::HashMap;
10use std::io;
11use std::path::PathBuf;
12
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16use crate::command::CommandEnvelope;
17use crate::error::DispatchError;
18use crate::projection::CursorPosition;
19use crate::storage::StreamLayout;
20
/// A cross-aggregate workflow coordinator that reacts to events by producing
/// commands.
///
/// Process managers subscribe to aggregate event streams (like projections)
/// but instead of building a read model, they emit [`CommandEnvelope`]s that
/// the store dispatches to target aggregates.
///
/// The `Default` bound supplies the initial state when no checkpoint exists
/// on disk; `Serialize`/`DeserializeOwned` let the state be persisted inside
/// the checkpoint file alongside the stream cursors.
///
/// # Contract
///
/// - [`react`](ProcessManager::react) must be deterministic: given the same
///   sequence of events, it must produce the same command envelopes.
/// - Unknown event types should be silently ignored for forward compatibility.
/// - State is checkpointed after all envelopes from a catch-up pass have been
///   dispatched (or dead-lettered), ensuring crash safety via re-processing.
pub trait ProcessManager: Default + Serialize + DeserializeOwned + Send + Sync + 'static {
    /// Human-readable name, used as a directory name under `process_managers/`.
    ///
    /// Because it names the checkpoint directory, it should be unique among
    /// process managers and filesystem-safe.
    const NAME: &'static str;

    /// The aggregate types this process manager subscribes to.
    ///
    /// Each entry must match an `Aggregate::AGGREGATE_TYPE` string.
    fn subscriptions(&self) -> &'static [&'static str];

    /// React to a single event from a subscribed stream.
    ///
    /// Returns zero or more [`CommandEnvelope`]s to dispatch.
    ///
    /// # Arguments
    ///
    /// * `aggregate_type` - Which aggregate produced the event.
    /// * `stream_id` - The specific aggregate instance.
    /// * `event` - The raw eventfold event.
    fn react(
        &mut self,
        aggregate_type: &str,
        stream_id: &str,
        event: &eventfold::Event,
    ) -> Vec<CommandEnvelope>;
}
60
/// Persisted state of a process manager including per-stream cursors.
///
/// Structurally identical to `ProjectionCheckpoint`, but kept separate to
/// avoid coupling the two subsystems. The `cursors` map uses the same
/// `"aggregate_type/instance_id"` key encoding.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessManagerCheckpoint<PM> {
    /// The process manager's current state.
    state: PM,
    /// Per-stream cursor positions, keyed by (aggregate_type, instance_id).
    /// Serialized through the `cursor_map` module because JSON object keys
    /// must be strings, not tuples.
    #[serde(with = "cursor_map")]
    cursors: HashMap<(String, String), CursorPosition>,
}
74
75impl<PM: Default> Default for ProcessManagerCheckpoint<PM> {
76    fn default() -> Self {
77        Self {
78            state: PM::default(),
79            cursors: HashMap::new(),
80        }
81    }
82}
83
84/// Custom serde for `HashMap<(String, String), CursorPosition>`.
85///
86/// Mirrors the identical module in `projection.rs`. JSON object keys must be
87/// strings, so we encode each `(aggregate_type, instance_id)` tuple as
88/// `"aggregate_type/instance_id"`.
89mod cursor_map {
90    use super::*;
91    use serde::ser::SerializeMap;
92    use serde::{Deserializer, Serializer};
93
94    const SEP: char = '/';
95
96    pub fn serialize<S>(
97        map: &HashMap<(String, String), CursorPosition>,
98        serializer: S,
99    ) -> Result<S::Ok, S::Error>
100    where
101        S: Serializer,
102    {
103        let mut ser_map = serializer.serialize_map(Some(map.len()))?;
104        for ((agg, id), cursor) in map {
105            let key = format!("{agg}{SEP}{id}");
106            ser_map.serialize_entry(&key, cursor)?;
107        }
108        ser_map.end()
109    }
110
111    pub fn deserialize<'de, D>(
112        deserializer: D,
113    ) -> Result<HashMap<(String, String), CursorPosition>, D::Error>
114    where
115        D: Deserializer<'de>,
116    {
117        let raw: HashMap<String, CursorPosition> = HashMap::deserialize(deserializer)?;
118        raw.into_iter()
119            .map(|(key, cursor)| {
120                let (agg, id) = key.split_once(SEP).ok_or_else(|| {
121                    serde::de::Error::custom(format!("cursor key missing '{SEP}' separator: {key}"))
122                })?;
123                Ok(((agg.to_string(), id.to_string()), cursor))
124            })
125            .collect()
126    }
127}
128
129// --- Checkpoint persistence helpers ---
130
131/// Save a process manager checkpoint atomically.
132///
133/// Writes to a temporary file then renames to `checkpoint.json` in `dir`.
134/// Creates `dir` if it does not exist.
135fn save_pm_checkpoint<PM: ProcessManager>(
136    dir: &std::path::Path,
137    checkpoint: &ProcessManagerCheckpoint<PM>,
138) -> io::Result<()> {
139    std::fs::create_dir_all(dir)?;
140    let path = dir.join("checkpoint.json");
141    let tmp_path = dir.join("checkpoint.json.tmp");
142    let json = serde_json::to_string_pretty(checkpoint).map_err(io::Error::other)?;
143    std::fs::write(&tmp_path, json)?;
144    std::fs::rename(&tmp_path, &path)?;
145    Ok(())
146}
147
148/// Load a process manager checkpoint from disk.
149///
150/// Returns `Ok(None)` if the file does not exist or is corrupt.
151fn load_pm_checkpoint<PM: ProcessManager>(
152    dir: &std::path::Path,
153) -> io::Result<Option<ProcessManagerCheckpoint<PM>>> {
154    let path = dir.join("checkpoint.json");
155    match std::fs::read_to_string(&path) {
156        Ok(content) => match serde_json::from_str(&content) {
157            Ok(checkpoint) => Ok(Some(checkpoint)),
158            Err(e) => {
159                tracing::warn!(
160                    path = %path.display(),
161                    error = %e,
162                    "corrupt process manager checkpoint, will rebuild"
163                );
164                Ok(None)
165            }
166        },
167        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
168        Err(e) => Err(e),
169    }
170}
171
/// Delete the checkpoint file if it exists.
///
/// Deleting an already-absent file is treated as success, so the operation
/// is idempotent.
#[allow(dead_code)] // Used by ProcessManagerRunner::rebuild, which is test-only.
fn delete_pm_checkpoint(dir: &std::path::Path) -> io::Result<()> {
    match std::fs::remove_file(dir.join("checkpoint.json")) {
        Err(e) if e.kind() != io::ErrorKind::NotFound => Err(e),
        _ => Ok(()),
    }
}
182
183// --- ProcessManagerRunner ---
184
/// Drives a process manager's catch-up loop, reading events from subscribed
/// streams and collecting command envelopes for dispatch.
///
/// Unlike [`ProjectionRunner`](crate::projection::ProjectionRunner), `catch_up`
/// does **not** persist the checkpoint. Instead it returns the collected
/// envelopes, and the caller must invoke [`save`](ProcessManagerRunner::save)
/// after all envelopes have been dispatched (or dead-lettered). This ensures
/// crash safety: a crash mid-dispatch causes re-processing on restart.
pub(crate) struct ProcessManagerRunner<PM: ProcessManager> {
    /// In-memory state + cursors; only written to disk by `save`.
    checkpoint: ProcessManagerCheckpoint<PM>,
    /// Shared layout for resolving stream and process-manager directories.
    layout: StreamLayout,
    /// Directory holding this PM's `checkpoint.json` and dead-letter log,
    /// derived from `layout.process_managers_dir()` + `PM::NAME`.
    checkpoint_dir: PathBuf,
}
198
impl<PM: ProcessManager> ProcessManagerRunner<PM> {
    /// Create a new runner, loading an existing checkpoint from disk if
    /// available.
    ///
    /// # Arguments
    ///
    /// * `layout` - The shared [`StreamLayout`] for resolving stream paths.
    ///
    /// # Errors
    ///
    /// Returns `io::Error` if reading an existing checkpoint file fails
    /// (other than file-not-found).
    pub(crate) fn new(layout: StreamLayout) -> io::Result<Self> {
        let checkpoint_dir = layout.process_managers_dir().join(PM::NAME);
        // A missing or corrupt checkpoint yields `None`, so we fall back to a
        // fresh default state (all cursors at zero) and re-process everything.
        let checkpoint = load_pm_checkpoint::<PM>(&checkpoint_dir)?.unwrap_or_default();
        Ok(Self {
            checkpoint,
            layout,
            checkpoint_dir,
        })
    }

    /// Catch up on all subscribed streams, returning command envelopes.
    ///
    /// Reads new events from each subscribed stream, invokes
    /// [`ProcessManager::react`] for each event, and collects the resulting
    /// envelopes. Updates in-memory cursors but does **not** persist the
    /// checkpoint.
    ///
    /// # Errors
    ///
    /// Returns `io::Error` if listing streams or reading events fails.
    pub(crate) fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
        let _span = tracing::debug_span!("pm_catchup", pm_name = PM::NAME,).entered();

        let mut envelopes = Vec::new();

        for &aggregate_type in self.checkpoint.state.subscriptions() {
            let instance_ids = self.layout.list_streams(aggregate_type)?;
            for instance_id in &instance_ids {
                let stream_dir = self.layout.stream_dir(aggregate_type, instance_id);
                let reader = eventfold::EventReader::new(&stream_dir);
                let key = (aggregate_type.to_owned(), instance_id.clone());
                // Resume from the checkpointed offset; streams never seen
                // before start from the beginning of the log.
                let offset = self
                    .checkpoint
                    .cursors
                    .get(&key)
                    .map(|c| c.offset)
                    .unwrap_or(0);
                // Skip streams with no log file yet.
                let iter = match reader.read_from(offset) {
                    Ok(iter) => iter,
                    Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
                    Err(e) => return Err(e),
                };
                for result in iter {
                    let (event, next_offset, line_hash) = result?;
                    let produced = self
                        .checkpoint
                        .state
                        .react(aggregate_type, instance_id, &event);
                    envelopes.extend(produced);
                    // Advance the cursor after every event so the in-memory
                    // checkpoint always reflects exactly what `react` has
                    // consumed so far, even if a later event errors out.
                    self.checkpoint.cursors.insert(
                        key.clone(),
                        CursorPosition {
                            offset: next_offset,
                            hash: line_hash,
                        },
                    );
                }
            }
        }

        Ok(envelopes)
    }

    /// Persist the current checkpoint to disk.
    ///
    /// Should be called after all envelopes from a catch-up pass have been
    /// dispatched (or dead-lettered).
    ///
    /// # Errors
    ///
    /// Returns `io::Error` if writing the checkpoint fails.
    pub(crate) fn save(&self) -> io::Result<()> {
        save_pm_checkpoint::<PM>(&self.checkpoint_dir, &self.checkpoint)
    }

    /// Returns the current process manager state.
    #[allow(dead_code)] // Used in integration tests.
    pub(crate) fn state(&self) -> &PM {
        &self.checkpoint.state
    }

    /// Delete checkpoint and replay all events from scratch.
    ///
    /// # Errors
    ///
    /// Returns `io::Error` if deleting the checkpoint or catching up fails.
    #[allow(dead_code)] // Used in integration tests.
    pub(crate) fn rebuild(&mut self) -> io::Result<Vec<CommandEnvelope>> {
        delete_pm_checkpoint(&self.checkpoint_dir)?;
        // Reset in-memory state too, so the replay starts from scratch even
        // though this runner already processed events in this session.
        self.checkpoint = ProcessManagerCheckpoint::default();
        self.catch_up()
    }

    /// Returns the path to this process manager's dead-letter log.
    pub(crate) fn dead_letter_path(&self) -> PathBuf {
        self.checkpoint_dir.join("dead_letters.jsonl")
    }

    /// Returns the process manager name.
    #[allow(dead_code)] // Part of the ProcessManagerCatchUp interface.
    pub(crate) fn name(&self) -> &str {
        PM::NAME
    }
}
316
317// --- Type-erased trait for store integration ---
318
/// Trait object interface for process manager runners.
///
/// Allows `run_process_managers` to iterate over heterogeneous process
/// managers without knowing each concrete `PM` type.
pub(crate) trait ProcessManagerCatchUp: Send + Sync {
    /// Catch up on subscribed streams and return command envelopes.
    fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>>;

    /// Persist the checkpoint to disk.
    ///
    /// Callers invoke this only after the envelopes returned by `catch_up`
    /// have been dispatched or dead-lettered.
    fn save(&self) -> io::Result<()>;

    /// Returns the process manager name.
    #[allow(dead_code)] // Not yet consumed outside tests.
    fn name(&self) -> &str;

    /// Returns the path to the dead-letter log file.
    fn dead_letter_path(&self) -> PathBuf;
}
337
// Every trait method here delegates to the inherent method of the same name.
// This is not infinite recursion: Rust's method resolution prefers inherent
// methods over trait methods, so `self.catch_up()` below resolves to
// `ProcessManagerRunner::catch_up`, not to this trait impl.
impl<PM: ProcessManager> ProcessManagerCatchUp for ProcessManagerRunner<PM> {
    fn catch_up(&mut self) -> io::Result<Vec<CommandEnvelope>> {
        self.catch_up()
    }

    fn save(&self) -> io::Result<()> {
        self.save()
    }

    fn name(&self) -> &str {
        self.name()
    }

    fn dead_letter_path(&self) -> PathBuf {
        self.dead_letter_path()
    }
}
355
356// --- Dispatch infrastructure ---
357
/// Type-erased dispatcher for a single aggregate type.
///
/// Each registered aggregate type gets a `TypedDispatcher<A>` that knows
/// how to deserialize the command JSON and route it through the store.
pub(crate) trait AggregateDispatcher: Send + Sync {
    /// Dispatch a command envelope to the target aggregate.
    ///
    /// Returns a boxed future rather than using `async fn` so the trait can
    /// be used as a trait object.
    ///
    /// # Arguments
    ///
    /// * `store` - The aggregate store for looking up aggregate handles.
    /// * `envelope` - The command envelope to dispatch.
    fn dispatch<'a>(
        &'a self,
        store: &'a crate::store::AggregateStore,
        envelope: CommandEnvelope,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>;
}
375
/// Concrete dispatcher for aggregate type `A`.
///
/// Deserializes `CommandEnvelope.command` into `A::Command`, then calls
/// `store.get::<A>().execute()`.
pub(crate) struct TypedDispatcher<A> {
    // PhantomData to carry the aggregate type without storing a value;
    // the dispatcher itself is stateless.
    _marker: std::marker::PhantomData<A>,
}
384
385impl<A> TypedDispatcher<A> {
386    pub(crate) fn new() -> Self {
387        Self {
388            _marker: std::marker::PhantomData,
389        }
390    }
391}
392
393impl<A> AggregateDispatcher for TypedDispatcher<A>
394where
395    A: crate::aggregate::Aggregate,
396    A::Command: DeserializeOwned,
397{
398    fn dispatch<'a>(
399        &'a self,
400        store: &'a crate::store::AggregateStore,
401        envelope: CommandEnvelope,
402    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), DispatchError>> + Send + 'a>>
403    {
404        Box::pin(async move {
405            // Deserialize the type-erased JSON into the concrete command type.
406            let cmd: A::Command =
407                serde_json::from_value(envelope.command).map_err(DispatchError::Deserialization)?;
408
409            let handle = store
410                .get::<A>(&envelope.instance_id)
411                .await
412                .map_err(DispatchError::Io)?;
413
414            handle
415                .execute(cmd, envelope.context)
416                .await
417                .map_err(|e| DispatchError::Execution(Box::new(e)))?;
418
419            Ok(())
420        })
421    }
422}
423
424// --- Dead-letter log ---
425
/// An entry in the dead-letter log, recording a failed dispatch attempt.
///
/// Serialized as one JSON object per line (JSONL) by [`append_dead_letter`].
#[derive(Debug, Serialize, Deserialize)]
struct DeadLetterEntry {
    /// The command envelope that failed to dispatch.
    envelope: CommandEnvelope,
    /// Human-readable error message.
    error: String,
    /// Unix timestamp (seconds since epoch) of the failure, taken from the
    /// system clock at append time.
    ts: u64,
}
436
437/// Append a single dead-letter entry to the JSONL log at `path`.
438///
439/// Creates the file if it does not exist. Each entry is a single JSON line.
440///
441/// # Errors
442///
443/// Returns `io::Error` if file I/O fails.
444pub(crate) fn append_dead_letter(
445    path: &std::path::Path,
446    envelope: CommandEnvelope,
447    error: &str,
448) -> io::Result<()> {
449    use std::io::Write;
450    let ts = std::time::SystemTime::UNIX_EPOCH
451        .elapsed()
452        .expect("system clock is before Unix epoch")
453        .as_secs();
454    let entry = DeadLetterEntry {
455        envelope,
456        error: error.to_string(),
457        ts,
458    };
459    let json = serde_json::to_string(&entry).map_err(io::Error::other)?;
460    // Ensure the parent directory exists (the PM checkpoint dir may not
461    // have been created yet if no catch_up save has occurred).
462    if let Some(parent) = path.parent() {
463        std::fs::create_dir_all(parent)?;
464    }
465    let mut file = std::fs::OpenOptions::new()
466        .create(true)
467        .append(true)
468        .open(path)?;
469    writeln!(file, "{json}")?;
470    Ok(())
471}
472
/// Summary of a `run_process_managers` pass.
///
/// `Default` yields an all-zero report.
#[derive(Debug, Clone, Default)]
pub struct ProcessManagerReport {
    /// Number of command envelopes successfully dispatched.
    pub dispatched: usize,
    /// Number of command envelopes written to dead-letter logs.
    pub dead_lettered: usize,
}
481
#[cfg(test)]
pub(crate) mod test_fixtures {
    use super::*;
    use crate::command::CommandContext;

    /// A test process manager that reacts to counter events by emitting
    /// a command envelope targeting a "target" aggregate.
    #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
    pub(crate) struct EchoSaga {
        /// Number of events processed (for testing state persistence).
        pub events_seen: u64,
    }

    impl ProcessManager for EchoSaga {
        const NAME: &'static str = "echo-saga";

        fn subscriptions(&self) -> &'static [&'static str] {
            &["counter"]
        }

        /// Mirror every counter event as a command aimed at the "target"
        /// aggregate, reusing the source stream id as the target instance id.
        fn react(
            &mut self,
            _aggregate_type: &str,
            stream_id: &str,
            event: &eventfold::Event,
        ) -> Vec<CommandEnvelope> {
            self.events_seen += 1;
            let command = serde_json::json!({
                "source_event_type": event.event_type,
            });
            let context = CommandContext::default()
                .with_correlation_id(format!("echo-{}", self.events_seen));
            vec![CommandEnvelope {
                aggregate_type: "target".to_string(),
                instance_id: stream_id.to_string(),
                command,
                context,
            }]
        }
    }
}
523
#[cfg(test)]
mod tests {
    use super::*;
    use test_fixtures::EchoSaga;

    use crate::aggregate::test_fixtures::{Counter, CounterCommand};
    use crate::command::CommandContext;
    use crate::store::AggregateStore;

    /// Build a minimal eventfold event for exercising `react` directly.
    fn dummy_event() -> eventfold::Event {
        eventfold::Event::new("Incremented", serde_json::json!(null))
    }

    #[test]
    fn echo_saga_react_produces_envelope() {
        let mut saga = EchoSaga::default();
        let event = dummy_event();
        let envelopes = saga.react("counter", "c-1", &event);

        assert_eq!(envelopes.len(), 1);
        assert_eq!(envelopes[0].aggregate_type, "target");
        assert_eq!(envelopes[0].instance_id, "c-1");
        assert_eq!(envelopes[0].command["source_event_type"], "Incremented");
        assert_eq!(saga.events_seen, 1);
    }

    #[test]
    fn echo_saga_subscriptions() {
        let saga = EchoSaga::default();
        assert_eq!(saga.subscriptions(), &["counter"]);
    }

    // Exercises the custom `cursor_map` serde module via a full roundtrip.
    #[test]
    fn checkpoint_serde_roundtrip() {
        let mut checkpoint = ProcessManagerCheckpoint {
            state: EchoSaga { events_seen: 3 },
            cursors: HashMap::new(),
        };
        checkpoint.cursors.insert(
            ("counter".to_string(), "c-1".to_string()),
            CursorPosition {
                offset: 256,
                hash: "abc123".to_string(),
            },
        );

        let json = serde_json::to_string(&checkpoint).expect("serialization should succeed");
        let deserialized: ProcessManagerCheckpoint<EchoSaga> =
            serde_json::from_str(&json).expect("deserialization should succeed");

        assert_eq!(deserialized.state, checkpoint.state);
        assert_eq!(deserialized.cursors, checkpoint.cursors);
    }

    // --- ProcessManagerRunner integration tests ---

    /// Helper: execute a single `Increment` command on the given instance.
    async fn increment(store: &AggregateStore, id: &str) {
        let handle = store.get::<Counter>(id).await.expect("get should succeed");
        handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("increment should succeed");
    }

    #[tokio::test]
    async fn catch_up_produces_envelopes() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        // One event in each of two separate counter streams.
        increment(&store, "c-1").await;
        increment(&store, "c-2").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let envelopes = runner.catch_up().expect("catch_up should succeed");

        assert_eq!(envelopes.len(), 2);
        assert_eq!(runner.state().events_seen, 2);
    }

    #[tokio::test]
    async fn cursors_advance_no_re_emit() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let first = runner.catch_up().expect("first catch_up should succeed");
        assert_eq!(first.len(), 1);
        runner.save().expect("save should succeed");

        // Second catch_up with no new events should produce nothing.
        let second = runner.catch_up().expect("second catch_up should succeed");
        assert!(second.is_empty());
    }

    #[tokio::test]
    async fn checkpoint_persists_and_restores() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;
        increment(&store, "c-1").await;

        // First runner: catch up and save. Scoped so it is dropped before
        // the second runner loads the same checkpoint directory.
        {
            let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
                .expect("runner creation should succeed");
            let envelopes = runner.catch_up().expect("catch_up should succeed");
            assert_eq!(envelopes.len(), 2);
            runner.save().expect("save should succeed");
        }

        // Second runner: should load checkpoint, no re-emit.
        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner reload should succeed");
        assert_eq!(runner.state().events_seen, 2);
        let envelopes = runner.catch_up().expect("catch_up should succeed");
        assert!(envelopes.is_empty());
    }

    #[tokio::test]
    async fn rebuild_replays_all_events() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        increment(&store, "c-1").await;
        increment(&store, "c-2").await;
        increment(&store, "c-2").await;

        let mut runner = ProcessManagerRunner::<EchoSaga>::new(store.layout().clone())
            .expect("runner creation should succeed");
        let first = runner.catch_up().expect("catch_up should succeed");
        assert_eq!(first.len(), 3);
        runner.save().expect("save should succeed");

        // Rebuild wipes checkpoint + state, so all three events replay.
        let rebuilt = runner.rebuild().expect("rebuild should succeed");
        assert_eq!(rebuilt.len(), 3);
        assert_eq!(runner.state().events_seen, 3);
    }

    #[test]
    fn dead_letter_append_creates_readable_jsonl() {
        let tmp = tempfile::tempdir().expect("failed to create tmpdir");
        let path = tmp.path().join("dead_letters.jsonl");

        let envelope = CommandEnvelope {
            aggregate_type: "target".to_string(),
            instance_id: "t-1".to_string(),
            command: serde_json::json!({"action": "test"}),
            context: CommandContext::default(),
        };

        append_dead_letter(&path, envelope, "test error").expect("append should succeed");

        // Each appended entry must be parseable back as a DeadLetterEntry.
        let contents = std::fs::read_to_string(&path).expect("read should succeed");
        let entry: DeadLetterEntry =
            serde_json::from_str(contents.trim()).expect("should be valid JSON");
        assert_eq!(entry.error, "test error");
        assert_eq!(entry.envelope.aggregate_type, "target");
    }
}