eventfold-es 0.2.0

Embedded event-sourcing framework built on eventfold
Documentation
# Phase 3: Process Managers

> Ref: [design.md]../design.md -- sections 5.3, 8
> Depends on: Phase 2

## Goal

Add cross-aggregate workflow coordination. After this phase, users can define process
managers that react to events in one aggregate by dispatching commands to another:

```rust
let store = AggregateStore::builder(tmp.path())
    .process_manager::<ReservationSaga>()
    .open()
    .await?;

// Place an order -- the saga reacts by reserving inventory.
let order = store.get::<Order>("ord-1").await?;
order.execute(OrderCommand::Place { sku: "W-1", qty: 2 }, ctx).await?;

// The saga dispatched a command to the Inventory aggregate.
let inv = store.get::<Inventory>("inv-W-1").await?;
let inv_state = inv.state().await?;
assert_eq!(inv_state.reserved, 2);
```

Process managers maintain persisted state and cursors (like projections) but produce
side effects (commands) rather than read models. Failed command dispatches are captured
in a dead-letter log for manual inspection.

---

## Prerequisites

- Phase 2 complete (projections, AggregateStoreBuilder, catch-up loop).
- The catch-up loop infrastructure from Phase 2 is reusable for process managers.

---

## Deliverables

### Step 1: CommandEnvelope + ProcessManager trait

**What**: Define the types that process managers produce and the trait itself.

**File**: `src/command.rs` (extend), `src/process_manager.rs` (new)

**`CommandEnvelope`**:

```
struct CommandEnvelope {
    pub aggregate_type: String,
    pub instance_id: String,
    pub command: Value,
    pub context: CommandContext,
}
```

- Carries a type-erased command payload (`serde_json::Value`) because the process
  manager may target aggregate types different from the one it subscribes to.
- The dispatch layer deserializes `command` into the concrete `A::Command` using the
  `aggregate_type` to look up the target aggregate.

**`ProcessManager` trait**:

```
trait ProcessManager: Default + Serialize + DeserializeOwned + Send + Sync + 'static {
    const NAME: &'static str;
    fn subscriptions(&self) -> &'static [&'static str];
    fn react(
        &mut self,
        aggregate_type: &str,
        stream_id: &str,
        event: &eventfold::Event,
    ) -> Vec<CommandEnvelope>;
}
```

**Tests**:

- Define a test process manager (`EchoSaga`) in `#[cfg(test)]` that reacts to events
  by producing a command envelope.
- Unit test: given an event, `react` returns the expected envelopes.
- `CommandEnvelope` serialization round-trip.

**Acceptance**:

- `cargo build` + `cargo test` + clippy clean.
- Types re-exported from `lib.rs`.

---

### Step 2: Process manager persistence + catch-up

**What**: Reuse the cursor/checkpoint infrastructure from projections for process
managers. Add the catch-up loop that also dispatches commands.

**File**: `src/process_manager.rs` (extend)

**`ProcessManagerRunner<PM: ProcessManager>`** (internal):

- Structurally similar to `ProjectionRunner<P>`: holds checkpoint with state + cursors,
  discovers streams, reads from cursors, calls `PM::react`.
- After each `react` call, collected `CommandEnvelope`s are returned to the caller
  (the store) for dispatch.
- Checkpoint is saved after each successful catch-up pass (after all envelopes for
  that pass have been dispatched).

**Checkpoint format**: same as projections but stored under
`<base_dir>/process_managers/<NAME>/checkpoint.json`.

**Tests** (integration):

- Catch-up produces the expected command envelopes.
- Cursors advance; re-running catch-up does not re-emit old envelopes.
- Checkpoint persists and restores.
- Rebuild replays all events.

**Acceptance**:

- `cargo test` passes.
- Clippy clean.

---

### Step 3: Command dispatch + dead-letter log

**What**: The store dispatches command envelopes produced by process managers and
records failures.

**File**: `src/store.rs` (extend), `src/process_manager.rs` (extend)

**Dispatch flow**:

1. `store.projection::<P>()` was the lazy catch-up trigger for projections. Process
   managers use a similar approach: `store.run_process_managers()` catches up all
   registered process managers and dispatches their envelopes.
2. For each `CommandEnvelope`:
   - Look up the target aggregate type in a type registry.
   - Deserialize `envelope.command` into the concrete `A::Command`.
   - Call `store.get::<A>(&envelope.instance_id).await?.execute(cmd, ctx).await`.
3. If dispatch fails (deserialization error, command rejection, I/O), the envelope is
   written to a dead-letter log.

**Type registry**:

- The `AggregateStoreBuilder` gains a method: `.aggregate_type::<A>()` which registers
  `A::AGGREGATE_TYPE` string with a dispatch function
  `fn(store, envelope) -> Result<()>`.
- This is necessary because process managers produce type-erased envelopes; the store
  needs to know how to deserialize and route them.

**Dead-letter log**:

- Stored at `<base_dir>/process_managers/<NAME>/dead_letters.jsonl`.
- Each entry: `{ "envelope": ..., "error": "...", "ts": ... }`.
- Append-only, not an `eventfold` log (just plain JSONL for simplicity).

**Tests** (integration):

- End-to-end: event in aggregate A triggers process manager, which dispatches command
  to aggregate B, whose state updates accordingly.
- Failed dispatch (command rejected) writes to dead-letter log.
- Dead-letter log is human-readable JSONL.
- Process manager does not re-dispatch on subsequent catch-up (cursors advanced past
  the triggering event).

**Acceptance**:

- `cargo test` passes.
- Clippy clean.

---

### Step 4: AggregateStore integration

**What**: Wire process managers into the builder and provide a query API.

**Files**: `src/store.rs` (extend), `src/lib.rs` (re-exports)

**Changes to `AggregateStoreBuilder`**:

- `.process_manager::<PM>()` registers a process manager.
- `.aggregate_type::<A>()` registers a dispatchable aggregate type (required for
  process manager command routing).

**Changes to `AggregateStore`**:

- `run_process_managers(&self) -> Result<ProcessManagerReport>`:
  catches up all process managers, dispatches envelopes, returns a report of
  successes and dead-letters.
- The report is a simple struct:
  `ProcessManagerReport { dispatched: usize, dead_lettered: usize }`.

**Tests** (integration):

- Full saga round-trip: OrderPlaced -> ReservationSaga -> ReserveInventory.
- Multiple process managers coexist.
- `run_process_managers` is idempotent when no new events exist.
- Store restart recovers process manager state; no duplicate dispatches.

**Acceptance**:

- `cargo test` passes (Phase 1 + 2 + 3, no regressions).
- `cargo clippy -- -D warnings` clean.
- `cargo fmt --check` clean.
- New public items have doc comments.
- Re-exports updated: `ProcessManager`, `CommandEnvelope`,
  `ProcessManagerReport`.

---

## End-of-phase checklist

- [ ] `cargo build` -- no errors, no warnings
- [ ] `cargo test` -- all tests pass (Phase 1 + 2 + 3)
- [ ] `cargo clippy -- -D warnings` -- clean
- [ ] `cargo fmt --check` -- clean
- [ ] All public items have doc comments
- [ ] No `unwrap()` in library code
- [ ] Public API additions:
      `ProcessManager`, `CommandEnvelope`, `ProcessManagerReport`,
      `AggregateStoreBuilder::process_manager`,
      `AggregateStoreBuilder::aggregate_type`,
      `AggregateStore::run_process_managers`