Skip to main content

mako_engine/
process.rs

1//! [`Process`] — ergonomic typed handle for a single MaKo process instance.
2//!
3//! Instead of threading `stream_id`, `workflow_id`, `tenant_id`, and a store
4//! reference through every call to the write path, bind them once into a
5//! `Process<W, S>` and call [`execute`] / [`state`] directly.
6//!
7//! # Starting a new process
8//!
9//! ```rust,ignore
10//! use mako_engine::{
11//!     event_store::InMemoryEventStore,
12//!     ids::TenantId,
13//!     process::Process,
14//!     version::WorkflowId,
15//! };
16//!
17//! let store = InMemoryEventStore::new();
18//! let process = Process::<MyWorkflow, _>::new(
19//!     store,
20//!     TenantId::new(),
21//!     WorkflowId::new("my-workflow", "FV2024-10-01"),
22//! );
23//!
24//! let envelopes = process.execute(my_command).await?;
25//! let current   = process.state().await?;
26//! ```
27//!
28//! # Resuming an existing process
29//!
30//! ```rust,ignore
31//! let process = Process::<MyWorkflow, _>::from_stream(
32//!     store, stream_id, process_id, tenant_id, workflow_id,
33//! );
34//! ```
35//!
36//! [`execute`]: Process::execute
37//! [`state`]: Process::state
38
39use std::marker::PhantomData;
40
41use crate::{
42    envelope::EventEnvelope,
43    error::EngineError,
44    event_store::EventStore,
45    ids::{ProcessId, ProcessIdentity, StreamId, TenantId},
46    snapshot::{Snapshot, SnapshotStore},
47    version::WorkflowId,
48    workflow::{
49        CommandContext, Workflow, execute_command, execute_command_and_collect,
50        execute_command_with_snapshot,
51    },
52};
53
54// ── Process ───────────────────────────────────────────────────────────────────
55
56/// An ergonomic typed handle for a single MaKo process instance.
57///
58/// `Process` bundles the [`StreamId`], [`ProcessId`], [`TenantId`],
59/// [`WorkflowId`], and event store into a single owned value so callers do not
60/// need to pass them on every command dispatch.
61///
62/// ## Generic parameters
63///
64/// - `W` — the [`Workflow`] implementation. In practice this is a zero-size
65///   marker struct; the type parameter carries the domain logic as associated
66///   types.
67/// - `S` — the [`EventStore`] backend. [`InMemoryEventStore`] is the default
68///   for tests; production deployments wrap a persistent backend in
69///   [`Arc`][std::sync::Arc] and use `Process<W, Arc<MyStore>>`.
70///
71/// ## Clone semantics
72///
73/// If `S: Clone` (e.g. [`InMemoryEventStore`] or `Arc<…>`), `Process` is also
74/// `Clone` and all clones share the same underlying storage.
75///
76/// [`InMemoryEventStore`]: crate::event_store::InMemoryEventStore
77#[allow(clippy::struct_field_names)] // `process_id` and `stream_id` are intentional: they
78// describe engine-layer concepts, not redundant prefixes.
79pub struct Process<W: Workflow, S: EventStore> {
80    stream_id: StreamId,
81    process_id: ProcessId,
82    tenant_id: TenantId,
83    workflow_id: WorkflowId,
84    store: S,
85    _phantom: PhantomData<fn() -> W>,
86}
87
88impl<W: Workflow, S: EventStore> Process<W, S> {
89    /// Create a fresh process instance.
90    ///
91    /// Generates a new [`ProcessId`] and derives the [`StreamId`] from
92    /// `tenant_id` and `process_id` (`process/{tenant_id}/{process_id}`).
93    /// Use this when starting a new MaKo process
94    /// (e.g. on receipt of the first inbound UTILMD Lieferbeginn).
95    #[must_use]
96    pub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self {
97        let process_id = ProcessId::new();
98        let stream_id = StreamId::for_process(tenant_id, &process_id);
99        Self {
100            stream_id,
101            process_id,
102            tenant_id,
103            workflow_id,
104            store,
105            _phantom: PhantomData,
106        }
107    }
108
109    /// Attach to an existing process stream.
110    ///
111    /// Use this on service restart or when routing an inbound message to an
112    /// already-running process whose identifiers were previously persisted.
113    #[must_use]
114    pub fn from_stream(
115        store: S,
116        stream_id: StreamId,
117        process_id: ProcessId,
118        tenant_id: TenantId,
119        workflow_id: WorkflowId,
120    ) -> Self {
121        Self {
122            stream_id,
123            process_id,
124            tenant_id,
125            workflow_id,
126            store,
127            _phantom: PhantomData,
128        }
129    }
130
131    /// The event stream identifier for this process.
132    #[must_use]
133    pub fn stream_id(&self) -> &StreamId {
134        &self.stream_id
135    }
136
137    /// The stable process identifier.
138    #[must_use]
139    pub fn process_id(&self) -> ProcessId {
140        self.process_id
141    }
142
143    /// The tenant that owns this process.
144    #[must_use]
145    pub fn tenant_id(&self) -> TenantId {
146        self.tenant_id
147    }
148
149    /// The workflow version under which this process was created.
150    #[must_use]
151    pub fn workflow_id(&self) -> &WorkflowId {
152        &self.workflow_id
153    }
154
155    /// Return a serializable value bundle of all four process identifiers.
156    ///
157    /// Persist this to a routing table (e.g. keyed by `conversation_id` or
158    /// `correlation_id`) so inbound messages can be routed to the correct
159    /// running process without the caller needing to manage four separate
160    /// fields.
161    ///
162    /// Use [`Process::from_identity`] to re-attach to the same process stream
163    /// on a subsequent request.
164    ///
165    /// ```rust,ignore
166    /// let id = process.identity();
167    /// routing_table.insert(conv_id, id.clone());
168    ///
169    /// // Later, on a subsequent inbound message:
170    /// let id = routing_table.get(&conv_id)?;
171    /// let process = Process::<MyWorkflow, _>::from_identity(store, id);
172    /// ```
173    #[must_use]
174    pub fn identity(&self) -> ProcessIdentity {
175        ProcessIdentity::new(self.process_id, self.tenant_id, self.workflow_id.clone())
176    }
177
178    /// Build a [`CommandContext`] for an inbound EDIFACT message dispatch.
179    ///
180    /// Derives a **deterministic** [`CorrelationId`] from `interchange_ref`
181    /// (UUID v5) so repeated dispatches of the same EDIFACT message — e.g.
182    /// AS4 retransmissions or idempotent REST replays — produce the same
183    /// correlation root. This makes EDIFACT-level idempotency observable in
184    /// distributed traces without any extra dedup logic at the engine level.
185    ///
186    /// Use this instead of [`Process::execute`] when you need to propagate
187    /// EDIFACT correlation metadata into the event stream. The returned
188    /// context is passed to [`Process::execute_with`].
189    ///
190    /// # Example
191    ///
192    /// ```rust,ignore
193    /// let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
194    /// let cmd_ctx = process.context_for_inbound(&utilmd_interchange_ref);
195    /// process.execute_with(command, cmd_ctx).await?;
196    /// ```
197    ///
198    /// [`CorrelationId`]: crate::ids::CorrelationId
199    #[must_use]
200    pub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext {
201        CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone())
202            .with_correlation(crate::ids::CorrelationId::from_interchange_ref(
203                interchange_ref,
204            ))
205    }
206
207    /// Attach to an existing process stream from a previously persisted
208    /// [`ProcessIdentity`].
209    ///
210    /// This is the companion to [`Process::identity`]: look up the identity
211    /// from your routing table and call `from_identity` to get a live
212    /// `Process` handle bound to `store`.
213    #[must_use]
214    pub fn from_identity(store: S, identity: ProcessIdentity) -> Self {
215        Self {
216            stream_id: identity.stream_id().clone(),
217            process_id: identity.process_id,
218            tenant_id: identity.tenant_id,
219            workflow_id: identity.workflow_id,
220            store,
221            _phantom: PhantomData,
222        }
223    }
224
225    /// Return the number of events currently in the stream.
226    ///
227    /// Uses [`EventStore::stream_version`] for an efficient O(1) metadata
228    /// query on backends that override it. Falls back to loading all events
229    /// on stores that use the default implementation.
230    ///
231    /// Use this to decide whether to take a snapshot — e.g. with
232    /// [`Snapshot::should_take`]:
233    ///
234    /// ```rust,ignore
235    /// if Snapshot::should_take(process.event_count().await?, 100) {
236    ///     process.take_snapshot(&snap_store, 100).await?;
237    /// }
238    /// ```
239    ///
240    /// # Errors
241    ///
242    /// Returns [`EngineError::Store`] on storage failures.
243    ///
244    /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
245    pub async fn event_count(&self) -> Result<u64, EngineError> {
246        self.store.stream_version(&self.stream_id).await
247    }
248
249    /// Dispatch `command` using a freshly generated [`CommandContext`].
250    ///
251    /// A new [`CorrelationId`] and [`ConversationId`] are auto-generated for
252    /// each call. To propagate tracing IDs from an inbound EDIFACT message
253    /// across a multi-step command chain, use [`execute_with`].
254    ///
255    /// # Errors
256    ///
257    /// - [`EngineError::VersionConflict`] when a concurrent writer raced ahead;
258    ///   retry by calling `execute` again.
259    /// - [`EngineError::Workflow`] when the workflow rejects the command.
260    /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
261    ///
262    /// [`CorrelationId`]: crate::ids::CorrelationId
263    /// [`ConversationId`]: crate::ids::ConversationId
264    /// [`execute_with`]: Process::execute_with
265    #[cfg_attr(
266        feature = "tracing",
267        tracing::instrument(skip(self, command), fields(
268            workflow = %self.workflow_id,
269            process_id = %self.process_id,
270            stream_id = %self.stream_id,
271        ))
272    )]
273    pub async fn execute(&self, command: W::Command) -> Result<Vec<EventEnvelope>, EngineError> {
274        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
275        execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
276    }
277
278    /// Like [`execute`] but also returns the outbox messages produced by
279    /// [`Workflow::handle`], fully stamped with the real IDs from the persisted
280    /// event.
281    ///
282    /// The returned [`OutboxMessage`] entries have their `causation_event_id`
283    /// set to the `event_id` of the first persisted event — identical to what
284    /// `execute_and_enqueue` writes into the [`OutboxStore`] atomically.  This
285    /// makes the messages ready to pass directly to the EDIFACT renderer
286    /// without any manual ID stitching.
287    ///
288    /// Use this in E2E and integration tests that need to inspect or render
289    /// outbox messages after a command is persisted, without the awkward
290    /// `handle()` + `execute()` double invocation.
291    ///
292    /// [`execute`]: Process::execute
293    /// [`OutboxMessage`]: crate::outbox::OutboxMessage
294    /// [`OutboxStore`]: crate::outbox::OutboxStore
295    ///
296    /// # Errors
297    ///
298    /// Returns [`EngineError`] on storage or command handling failure.
299    pub async fn execute_and_collect(
300        &self,
301        command: W::Command,
302    ) -> Result<(Vec<EventEnvelope>, Vec<crate::outbox::OutboxMessage>), EngineError> {
303        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
304        let (events, pending) =
305            execute_command_and_collect::<W, S>(&self.store, &self.stream_id, command, &ctx)
306                .await?;
307
308        // Stamp each PendingOutbox with the real IDs from the persisted event.
309        // Using the first event's event_id as causation_event_id mirrors what
310        // execute_and_enqueue writes into the OutboxStore atomically.
311        let causation_event_id = events
312            .first()
313            .map_or_else(crate::ids::EventId::new, |e| e.event_id);
314
315        let outbox = pending
316            .into_iter()
317            .map(|p| {
318                crate::outbox::OutboxMessage::new(
319                    self.stream_id.clone(),
320                    self.process_id,
321                    self.tenant_id,
322                    ctx.correlation_id,
323                    ctx.conversation_id,
324                    causation_event_id,
325                    p.message_type,
326                    p.recipient,
327                    p.payload,
328                )
329            })
330            .collect();
331
332        Ok((events, outbox))
333    }
334
335    /// Dispatch `command` with a caller-supplied [`CommandContext`].
336    ///
337    /// Use this when you need to thread a specific `correlation_id`,
338    /// `conversation_id`, or `causation_id` through the command. For example,
339    /// when dispatching an APERAK in response to a UTILMD, pass the
340    /// `conversation_id` from the UTILMD envelope so both exchanges are
341    /// traceable as a single business conversation.
342    ///
343    /// Build a context with:
344    ///
345    /// ```rust,ignore
346    /// let ctx = CommandContext::new(tenant_id, process_id, workflow_id)
347    ///     .with_causation(utilmd_event_id.into())  // From<EventId> for CausationId
348    ///     .with_conversation(utilmd_conversation_id);
349    /// process.execute_with(DispatchAperak { .. }, ctx).await?;
350    /// ```
351    ///
352    /// # Errors
353    ///
354    /// See [`Process::execute`] for the error contract.
355    ///
356    /// [`Process::execute`]: Process::execute
357    #[cfg_attr(
358        feature = "tracing",
359        tracing::instrument(skip(self, command, ctx), fields(
360            workflow = %self.workflow_id,
361            process_id = %self.process_id,
362            correlation_id = %ctx.correlation_id,
363        ))
364    )]
365    pub async fn execute_with(
366        &self,
367        command: W::Command,
368        ctx: CommandContext,
369    ) -> Result<Vec<EventEnvelope>, EngineError> {
370        execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
371    }
372
373    /// Dispatch `command` using a snapshot store to accelerate state reconstruction.
374    ///
375    /// Equivalent to [`Process::execute`] but starts replay from the most recent
376    /// snapshot rather than from sequence 0. For streams with thousands of events
377    /// and a snapshot within the last 100 events, this reduces replay cost from
378    /// O(n) to O(k) where k is the tail length since the last snapshot.
379    ///
380    /// When no snapshot exists or the schema version has changed, falls back to
381    /// full O(n) replay — identical in cost to [`Process::execute`].
382    ///
383    /// # Errors
384    ///
385    /// Same contract as [`Process::execute`].
386    pub async fn execute_snapshot<Snap>(
387        &self,
388        command: W::Command,
389        snap_store: &Snap,
390    ) -> Result<Vec<EventEnvelope>, EngineError>
391    where
392        W::State: serde::de::DeserializeOwned,
393        Snap: SnapshotStore,
394    {
395        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
396        execute_command_with_snapshot::<W, S, Snap>(
397            &self.store,
398            snap_store,
399            &self.stream_id,
400            command,
401            &ctx,
402        )
403        .await
404    }
405
406    /// Reconstruct the current workflow state by replaying all persisted events.
407    ///
408    /// This is a **read-only** operation — it loads events but does not
409    /// acquire any write lock or check optimistic concurrency. Use it to:
410    ///
411    /// - Inspect process status in tests without dispatching a command.
412    /// - Build a diagnostic snapshot for observability or health checks.
413    /// - Implement query-side read models that need the full typed state.
414    ///
415    /// For production read models, prefer a [`Projection`] that is updated
416    /// incrementally rather than replaying the full stream on every query.
417    ///
418    /// To accelerate replay for long-lived streams, use
419    /// [`Process::state_with_snapshot`] instead.
420    ///
421    /// # Errors
422    ///
423    /// - [`EngineError::Store`] on storage failures.
424    /// - [`EngineError::Deserialization`] when a stored event cannot be decoded
425    ///   into `W::Event` (schema migration required).
426    ///
427    /// [`Projection`]: crate::projection::Projection
428    #[cfg_attr(
429        feature = "tracing",
430        tracing::instrument(skip(self), fields(
431            workflow = %self.workflow_id,
432            stream_id = %self.stream_id,
433        ))
434    )]
435    pub async fn state(&self) -> Result<W::State, EngineError> {
436        self.store
437            .fold_stream(&self.stream_id, 0, W::State::default(), |acc, env| {
438                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
439                let event: W::Event = serde_json::from_value(payload)
440                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
441                Ok(W::apply(acc, &event))
442            })
443            .await
444    }
445
446    // ── Snapshot-aware state reconstruction ──────────────────────────────────
447
448    /// Reconstruct current state using a snapshot as the starting point.
449    ///
450    /// Loads the most recent snapshot for this stream from `snap_store`. If
451    /// one exists, deserializes it into `W::State` and then replays only
452    /// events appended **after** the snapshot's `sequence_number`
453    /// (O(k) instead of O(n)). Falls back to full replay when no snapshot
454    /// exists.
455    ///
456    /// ## When to use
457    ///
458    /// Use this instead of [`Process::state`] for long-lived processes where
459    /// the event count grows large. Pair it with [`Process::take_snapshot`]
460    /// to keep the snapshot store current after each command.
461    ///
462    /// ## Schema version compatibility
463    ///
464    /// Snapshots whose `state` field cannot be deserialized into `W::State`
465    /// (e.g. after a breaking state schema change) will return
466    /// [`EngineError::Deserialization`]. In that case, fall back to
467    /// [`Process::state`] (full replay) and take a fresh snapshot.
468    ///
469    /// # Errors
470    ///
471    /// - [`EngineError::Store`] on snapshot or event storage failures.
472    /// - [`EngineError::Deserialization`] when the snapshot state or a tail
473    ///   event cannot be decoded.
474    #[cfg_attr(
475        feature = "tracing",
476        tracing::instrument(skip(self, snap_store), fields(
477            workflow = %self.workflow_id,
478            stream_id = %self.stream_id,
479        ))
480    )]
481    pub async fn state_with_snapshot<Snap: SnapshotStore>(
482        &self,
483        snap_store: &Snap,
484    ) -> Result<W::State, EngineError>
485    where
486        W::State: serde::de::DeserializeOwned,
487    {
488        let maybe_snap = snap_store.load(&self.stream_id).await?;
489
490        let (initial_state, from_sequence) = match maybe_snap {
491            Some(snap) => {
492                if snap.state_schema_version == W::state_schema_version() {
493                    let state = serde_json::from_value::<W::State>(snap.state)
494                        .map_err(|e| EngineError::Deserialization(e.to_string()))?;
495                    (state, snap.sequence_number)
496                } else {
497                    // Schema version mismatch: discard the stale snapshot and
498                    // fall back to full replay. The caller should take a fresh
499                    // snapshot after this reconstruction completes.
500                    tracing::warn!(
501                        expected = W::state_schema_version(),
502                        actual   = snap.state_schema_version,
503                        stream_id = %self.stream_id,
504                        "snapshot schema version mismatch; falling back to full replay"
505                    );
506                    (W::State::default(), 0)
507                }
508            }
509            None => (W::State::default(), 0),
510        };
511
512        let tail = self
513            .store
514            .fold_stream(&self.stream_id, from_sequence, initial_state, |acc, env| {
515                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
516                let event: W::Event = serde_json::from_value(payload)
517                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
518                Ok(W::apply(acc, &event))
519            })
520            .await?;
521        Ok(tail)
522    }
523
524    /// Reconstruct current state and save a snapshot if the event-count
525    /// threshold is reached.
526    ///
527    /// Checks [`Snapshot::should_take`] with `interval`. When at least
528    /// `interval` new events have accumulated since the last snapshot,
529    /// reconstructs state via full replay, serializes it, and calls
530    /// [`SnapshotStore::save`].
531    ///
532    /// Returns `true` when a snapshot was taken, `false` when the threshold
533    /// was not reached or `interval` is `0`.
534    ///
535    /// ## Integration pattern
536    ///
537    /// ```rust,ignore
538    /// // After every successful command:
539    /// process.execute(command).await?;
540    /// process.take_snapshot(&snap_store, 100).await?;
541    ///
542    /// // On the read path — O(k) instead of O(n):
543    /// let state = process.state_with_snapshot(&snap_store).await?;
544    /// ```
545    ///
546    /// # Errors
547    ///
548    /// - [`EngineError::Store`] on snapshot storage failures.
549    /// - [`EngineError::Serialization`] when the state cannot be JSON-encoded.
550    /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
551    ///
552    /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
553    pub async fn take_snapshot<Snap: SnapshotStore>(
554        &self,
555        snap_store: &Snap,
556        interval: u64,
557    ) -> Result<bool, EngineError>
558    where
559        W::State: serde::Serialize,
560    {
561        let count = self.event_count().await?;
562        // Load the last snapshot (if any) to get its sequence number.
563        let last_snap_seq = snap_store
564            .load(&self.stream_id)
565            .await?
566            .map_or(0, |s| s.sequence_number);
567        if !Snapshot::should_take(count, last_snap_seq, interval) {
568            return Ok(false);
569        }
570        let state = self.state().await?;
571        let payload =
572            serde_json::to_value(&state).map_err(|e| EngineError::Serialization(e.to_string()))?;
573        let snap = Snapshot::new(
574            self.stream_id.clone(),
575            count,
576            W::state_schema_version(),
577            payload,
578        );
579        snap_store.save(&snap).await?;
580        Ok(true)
581    }
582
583    // ── Retry ─────────────────────────────────────────────────────────────────
584
585    /// Dispatch `command` with automatic retry on [`EngineError::VersionConflict`].
586    ///
587    /// A version conflict occurs when a concurrent writer appended events
588    /// between this process's read and its append attempt. On each conflict,
589    /// the engine **reloads the complete event stream from the store and
590    /// replays all events** to rebuild fresh state before re-handling the
591    /// command. Stale in-memory state from a previous attempt is never
592    /// carried forward — each retry always starts from a fully-rebuilt snapshot.
593    ///
594    /// Non-conflict errors (storage failures, workflow rejections) are
595    /// returned immediately without retrying.
596    ///
597    /// A freshly-generated [`CommandContext`] is pinned before the first
598    /// attempt and reused across all retries so all events share the same
599    /// correlation root regardless of retry count. Use
600    /// [`execute_with_retry_ctx`] to supply a specific context (e.g. one
601    /// derived from an inbound EDIFACT envelope).
602    ///
603    /// ## When to use
604    ///
605    /// Use for commands where two inbound EDIFACT messages for the same
606    /// process may arrive concurrently — e.g. a UTILMD and its APERAK
607    /// processed on separate async tasks.
608    ///
609    /// ## Command cloning
610    ///
611    /// `W::Command` must implement [`Clone`] so it can be resubmitted on
612    /// each retry without reconstructing it from scratch.
613    ///
614    /// # Errors
615    ///
616    /// - [`EngineError::VersionConflict`] when all `max_attempts` are
617    ///   exhausted without a successful append.
618    /// - Any non-conflict [`EngineError`] returned by the workflow or storage.
619    /// - [`EngineError::Store`] when `max_attempts` is `0`.
620    ///
621    /// [`execute_with_retry_ctx`]: Process::execute_with_retry_ctx
622    pub async fn execute_with_retry(
623        &self,
624        command: W::Command,
625        max_attempts: u32,
626    ) -> Result<Vec<EventEnvelope>, EngineError>
627    where
628        W::Command: Clone,
629    {
630        if max_attempts == 0 {
631            return Err(EngineError::store("max_attempts must be >= 1"));
632        }
633        // Pin context before the loop — all retry attempts share the same
634        // correlation root for consistent distributed tracing.
635        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
636        self.execute_with_retry_ctx(command, ctx, max_attempts)
637            .await
638    }
639
640    /// Dispatch `command` with a caller-supplied [`CommandContext`] and
641    /// automatic retry on [`EngineError::VersionConflict`].
642    ///
643    /// Identical to [`execute_with_retry`] but threads the provided `ctx`
644    /// (including its `correlation_id`, `conversation_id`, and `causation_id`)
645    /// through every retry attempt. Use this when you need to propagate
646    /// tracing IDs from an inbound EDIFACT envelope across a retried command.
647    ///
648    /// # Example
649    ///
650    /// ```rust,ignore
651    /// let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
652    /// process.execute_with_retry_ctx(HandleAperak { .. }, ctx, 3).await?;
653    /// ```
654    ///
655    /// # Errors
656    ///
657    /// See [`execute_with_retry`] for the error contract.
658    ///
659    /// # Panics
660    ///
661    /// Panics if `max_attempts` is 0 and the guard at the top of the function
662    /// is somehow bypassed (unreachable in practice).
663    ///
664    /// [`execute_with_retry`]: Process::execute_with_retry
665    pub async fn execute_with_retry_ctx(
666        &self,
667        command: W::Command,
668        ctx: CommandContext,
669        max_attempts: u32,
670    ) -> Result<Vec<EventEnvelope>, EngineError>
671    where
672        W::Command: Clone,
673    {
674        if max_attempts == 0 {
675            return Err(EngineError::store("max_attempts must be >= 1"));
676        }
677        let mut conflict_err: Option<EngineError> = None;
678        for attempt in 0..max_attempts {
679            // Each call to `execute_with` internally calls `fold_stream` from
680            // sequence 0 (or from the most recent snapshot if one is available).
681            // State is always freshly reconstructed from the event log on every
682            // attempt — there is no stale state carried forward between retries.
683            // Do NOT "optimise" this by caching state across attempts; doing so
684            // would allow a winning concurrent writer's events to be invisible
685            // to the retry, producing incorrect decisions and duplicate events.
686            match self.execute_with(command.clone(), ctx.clone()).await {
687                Ok(envs) => return Ok(envs),
688                Err(e) if e.is_version_conflict() => {
689                    conflict_err = Some(e);
690                    // Brief jittered sleep to reduce thundering-herd under
691                    // concurrent ERP commands targeting the same stream.
692                    // Delay = uniform random in [0, 10ms * attempt], capped at 80ms.
693                    // Uses the OS CSPRNG via rand so every retry gets independent
694                    // entropy regardless of stream-ID prefix.
695                    if attempt + 1 < max_attempts {
696                        let entropy: u64 = rand::random();
697                        let window_ms: u64 = (10 * (u64::from(attempt) + 1)).min(80);
698                        let jitter_ms = if window_ms == 0 {
699                            0
700                        } else {
701                            entropy % window_ms
702                        };
703                        tokio::time::sleep(std::time::Duration::from_millis(jitter_ms)).await;
704                    }
705                }
706                Err(e) => return Err(e), // non-retriable — propagate immediately
707            }
708        }
709        // At least one attempt ran (max_attempts >= 1), so conflict_err is Some.
710        Err(conflict_err.expect("loop ran at least once"))
711    }
712
713    /// Execute `command` and atomically co-persist any [`PendingOutbox`] messages
714    /// produced by [`Workflow::handle`].
715    ///
716    /// Like [`execute`], but requires `S: AtomicAppend`. When the workflow's
717    /// `handle` returns outbox messages alongside events, both are written to
718    /// storage in a single `WriteBatch`, eliminating the silent message-loss
719    /// window that would exist with separate writes.
720    ///
721    /// When the handle returns no outbox messages, this degenerates to a plain
722    /// `EventStore::append` (no performance cost).
723    ///
724    /// **Use this method instead of [`execute`] in all production code** that
725    /// needs outbox delivery guarantees. Plain `execute` silently drops any
726    /// outbox entries produced by the workflow handler — a crash between
727    /// `execute` and a subsequent manual `OutboxStore::enqueue` call would
728    /// lose the APERAK or UTILMD response permanently.
729    ///
730    /// For long event streams with periodic snapshots use
731    /// [`execute_and_enqueue_snapshot`] to reduce O(n) replay cost to O(k).
732    /// In concurrent environments where `VersionConflict` is expected, use
733    /// [`execute_and_enqueue_with_retry`] to retry automatically.
734    ///
735    /// # Example
736    ///
737    /// ```rust,ignore
738    /// use std::sync::Arc;
739    /// use mako_engine::process::Process;
740    /// use mako_engine::version::WorkflowId;
741    /// use mako_engine::ids::TenantId;
742    ///
743    /// // SlateDbStore implements AtomicAppend — required for execute_and_enqueue.
744    /// let store = Arc::new(SlateDbStore::open_in_memory().await?);
745    /// let tenant_id = TenantId::from_party_id("9904231000007");
746    /// let workflow_id = WorkflowId::new("gpke-supplier-change", fv);
747    ///
748    /// let process = Process::<GpkeSupplierChangeWorkflow, _>::new(
749    ///     Arc::clone(&store),
750    ///     tenant_id,
751    ///     workflow_id,
752    /// );
753    ///
754    /// // The workflow handle emits a PendingOutbox APERAK entry alongside the event.
755    /// // execute_and_enqueue writes both in one WriteBatch — no partial-write window.
756    /// let events = process
757    ///     .execute_and_enqueue(GpkeCommand::ReceiveUtilmd { pid: 55001, payload })
758    ///     .await?;
759    ///
760    /// assert!(!events.is_empty(), "at least one event was persisted");
761    ///
762    /// // The APERAK outbox entry is now visible to the outbox worker:
763    /// let pending = store.peek_outbox(tenant_id, 10).await?;
764    /// assert_eq!(pending.len(), 1, "APERAK enqueued atomically with the event");
765    /// ```
766    ///
767    /// # Errors
768    ///
769    /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
770    ///   retry with [`execute_and_enqueue_with_retry`].
771    /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
772    /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
773    ///
774    /// [`PendingOutbox`]: crate::outbox::PendingOutbox
775    /// [`execute`]: Process::execute
776    /// [`execute_and_enqueue_snapshot`]: Process::execute_and_enqueue_snapshot
777    /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
778    pub async fn execute_and_enqueue(
779        &self,
780        command: W::Command,
781    ) -> Result<Vec<EventEnvelope>, EngineError>
782    where
783        S: crate::event_store::AtomicAppend,
784    {
785        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
786        crate::workflow::execute_command_atomic::<W, S>(&self.store, &self.stream_id, command, &ctx)
787            .await
788    }
789
790    /// Like [`execute_and_enqueue`] but uses a snapshot to accelerate replay.
791    ///
792    /// Atomically persists events and outbox entries while starting state
793    /// reconstruction from the most recent snapshot. For long streams with
794    /// periodic snapshots this reduces replay cost from O(n) to O(k).
795    ///
796    /// [`execute_and_enqueue`]: Process::execute_and_enqueue
797    ///
798    /// # Errors
799    ///
800    /// Returns [`EngineError`] on storage or command handling failure.
801    pub async fn execute_and_enqueue_snapshot<Snap>(
802        &self,
803        command: W::Command,
804        snap_store: &Snap,
805    ) -> Result<Vec<EventEnvelope>, EngineError>
806    where
807        W::State: serde::de::DeserializeOwned,
808        S: crate::event_store::AtomicAppend,
809        Snap: SnapshotStore,
810    {
811        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
812        crate::workflow::execute_command_atomic_with_snapshot::<W, S, Snap>(
813            &self.store,
814            snap_store,
815            &self.stream_id,
816            command,
817            &ctx,
818        )
819        .await
820    }
821
822    /// Dispatch the compensation command returned by [`Workflow::on_deadline`].
823    ///
824    /// Reconstructs the current process state, calls
825    /// `W::on_deadline(deadline, &state)`, and — if the hook returns
826    /// `Some(command)` — executes it via [`Process::execute_and_enqueue`],
827    /// which atomically persists events **and** any outbox entries (e.g.
828    /// APERAK Ablehnung) produced by the compensation handler.
829    ///
830    /// Returns `Ok(Some(events))` when compensation fired, `Ok(None)` when
831    /// the hook returned `None` (deadline acknowledged as no-op).
832    ///
833    /// This is the canonical way to wire deadline firings to workflow
834    /// compensation logic.  Any [`WorkflowOutput::with_outbox`] entries
835    /// returned by `on_deadline` are guaranteed to be persisted atomically —
836    /// there is no window where the event is stored but the outbox entry is
837    /// lost.
838    ///
839    /// # Example
840    ///
841    /// ```rust,ignore
842    /// // In the deadline worker:
843    /// let overdue = ctx.deadline_store().due_now(50).await?;
844    /// for deadline in overdue {
845    ///     let identity = ctx.registry()
846    ///         .lookup(deadline.tenant_id(), &RegistryKey::from_process(deadline.process_id()))
847    ///         .await?
848    ///         .expect("process must be registered");
849    ///     let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
850    ///     if let Some(events) = process.execute_timeout(&deadline).await? {
851    ///         // compensation command was dispatched — APERAK Ablehnung enqueued
852    ///         tracing::info!(events = events.len(), "timeout compensation applied");
853    ///     }
854    ///     ctx.deadline_store().cancel(deadline.deadline_id()).await?;
855    /// }
856    /// ```
857    ///
858    /// # Errors
859    ///
860    /// Propagates [`EngineError::VersionConflict`], [`EngineError::Workflow`],
861    /// and storage errors from `execute_and_enqueue`. Use
862    /// [`execute_timeout_with_retry`] when `VersionConflict` retries are
863    /// required.
864    ///
865    /// [`Workflow::on_deadline`]: crate::workflow::Workflow::on_deadline
866    /// [`WorkflowOutput::with_outbox`]: crate::workflow::WorkflowOutput::with_outbox
867    /// [`execute_timeout_with_retry`]: Process::execute_timeout_with_retry
868    pub async fn execute_timeout(
869        &self,
870        deadline: &crate::deadline::Deadline,
871    ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
872    where
873        S: crate::event_store::AtomicAppend,
874    {
875        let state = self.state().await?;
876        match W::on_deadline(deadline, &state) {
877            None => Ok(None),
878            Some(command) => self.execute_and_enqueue(command).await.map(Some),
879        }
880    }
881
882    /// Like [`execute_timeout`] but retries on [`VersionConflict`] up to
883    /// `max_attempts` times.
884    ///
885    /// Use this in production deadline workers where concurrent event appends
886    /// are expected.  Outbox entries (e.g. APERAK Ablehnung) produced by the
887    /// compensation handler are persisted atomically on every attempt.
888    ///
889    /// [`execute_timeout`]: Process::execute_timeout
890    /// [`VersionConflict`]: crate::error::EngineError::VersionConflict
891    ///
892    /// # Errors
893    ///
894    /// Returns [`EngineError`] on storage or command handling failure.
895    ///
896    /// # Panics
897    ///
898    /// Panics if the deadline produces a command but the retry loop somehow
899    /// exhausts without capturing an error (unreachable in practice).
900    pub async fn execute_timeout_with_retry(
901        &self,
902        deadline: &crate::deadline::Deadline,
903        max_attempts: u32,
904    ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
905    where
906        S: crate::event_store::AtomicAppend,
907        W::Command: Clone,
908    {
909        let state = self.state().await?;
910        match W::on_deadline(deadline, &state) {
911            None => Ok(None),
912            Some(command) => self
913                .execute_and_enqueue_with_retry(command, max_attempts)
914                .await
915                .map(Some),
916        }
917    }
918
919    /// Like [`execute_and_enqueue`] but retries on [`crate::error::EngineError::VersionConflict`] up to
920    /// `max_attempts` times.
921    ///
922    /// [`execute_and_enqueue`]: Process::execute_and_enqueue
923    ///
924    /// # Errors
925    ///
926    /// Returns [`EngineError`] on storage or command handling failure.
927    ///
928    /// # Panics
929    ///
930    /// Panics if `max_attempts` is 0 and the guard is bypassed (unreachable).
931    pub async fn execute_and_enqueue_with_retry(
932        &self,
933        command: W::Command,
934        max_attempts: u32,
935    ) -> Result<Vec<EventEnvelope>, EngineError>
936    where
937        S: crate::event_store::AtomicAppend,
938        W::Command: Clone,
939    {
940        if max_attempts == 0 {
941            return Err(EngineError::store("max_attempts must be >= 1"));
942        }
943        let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
944        let mut conflict_err: Option<EngineError> = None;
945        for _ in 0..max_attempts {
946            match crate::workflow::execute_command_atomic::<W, S>(
947                &self.store,
948                &self.stream_id,
949                command.clone(),
950                &ctx,
951            )
952            .await
953            {
954                Ok(envs) => return Ok(envs),
955                Err(e) if e.is_version_conflict() => conflict_err = Some(e),
956                Err(e) => return Err(e),
957            }
958        }
959        Err(conflict_err.expect("loop ran at least once"))
960    }
961
962    /// Execute `command` atomically with outbox, then automatically snapshot
963    /// if the event-count threshold is reached.
964    ///
965    /// Combines [`execute_and_enqueue`] with [`take_snapshot`]: after a
966    /// successful write, checks whether `event_count % snapshot_interval == 0`
967    /// and, if so, serialises and saves a snapshot via `snap_store`.
968    ///
969    /// Pass `snapshot_interval = 0` to disable auto-snapshotting; the call
970    /// then behaves identically to [`execute_and_enqueue`].
971    ///
972    /// Returns `(events, snapshot_taken)` where `snapshot_taken` is `true` when
973    /// a snapshot was written this call.
974    ///
975    /// # Errors
976    ///
977    /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
978    ///   retry with [`execute_and_enqueue_with_retry`].
979    /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
980    /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
981    /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
982    ///
983    /// [`execute_and_enqueue`]: Process::execute_and_enqueue
984    /// [`take_snapshot`]: Process::take_snapshot
985    /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
986    pub async fn execute_and_enqueue_with_snapshot<Snap>(
987        &self,
988        command: W::Command,
989        snap_store: &Snap,
990        snapshot_interval: u64,
991    ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
992    where
993        S: crate::event_store::AtomicAppend,
994        Snap: crate::snapshot::SnapshotStore,
995        W::State: serde::Serialize,
996    {
997        let events = self.execute_and_enqueue(command).await?;
998        let snapped = if snapshot_interval > 0 {
999            self.take_snapshot(snap_store, snapshot_interval).await?
1000        } else {
1001            false
1002        };
1003        Ok((events, snapped))
1004    }
1005
1006    /// Like [`execute_and_enqueue_with_snapshot`] but retries on
1007    /// [`crate::error::EngineError::VersionConflict`] up to `max_attempts` times.
1008    ///
1009    /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
1010    ///
1011    /// # Errors
1012    ///
1013    /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
1014    ///   retry with [`execute_and_enqueue_with_snapshot_and_retry`].
1015    /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
1016    /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
1017    /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
1018    ///
1019    /// # Panics
1020    ///
1021    /// Panics if `max_attempts` is 0 and the loop guard is bypassed (unreachable).
1022    ///
1023    /// [`execute_and_enqueue_with_snapshot`]: Process::execute_and_enqueue_with_snapshot
1024    /// [`execute_and_enqueue_with_snapshot_and_retry`]: Process::execute_and_enqueue_with_snapshot_and_retry
1025    pub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>(
1026        &self,
1027        command: W::Command,
1028        max_attempts: u32,
1029        snap_store: &Snap,
1030        snapshot_interval: u64,
1031    ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
1032    where
1033        S: crate::event_store::AtomicAppend,
1034        W::Command: Clone,
1035        Snap: crate::snapshot::SnapshotStore,
1036        W::State: serde::Serialize,
1037    {
1038        let events = self
1039            .execute_and_enqueue_with_retry(command, max_attempts)
1040            .await?;
1041        let snapped = if snapshot_interval > 0 {
1042            self.take_snapshot(snap_store, snapshot_interval).await?
1043        } else {
1044            false
1045        };
1046        Ok((events, snapped))
1047    }
1048}
1049
1050impl<W: Workflow, S: EventStore + Clone> Clone for Process<W, S> {
1051    fn clone(&self) -> Self {
1052        Self {
1053            stream_id: self.stream_id.clone(),
1054            process_id: self.process_id,
1055            tenant_id: self.tenant_id,
1056            workflow_id: self.workflow_id.clone(),
1057            store: self.store.clone(),
1058            _phantom: PhantomData,
1059        }
1060    }
1061}
1062
1063impl<W: Workflow, S: EventStore + std::fmt::Debug> std::fmt::Debug for Process<W, S> {
1064    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1065        f.debug_struct("Process")
1066            .field("stream_id", &self.stream_id)
1067            .field("process_id", &self.process_id)
1068            .field("workflow_id", &self.workflow_id)
1069            .finish_non_exhaustive()
1070    }
1071}
1072
1073// ── Unit tests ────────────────────────────────────────────────────────────────
1074
1075#[cfg(test)]
1076mod tests {
1077    use super::*;
1078    use crate::{
1079        envelope::NewEvent,
1080        error::WorkflowError,
1081        event_store::{EventStore, ExpectedVersion, InMemoryEventStore},
1082        ids::{ConversationId, CorrelationId, TenantId},
1083        snapshot::{InMemorySnapshotStore, NoopSnapshotStore},
1084        version::WorkflowId,
1085        workflow::{CommandPayload, EventPayload},
1086    };
1087
1088    // ── Minimal test workflow ─────────────────────────────────────────────────
1089
1090    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1091    enum CounterEvent {
1092        Incremented { by: u32 },
1093        Reset,
1094    }
1095
1096    impl EventPayload for CounterEvent {
1097        fn event_type(&self) -> &'static str {
1098            match self {
1099                Self::Incremented { .. } => "Incremented",
1100                Self::Reset => "Reset",
1101            }
1102        }
1103    }
1104
1105    #[derive(Debug, Clone)]
1106    enum CounterCommand {
1107        Increment { by: u32 },
1108        Reset,
1109    }
1110
1111    impl CommandPayload for CounterCommand {}
1112
1113    #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1114    struct CounterState {
1115        value: u32,
1116    }
1117
1118    struct CounterWorkflow;
1119
1120    impl Workflow for CounterWorkflow {
1121        type State = CounterState;
1122        type Event = CounterEvent;
1123        type Command = CounterCommand;
1124
1125        fn apply(mut state: CounterState, event: &CounterEvent) -> CounterState {
1126            match event {
1127                CounterEvent::Incremented { by } => state.value += by,
1128                CounterEvent::Reset => state.value = 0,
1129            }
1130            state
1131        }
1132
1133        fn handle(
1134            _state: &CounterState,
1135            command: CounterCommand,
1136        ) -> Result<crate::workflow::WorkflowOutput<CounterEvent>, WorkflowError> {
1137            Ok(match command {
1138                CounterCommand::Increment { by } => vec![CounterEvent::Incremented { by }].into(),
1139                CounterCommand::Reset => vec![CounterEvent::Reset].into(),
1140            })
1141        }
1142    }
1143
1144    fn make_process() -> Process<CounterWorkflow, InMemoryEventStore> {
1145        Process::new(
1146            InMemoryEventStore::new(),
1147            TenantId::new(),
1148            WorkflowId::new("counter", "FV2024-10-01"),
1149        )
1150    }
1151
1152    // ── execute + state round-trip ────────────────────────────────────────────
1153
1154    #[tokio::test]
1155    async fn execute_then_state_round_trip() {
1156        let p = make_process();
1157
1158        p.execute(CounterCommand::Increment { by: 3 })
1159            .await
1160            .unwrap();
1161        p.execute(CounterCommand::Increment { by: 7 })
1162            .await
1163            .unwrap();
1164
1165        let state = p.state().await.unwrap();
1166        assert_eq!(state.value, 10);
1167    }
1168
1169    #[tokio::test]
1170    async fn event_count_matches_dispatched_commands() {
1171        let p = make_process();
1172
1173        assert_eq!(p.event_count().await.unwrap(), 0);
1174        p.execute(CounterCommand::Increment { by: 1 })
1175            .await
1176            .unwrap();
1177        assert_eq!(p.event_count().await.unwrap(), 1);
1178        p.execute(CounterCommand::Reset).await.unwrap();
1179        assert_eq!(p.event_count().await.unwrap(), 2);
1180    }
1181
1182    // ── identity round-trip ───────────────────────────────────────────────────
1183
1184    #[tokio::test]
1185    async fn identity_round_trip_via_from_identity() {
1186        let store = InMemoryEventStore::new();
1187        let p1 = Process::<CounterWorkflow, _>::new(
1188            store.clone(),
1189            TenantId::new(),
1190            WorkflowId::new("counter", "FV2024-10-01"),
1191        );
1192
1193        p1.execute(CounterCommand::Increment { by: 5 })
1194            .await
1195            .unwrap();
1196
1197        let identity = p1.identity();
1198        assert_eq!(*identity.stream_id(), *p1.stream_id());
1199        assert_eq!(identity.process_id, p1.process_id());
1200
1201        // Re-attach from identity and confirm state is visible.
1202        let p2 = Process::<CounterWorkflow, _>::from_identity(store, identity);
1203        let state = p2.state().await.unwrap();
1204        assert_eq!(state.value, 5);
1205    }
1206
1207    #[test]
1208    fn process_identity_is_serializable() {
1209        let p = make_process();
1210        let id = p.identity();
1211        let json = serde_json::to_string(&id).expect("ProcessIdentity must be serializable");
1212        let back: ProcessIdentity = serde_json::from_str(&json).unwrap();
1213        assert_eq!(*back.stream_id(), *id.stream_id());
1214        assert_eq!(back.process_id, id.process_id);
1215    }
1216
1217    // ── snapshot-accelerated state reconstruction ─────────────────────────────
1218
1219    #[tokio::test]
1220    async fn take_snapshot_and_state_with_snapshot() {
1221        let snap_store = InMemorySnapshotStore::new();
1222        let p = make_process();
1223
1224        // Dispatch 4 commands; the interval is 4.
1225        for i in 1u32..=4 {
1226            p.execute(CounterCommand::Increment { by: i })
1227                .await
1228                .unwrap();
1229        }
1230
1231        let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1232        assert!(took, "snapshot must be taken at event_count = 4");
1233
1234        // Dispatch one more command after the snapshot.
1235        p.execute(CounterCommand::Increment { by: 10 })
1236            .await
1237            .unwrap();
1238
1239        let state = p.state_with_snapshot(&snap_store).await.unwrap();
1240        // 1+2+3+4 = 10, plus the final +10 = 20.
1241        assert_eq!(state.value, 20);
1242    }
1243
1244    #[tokio::test]
1245    async fn state_with_snapshot_falls_back_to_full_replay() {
1246        let p = make_process();
1247        p.execute(CounterCommand::Increment { by: 42 })
1248            .await
1249            .unwrap();
1250
1251        // NoopSnapshotStore always returns None → full replay.
1252        let state = p.state_with_snapshot(&NoopSnapshotStore).await.unwrap();
1253        assert_eq!(state.value, 42);
1254    }
1255
1256    #[tokio::test]
1257    async fn take_snapshot_skipped_between_intervals() {
1258        let snap_store = InMemorySnapshotStore::new();
1259        let p = make_process();
1260
1261        p.execute(CounterCommand::Increment { by: 1 })
1262            .await
1263            .unwrap();
1264        p.execute(CounterCommand::Increment { by: 1 })
1265            .await
1266            .unwrap();
1267        p.execute(CounterCommand::Increment { by: 1 })
1268            .await
1269            .unwrap();
1270
1271        // 3 events, interval = 4 → must not take.
1272        let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1273        assert!(!took);
1274        assert!(snap_store.is_empty().await);
1275    }
1276
1277    /// Regression test for when a persisted snapshot carries a
1278    /// `state_schema_version` that does not match the current workflow's
1279    /// `state_schema_version()`, `state_with_snapshot` must silently discard
1280    /// the stale snapshot and fall back to full event replay.
1281    ///
1282    /// This guards against silent data corruption when state layout changes
1283    /// incompatibly — e.g. after adding a new required field to `CounterState`.
1284    #[tokio::test]
1285    async fn stale_snapshot_schema_version_falls_back_to_full_replay() {
1286        // CounterWorkflow uses state_schema_version() == 1 (the default).
1287        // We simulate a "migrated" workflow by injecting a snapshot whose
1288        // state_schema_version is bumped to 99, representing a schema that the
1289        // current workflow code does not understand.
1290        let snap_store = InMemorySnapshotStore::new();
1291        let p = make_process();
1292
1293        // Dispatch some events so there is something to replay.
1294        p.execute(CounterCommand::Increment { by: 5 })
1295            .await
1296            .unwrap();
1297        p.execute(CounterCommand::Increment { by: 3 })
1298            .await
1299            .unwrap();
1300
1301        // Manually save a stale snapshot with schema_version = 99.
1302        // The state payload is intentionally wrong — it should never be used.
1303        let stale = crate::snapshot::Snapshot::new(
1304            p.stream_id().clone(),
1305            2,                                    // sequence_number after 2 events
1306            99,                                   // ← unknown schema version
1307            serde_json::json!({ "value": 9999 }), // ← wrong value; must not be read
1308        );
1309        snap_store.save(&stale).await.unwrap();
1310
1311        // state_with_snapshot must discard the stale snapshot and replay all
1312        // events from sequence 0, producing the correct state (5+3=8).
1313        let current_state = p.state_with_snapshot(&snap_store).await.unwrap();
1314        assert_eq!(
1315            current_state.value, 8,
1316            "stale snapshot must be discarded; full replay must yield correct state"
1317        );
1318    }
1319
1320    // ── execute_with_retry ────────────────────────────────────────────────────
1321
1322    #[tokio::test]
1323    async fn execute_with_retry_succeeds_on_first_attempt() {
1324        let p = make_process();
1325        let envs = p
1326            .execute_with_retry(CounterCommand::Increment { by: 99 }, 3)
1327            .await
1328            .unwrap();
1329        assert_eq!(envs.len(), 1);
1330        assert_eq!(p.state().await.unwrap().value, 99);
1331    }
1332
1333    #[tokio::test]
1334    async fn execute_with_retry_returns_err_on_zero_attempts() {
1335        let p = make_process();
1336        let err = p
1337            .execute_with_retry(CounterCommand::Increment { by: 1 }, 0)
1338            .await
1339            .unwrap_err();
1340        assert!(
1341            matches!(err, EngineError::Store { ref message, .. } if message.contains("max_attempts")),
1342            "expected Store error about max_attempts, got: {err:?}",
1343        );
1344    }
1345
1346    // ── execute_with (explicit context) ──────────────────────────────────────
1347
1348    #[tokio::test]
1349    async fn execute_with_explicit_context_propagates_ids() {
1350        use crate::ids::{ConversationId, CorrelationId};
1351        let p = make_process();
1352
1353        let corr = CorrelationId::new();
1354        let conv = ConversationId::new();
1355        let ctx = CommandContext::new(p.tenant_id(), p.process_id(), p.workflow_id().clone())
1356            .with_correlation(corr)
1357            .with_conversation(conv);
1358
1359        let envs = p
1360            .execute_with(CounterCommand::Increment { by: 1 }, ctx)
1361            .await
1362            .unwrap();
1363        assert_eq!(envs.len(), 1);
1364        assert_eq!(envs[0].correlation_id, corr);
1365        assert_eq!(envs[0].conversation_id, conv);
1366    }
1367
1368    // ── upcast / schema-migration ─────────────────────────────────────────────
1369    //
1370    // A v2 workflow adds a `label: String` field to its single event.
1371    // Old (v1) events stored without `label` must be migrated by `upcast`.
1372    //
1373    // `#[serde(untagged)]` is used so the serialized payload is the flat
1374    // inner struct `{"count": N, "label": "..."}` rather than the externally-
1375    // tagged `{"Tagged": {"count": N}}` form.  This matches the common
1376    // real-world pattern where each `EventPayload::event_type()` discriminant
1377    // IS the variant selector stored in the envelope, and the payload holds
1378    // only the fields.
1379
1380    #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1381    struct TagState {
1382        total: u32,
1383        last_label: String,
1384    }
1385
1386    /// v1 schema (legacy): `{ "count": u32 }` — `label` field absent.
1387    /// v2 schema: `{ "count": u32, "label": String }`.
1388    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1389    #[serde(untagged)]
1390    enum TagEvent {
1391        Tagged { count: u32, label: String },
1392    }
1393
1394    impl EventPayload for TagEvent {
1395        fn event_type(&self) -> &'static str {
1396            "Tagged"
1397        }
1398        fn schema_version(&self) -> u32 {
1399            2
1400        }
1401    }
1402
1403    #[derive(Debug, Clone)]
1404    struct TagCommand {
1405        count: u32,
1406        label: String,
1407    }
1408    impl CommandPayload for TagCommand {}
1409
1410    struct TagWorkflow;
1411
1412    impl Workflow for TagWorkflow {
1413        type State = TagState;
1414        type Event = TagEvent;
1415        type Command = TagCommand;
1416
1417        fn apply(mut state: TagState, event: &TagEvent) -> TagState {
1418            let TagEvent::Tagged { count, label } = event;
1419            state.total += count;
1420            state.last_label = label.clone();
1421            state
1422        }
1423
1424        fn handle(
1425            _state: &TagState,
1426            cmd: TagCommand,
1427        ) -> Result<crate::workflow::WorkflowOutput<TagEvent>, WorkflowError> {
1428            Ok(vec![TagEvent::Tagged {
1429                count: cmd.count,
1430                label: cmd.label,
1431            }]
1432            .into())
1433        }
1434
1435        /// Migrate v1 `Tagged` events (missing `label`) to v2.
1436        ///
1437        /// v1 payload: `{"count": N}` (no `label` field)
1438        /// v2 payload: `{"count": N, "label": ""}` (default empty string)
1439        ///
1440        /// Because the event uses `#[serde(untagged)]`, the envelope payload
1441        /// is the flat struct — variant discrimination comes from `event_type`.
1442        fn upcast(
1443            event_type: &str,
1444            from_version: u32,
1445            mut payload: serde_json::Value,
1446        ) -> Result<serde_json::Value, EngineError> {
1447            if event_type == "Tagged"
1448                && from_version == 1
1449                && let Some(obj) = payload.as_object_mut()
1450            {
1451                obj.entry("label")
1452                    .or_insert_with(|| serde_json::Value::String(String::new()));
1453            }
1454            Ok(payload)
1455        }
1456    }
1457
1458    /// Inject a raw v1 event (no `label` field) directly into the store and
1459    /// confirm that `state()` replays it correctly via `upcast`.
1460    #[tokio::test]
1461    async fn upcast_v1_event_adds_default_label() {
1462        let store = InMemoryEventStore::new();
1463        let p = Process::<TagWorkflow, _>::new(
1464            store.clone(), // shares the underlying Arc<RwLock<_>>
1465            TenantId::new(),
1466            WorkflowId::new("tag", "FV2025-10-01"),
1467        );
1468
1469        // v1 payload: flat struct fields, no `label` (untagged serde repr).
1470        let v1_payload = serde_json::json!({ "count": 7 });
1471        let raw = NewEvent {
1472            correlation_id: CorrelationId::new(),
1473            causation_id: None,
1474            conversation_id: ConversationId::new(),
1475            process_id: p.process_id(),
1476            tenant_id: p.tenant_id(),
1477            workflow_id: p.workflow_id().clone(),
1478            event_type: "Tagged".into(),
1479            schema_version: 1, // ← schema_version 1 (old format)
1480            payload: v1_payload,
1481        };
1482        store
1483            .append(p.stream_id(), ExpectedVersion::Any, &[raw])
1484            .await
1485            .expect("inject v1 event");
1486
1487        // Replay via the v2 workflow — `upcast` must fill in `label: ""`.
1488        let state = p.state().await.expect("state must replay without error");
1489        assert_eq!(state.total, 7, "count must be accumulated");
1490        assert_eq!(
1491            state.last_label, "",
1492            "missing v1 label must default to empty string"
1493        );
1494
1495        // Also verify that a normally-executed v2 event round-trips correctly.
1496        p.execute(TagCommand {
1497            count: 3,
1498            label: "hello".into(),
1499        })
1500        .await
1501        .unwrap();
1502        let state2 = p.state().await.unwrap();
1503        assert_eq!(state2.total, 10);
1504        assert_eq!(state2.last_label, "hello");
1505    }
1506}