mako_engine/process.rs
1//! [`Process`] — ergonomic typed handle for a single MaKo process instance.
2//!
3//! Instead of threading `stream_id`, `workflow_id`, `tenant_id`, and a store
4//! reference through every call to the write path, bind them once into a
5//! `Process<W, S>` and call [`execute`] / [`state`] directly.
6//!
7//! # Starting a new process
8//!
9//! ```rust,ignore
10//! use mako_engine::{
11//! event_store::InMemoryEventStore,
12//! ids::TenantId,
13//! process::Process,
14//! version::WorkflowId,
15//! };
16//!
17//! let store = InMemoryEventStore::new();
18//! let process = Process::<MyWorkflow, _>::new(
19//! store,
20//! TenantId::new(),
21//! WorkflowId::new("my-workflow", "FV2024-10-01"),
22//! );
23//!
24//! let envelopes = process.execute(my_command).await?;
25//! let current = process.state().await?;
26//! ```
27//!
28//! # Resuming an existing process
29//!
30//! ```rust,ignore
31//! let process = Process::<MyWorkflow, _>::from_stream(
32//! store, stream_id, process_id, tenant_id, workflow_id,
33//! );
34//! ```
35//!
36//! [`execute`]: Process::execute
37//! [`state`]: Process::state
38
39use std::marker::PhantomData;
40
41use crate::{
42 envelope::EventEnvelope,
43 error::EngineError,
44 event_store::EventStore,
45 ids::{ProcessId, ProcessIdentity, StreamId, TenantId},
46 snapshot::{Snapshot, SnapshotStore},
47 version::WorkflowId,
48 workflow::{
49 CommandContext, Workflow, execute_command, execute_command_and_collect,
50 execute_command_with_snapshot,
51 },
52};
53
54// ── Process ───────────────────────────────────────────────────────────────────
55
/// An ergonomic typed handle for a single MaKo process instance.
///
/// `Process` bundles the [`StreamId`], [`ProcessId`], [`TenantId`],
/// [`WorkflowId`], and event store into a single owned value so callers do not
/// need to pass them on every command dispatch.
///
/// ## Generic parameters
///
/// - `W` — the [`Workflow`] implementation. In practice this is a zero-size
///   marker struct; the type parameter carries the domain logic as associated
///   types.
/// - `S` — the [`EventStore`] backend. [`InMemoryEventStore`] is the default
///   for tests; production deployments wrap a persistent backend in
///   [`Arc`][std::sync::Arc] and use `Process<W, Arc<MyStore>>`.
///
/// ## Clone semantics
///
/// If `S: Clone` (e.g. [`InMemoryEventStore`] or `Arc<…>`), `Process` is also
/// `Clone` and all clones share the same underlying storage.
///
/// [`InMemoryEventStore`]: crate::event_store::InMemoryEventStore
#[allow(clippy::struct_field_names)] // `process_id` and `stream_id` are intentional: they
// describe engine-layer concepts, not redundant prefixes.
pub struct Process<W: Workflow, S: EventStore> {
    /// Event stream every read and append of this handle targets.
    stream_id: StreamId,
    /// Identifier of the process instance; passed into each `CommandContext`.
    process_id: ProcessId,
    /// Owning tenant; passed into each `CommandContext`.
    tenant_id: TenantId,
    /// Workflow version the handle was bound to; cloned into each `CommandContext`.
    workflow_id: WorkflowId,
    /// Event store backend used for all stream access.
    store: S,
    /// Ties the handle to `W` at the type level without storing a `W`.
    /// `fn() -> W` is used (rather than `W`) so the marker neither implies
    /// ownership of a `W` nor restricts the auto traits of `Process`.
    _phantom: PhantomData<fn() -> W>,
}
87
88impl<W: Workflow, S: EventStore> Process<W, S> {
89 /// Create a fresh process instance.
90 ///
91 /// Generates a new [`ProcessId`] and derives the [`StreamId`] from
92 /// `tenant_id` and `process_id` (`process/{tenant_id}/{process_id}`).
93 /// Use this when starting a new MaKo process
94 /// (e.g. on receipt of the first inbound UTILMD Lieferbeginn).
95 #[must_use]
96 pub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self {
97 let process_id = ProcessId::new();
98 let stream_id = StreamId::for_process(tenant_id, &process_id);
99 Self {
100 stream_id,
101 process_id,
102 tenant_id,
103 workflow_id,
104 store,
105 _phantom: PhantomData,
106 }
107 }
108
109 /// Attach to an existing process stream.
110 ///
111 /// Use this on service restart or when routing an inbound message to an
112 /// already-running process whose identifiers were previously persisted.
113 #[must_use]
114 pub fn from_stream(
115 store: S,
116 stream_id: StreamId,
117 process_id: ProcessId,
118 tenant_id: TenantId,
119 workflow_id: WorkflowId,
120 ) -> Self {
121 Self {
122 stream_id,
123 process_id,
124 tenant_id,
125 workflow_id,
126 store,
127 _phantom: PhantomData,
128 }
129 }
130
    /// The event stream identifier for this process.
    ///
    /// Returned by reference; clone it if an owned value is needed.
    #[must_use]
    pub fn stream_id(&self) -> &StreamId {
        &self.stream_id
    }
136
    /// The stable process identifier.
    ///
    /// Returned by value (the identifier is copied out of the handle).
    #[must_use]
    pub fn process_id(&self) -> ProcessId {
        self.process_id
    }
142
    /// The tenant that owns this process.
    ///
    /// Returned by value (the identifier is copied out of the handle).
    #[must_use]
    pub fn tenant_id(&self) -> TenantId {
        self.tenant_id
    }
148
    /// The workflow version this handle was bound to at construction.
    ///
    /// Returned by reference; clone it if an owned value is needed.
    #[must_use]
    pub fn workflow_id(&self) -> &WorkflowId {
        &self.workflow_id
    }
154
155 /// Return a serializable value bundle of all four process identifiers.
156 ///
157 /// Persist this to a routing table (e.g. keyed by `conversation_id` or
158 /// `correlation_id`) so inbound messages can be routed to the correct
159 /// running process without the caller needing to manage four separate
160 /// fields.
161 ///
162 /// Use [`Process::from_identity`] to re-attach to the same process stream
163 /// on a subsequent request.
164 ///
165 /// ```rust,ignore
166 /// let id = process.identity();
167 /// routing_table.insert(conv_id, id.clone());
168 ///
169 /// // Later, on a subsequent inbound message:
170 /// let id = routing_table.get(&conv_id)?;
171 /// let process = Process::<MyWorkflow, _>::from_identity(store, id);
172 /// ```
173 #[must_use]
174 pub fn identity(&self) -> ProcessIdentity {
175 ProcessIdentity::new(self.process_id, self.tenant_id, self.workflow_id.clone())
176 }
177
178 /// Build a [`CommandContext`] for an inbound EDIFACT message dispatch.
179 ///
180 /// Derives a **deterministic** [`CorrelationId`] from `interchange_ref`
181 /// (UUID v5) so repeated dispatches of the same EDIFACT message — e.g.
182 /// AS4 retransmissions or idempotent REST replays — produce the same
183 /// correlation root. This makes EDIFACT-level idempotency observable in
184 /// distributed traces without any extra dedup logic at the engine level.
185 ///
186 /// Use this instead of [`Process::execute`] when you need to propagate
187 /// EDIFACT correlation metadata into the event stream. The returned
188 /// context is passed to [`Process::execute_with`].
189 ///
190 /// # Example
191 ///
192 /// ```rust,ignore
193 /// let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
194 /// let cmd_ctx = process.context_for_inbound(&utilmd_interchange_ref);
195 /// process.execute_with(command, cmd_ctx).await?;
196 /// ```
197 ///
198 /// [`CorrelationId`]: crate::ids::CorrelationId
199 #[must_use]
200 pub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext {
201 CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone())
202 .with_correlation(crate::ids::CorrelationId::from_interchange_ref(
203 interchange_ref,
204 ))
205 }
206
207 /// Attach to an existing process stream from a previously persisted
208 /// [`ProcessIdentity`].
209 ///
210 /// This is the companion to [`Process::identity`]: look up the identity
211 /// from your routing table and call `from_identity` to get a live
212 /// `Process` handle bound to `store`.
213 #[must_use]
214 pub fn from_identity(store: S, identity: ProcessIdentity) -> Self {
215 Self {
216 stream_id: identity.stream_id().clone(),
217 process_id: identity.process_id,
218 tenant_id: identity.tenant_id,
219 workflow_id: identity.workflow_id,
220 store,
221 _phantom: PhantomData,
222 }
223 }
224
225 /// Return the number of events currently in the stream.
226 ///
227 /// Uses [`EventStore::stream_version`] for an efficient O(1) metadata
228 /// query on backends that override it. Falls back to loading all events
229 /// on stores that use the default implementation.
230 ///
231 /// Use this to decide whether to take a snapshot — e.g. with
232 /// [`Snapshot::should_take`]:
233 ///
234 /// ```rust,ignore
235 /// if Snapshot::should_take(process.event_count().await?, 100) {
236 /// process.take_snapshot(&snap_store, 100).await?;
237 /// }
238 /// ```
239 ///
240 /// # Errors
241 ///
242 /// Returns [`EngineError::Store`] on storage failures.
243 ///
244 /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
245 pub async fn event_count(&self) -> Result<u64, EngineError> {
246 self.store.stream_version(&self.stream_id).await
247 }
248
249 /// Dispatch `command` using a freshly generated [`CommandContext`].
250 ///
251 /// A new [`CorrelationId`] and [`ConversationId`] are auto-generated for
252 /// each call. To propagate tracing IDs from an inbound EDIFACT message
253 /// across a multi-step command chain, use [`execute_with`].
254 ///
255 /// # Errors
256 ///
257 /// - [`EngineError::VersionConflict`] when a concurrent writer raced ahead;
258 /// retry by calling `execute` again.
259 /// - [`EngineError::Workflow`] when the workflow rejects the command.
260 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
261 ///
262 /// [`CorrelationId`]: crate::ids::CorrelationId
263 /// [`ConversationId`]: crate::ids::ConversationId
264 /// [`execute_with`]: Process::execute_with
265 #[cfg_attr(
266 feature = "tracing",
267 tracing::instrument(skip(self, command), fields(
268 workflow = %self.workflow_id,
269 process_id = %self.process_id,
270 stream_id = %self.stream_id,
271 ))
272 )]
273 pub async fn execute(&self, command: W::Command) -> Result<Vec<EventEnvelope>, EngineError> {
274 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
275 execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
276 }
277
278 /// Like [`execute`] but also returns the outbox messages produced by
279 /// [`Workflow::handle`], fully stamped with the real IDs from the persisted
280 /// event.
281 ///
282 /// The returned [`OutboxMessage`] entries have their `causation_event_id`
283 /// set to the `event_id` of the first persisted event — identical to what
284 /// `execute_and_enqueue` writes into the [`OutboxStore`] atomically. This
285 /// makes the messages ready to pass directly to the EDIFACT renderer
286 /// without any manual ID stitching.
287 ///
288 /// Use this in E2E and integration tests that need to inspect or render
289 /// outbox messages after a command is persisted, without the awkward
290 /// `handle()` + `execute()` double invocation.
291 ///
292 /// [`execute`]: Process::execute
293 /// [`OutboxMessage`]: crate::outbox::OutboxMessage
294 /// [`OutboxStore`]: crate::outbox::OutboxStore
295 ///
296 /// # Errors
297 ///
298 /// Returns [`EngineError`] on storage or command handling failure.
299 pub async fn execute_and_collect(
300 &self,
301 command: W::Command,
302 ) -> Result<(Vec<EventEnvelope>, Vec<crate::outbox::OutboxMessage>), EngineError> {
303 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
304 let (events, pending) =
305 execute_command_and_collect::<W, S>(&self.store, &self.stream_id, command, &ctx)
306 .await?;
307
308 // Stamp each PendingOutbox with the real IDs from the persisted event.
309 // Using the first event's event_id as causation_event_id mirrors what
310 // execute_and_enqueue writes into the OutboxStore atomically.
311 let causation_event_id = events
312 .first()
313 .map_or_else(crate::ids::EventId::new, |e| e.event_id);
314
315 let outbox = pending
316 .into_iter()
317 .map(|p| {
318 crate::outbox::OutboxMessage::new(
319 self.stream_id.clone(),
320 self.process_id,
321 self.tenant_id,
322 ctx.correlation_id,
323 ctx.conversation_id,
324 causation_event_id,
325 p.message_type,
326 p.recipient,
327 p.payload,
328 )
329 })
330 .collect();
331
332 Ok((events, outbox))
333 }
334
335 /// Dispatch `command` with a caller-supplied [`CommandContext`].
336 ///
337 /// Use this when you need to thread a specific `correlation_id`,
338 /// `conversation_id`, or `causation_id` through the command. For example,
339 /// when dispatching an APERAK in response to a UTILMD, pass the
340 /// `conversation_id` from the UTILMD envelope so both exchanges are
341 /// traceable as a single business conversation.
342 ///
343 /// Build a context with:
344 ///
345 /// ```rust,ignore
346 /// let ctx = CommandContext::new(tenant_id, process_id, workflow_id)
347 /// .with_causation(utilmd_event_id.into()) // From<EventId> for CausationId
348 /// .with_conversation(utilmd_conversation_id);
349 /// process.execute_with(DispatchAperak { .. }, ctx).await?;
350 /// ```
351 ///
352 /// # Errors
353 ///
354 /// See [`Process::execute`] for the error contract.
355 ///
356 /// [`Process::execute`]: Process::execute
357 #[cfg_attr(
358 feature = "tracing",
359 tracing::instrument(skip(self, command, ctx), fields(
360 workflow = %self.workflow_id,
361 process_id = %self.process_id,
362 correlation_id = %ctx.correlation_id,
363 ))
364 )]
365 pub async fn execute_with(
366 &self,
367 command: W::Command,
368 ctx: CommandContext,
369 ) -> Result<Vec<EventEnvelope>, EngineError> {
370 execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
371 }
372
373 /// Dispatch `command` using a snapshot store to accelerate state reconstruction.
374 ///
375 /// Equivalent to [`Process::execute`] but starts replay from the most recent
376 /// snapshot rather than from sequence 0. For streams with thousands of events
377 /// and a snapshot within the last 100 events, this reduces replay cost from
378 /// O(n) to O(k) where k is the tail length since the last snapshot.
379 ///
380 /// When no snapshot exists or the schema version has changed, falls back to
381 /// full O(n) replay — identical in cost to [`Process::execute`].
382 ///
383 /// # Errors
384 ///
385 /// Same contract as [`Process::execute`].
386 pub async fn execute_snapshot<Snap>(
387 &self,
388 command: W::Command,
389 snap_store: &Snap,
390 ) -> Result<Vec<EventEnvelope>, EngineError>
391 where
392 W::State: serde::de::DeserializeOwned,
393 Snap: SnapshotStore,
394 {
395 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
396 execute_command_with_snapshot::<W, S, Snap>(
397 &self.store,
398 snap_store,
399 &self.stream_id,
400 command,
401 &ctx,
402 )
403 .await
404 }
405
406 /// Reconstruct the current workflow state by replaying all persisted events.
407 ///
408 /// This is a **read-only** operation — it loads events but does not
409 /// acquire any write lock or check optimistic concurrency. Use it to:
410 ///
411 /// - Inspect process status in tests without dispatching a command.
412 /// - Build a diagnostic snapshot for observability or health checks.
413 /// - Implement query-side read models that need the full typed state.
414 ///
415 /// For production read models, prefer a [`Projection`] that is updated
416 /// incrementally rather than replaying the full stream on every query.
417 ///
418 /// To accelerate replay for long-lived streams, use
419 /// [`Process::state_with_snapshot`] instead.
420 ///
421 /// # Errors
422 ///
423 /// - [`EngineError::Store`] on storage failures.
424 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded
425 /// into `W::Event` (schema migration required).
426 ///
427 /// [`Projection`]: crate::projection::Projection
428 #[cfg_attr(
429 feature = "tracing",
430 tracing::instrument(skip(self), fields(
431 workflow = %self.workflow_id,
432 stream_id = %self.stream_id,
433 ))
434 )]
435 pub async fn state(&self) -> Result<W::State, EngineError> {
436 self.store
437 .fold_stream(&self.stream_id, 0, W::State::default(), |acc, env| {
438 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
439 let event: W::Event = serde_json::from_value(payload)
440 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
441 Ok(W::apply(acc, &event))
442 })
443 .await
444 }
445
446 // ── Snapshot-aware state reconstruction ──────────────────────────────────
447
448 /// Reconstruct current state using a snapshot as the starting point.
449 ///
450 /// Loads the most recent snapshot for this stream from `snap_store`. If
451 /// one exists, deserializes it into `W::State` and then replays only
452 /// events appended **after** the snapshot's `sequence_number`
453 /// (O(k) instead of O(n)). Falls back to full replay when no snapshot
454 /// exists.
455 ///
456 /// ## When to use
457 ///
458 /// Use this instead of [`Process::state`] for long-lived processes where
459 /// the event count grows large. Pair it with [`Process::take_snapshot`]
460 /// to keep the snapshot store current after each command.
461 ///
462 /// ## Schema version compatibility
463 ///
464 /// Snapshots whose `state` field cannot be deserialized into `W::State`
465 /// (e.g. after a breaking state schema change) will return
466 /// [`EngineError::Deserialization`]. In that case, fall back to
467 /// [`Process::state`] (full replay) and take a fresh snapshot.
468 ///
469 /// # Errors
470 ///
471 /// - [`EngineError::Store`] on snapshot or event storage failures.
472 /// - [`EngineError::Deserialization`] when the snapshot state or a tail
473 /// event cannot be decoded.
474 #[cfg_attr(
475 feature = "tracing",
476 tracing::instrument(skip(self, snap_store), fields(
477 workflow = %self.workflow_id,
478 stream_id = %self.stream_id,
479 ))
480 )]
481 pub async fn state_with_snapshot<Snap: SnapshotStore>(
482 &self,
483 snap_store: &Snap,
484 ) -> Result<W::State, EngineError>
485 where
486 W::State: serde::de::DeserializeOwned,
487 {
488 let maybe_snap = snap_store.load(&self.stream_id).await?;
489
490 let (initial_state, from_sequence) = match maybe_snap {
491 Some(snap) => {
492 if snap.state_schema_version == W::state_schema_version() {
493 let state = serde_json::from_value::<W::State>(snap.state)
494 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
495 (state, snap.sequence_number)
496 } else {
497 // Schema version mismatch: discard the stale snapshot and
498 // fall back to full replay. The caller should take a fresh
499 // snapshot after this reconstruction completes.
500 tracing::warn!(
501 expected = W::state_schema_version(),
502 actual = snap.state_schema_version,
503 stream_id = %self.stream_id,
504 "snapshot schema version mismatch; falling back to full replay"
505 );
506 (W::State::default(), 0)
507 }
508 }
509 None => (W::State::default(), 0),
510 };
511
512 let tail = self
513 .store
514 .fold_stream(&self.stream_id, from_sequence, initial_state, |acc, env| {
515 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
516 let event: W::Event = serde_json::from_value(payload)
517 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
518 Ok(W::apply(acc, &event))
519 })
520 .await?;
521 Ok(tail)
522 }
523
524 /// Reconstruct current state and save a snapshot if the event-count
525 /// threshold is reached.
526 ///
527 /// Checks [`Snapshot::should_take`] with `interval`. When at least
528 /// `interval` new events have accumulated since the last snapshot,
529 /// reconstructs state via full replay, serializes it, and calls
530 /// [`SnapshotStore::save`].
531 ///
532 /// Returns `true` when a snapshot was taken, `false` when the threshold
533 /// was not reached or `interval` is `0`.
534 ///
535 /// ## Integration pattern
536 ///
537 /// ```rust,ignore
538 /// // After every successful command:
539 /// process.execute(command).await?;
540 /// process.take_snapshot(&snap_store, 100).await?;
541 ///
542 /// // On the read path — O(k) instead of O(n):
543 /// let state = process.state_with_snapshot(&snap_store).await?;
544 /// ```
545 ///
546 /// # Errors
547 ///
548 /// - [`EngineError::Store`] on snapshot storage failures.
549 /// - [`EngineError::Serialization`] when the state cannot be JSON-encoded.
550 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
551 ///
552 /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
553 pub async fn take_snapshot<Snap: SnapshotStore>(
554 &self,
555 snap_store: &Snap,
556 interval: u64,
557 ) -> Result<bool, EngineError>
558 where
559 W::State: serde::Serialize,
560 {
561 let count = self.event_count().await?;
562 // Load the last snapshot (if any) to get its sequence number.
563 let last_snap_seq = snap_store
564 .load(&self.stream_id)
565 .await?
566 .map_or(0, |s| s.sequence_number);
567 if !Snapshot::should_take(count, last_snap_seq, interval) {
568 return Ok(false);
569 }
570 let state = self.state().await?;
571 let payload =
572 serde_json::to_value(&state).map_err(|e| EngineError::Serialization(e.to_string()))?;
573 let snap = Snapshot::new(
574 self.stream_id.clone(),
575 count,
576 W::state_schema_version(),
577 payload,
578 );
579 snap_store.save(&snap).await?;
580 Ok(true)
581 }
582
583 // ── Retry ─────────────────────────────────────────────────────────────────
584
585 /// Dispatch `command` with automatic retry on [`EngineError::VersionConflict`].
586 ///
587 /// A version conflict occurs when a concurrent writer appended events
588 /// between this process's read and its append attempt. On each conflict,
589 /// the engine **reloads the complete event stream from the store and
590 /// replays all events** to rebuild fresh state before re-handling the
591 /// command. Stale in-memory state from a previous attempt is never
592 /// carried forward — each retry always starts from a fully-rebuilt snapshot.
593 ///
594 /// Non-conflict errors (storage failures, workflow rejections) are
595 /// returned immediately without retrying.
596 ///
597 /// A freshly-generated [`CommandContext`] is pinned before the first
598 /// attempt and reused across all retries so all events share the same
599 /// correlation root regardless of retry count. Use
600 /// [`execute_with_retry_ctx`] to supply a specific context (e.g. one
601 /// derived from an inbound EDIFACT envelope).
602 ///
603 /// ## When to use
604 ///
605 /// Use for commands where two inbound EDIFACT messages for the same
606 /// process may arrive concurrently — e.g. a UTILMD and its APERAK
607 /// processed on separate async tasks.
608 ///
609 /// ## Command cloning
610 ///
611 /// `W::Command` must implement [`Clone`] so it can be resubmitted on
612 /// each retry without reconstructing it from scratch.
613 ///
614 /// # Errors
615 ///
616 /// - [`EngineError::VersionConflict`] when all `max_attempts` are
617 /// exhausted without a successful append.
618 /// - Any non-conflict [`EngineError`] returned by the workflow or storage.
619 /// - [`EngineError::Store`] when `max_attempts` is `0`.
620 ///
621 /// [`execute_with_retry_ctx`]: Process::execute_with_retry_ctx
622 pub async fn execute_with_retry(
623 &self,
624 command: W::Command,
625 max_attempts: u32,
626 ) -> Result<Vec<EventEnvelope>, EngineError>
627 where
628 W::Command: Clone,
629 {
630 if max_attempts == 0 {
631 return Err(EngineError::store("max_attempts must be >= 1"));
632 }
633 // Pin context before the loop — all retry attempts share the same
634 // correlation root for consistent distributed tracing.
635 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
636 self.execute_with_retry_ctx(command, ctx, max_attempts)
637 .await
638 }
639
640 /// Dispatch `command` with a caller-supplied [`CommandContext`] and
641 /// automatic retry on [`EngineError::VersionConflict`].
642 ///
643 /// Identical to [`execute_with_retry`] but threads the provided `ctx`
644 /// (including its `correlation_id`, `conversation_id`, and `causation_id`)
645 /// through every retry attempt. Use this when you need to propagate
646 /// tracing IDs from an inbound EDIFACT envelope across a retried command.
647 ///
648 /// # Example
649 ///
650 /// ```rust,ignore
651 /// let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
652 /// process.execute_with_retry_ctx(HandleAperak { .. }, ctx, 3).await?;
653 /// ```
654 ///
655 /// # Errors
656 ///
657 /// See [`execute_with_retry`] for the error contract.
658 ///
659 /// # Panics
660 ///
661 /// Panics if `max_attempts` is 0 and the guard at the top of the function
662 /// is somehow bypassed (unreachable in practice).
663 ///
664 /// [`execute_with_retry`]: Process::execute_with_retry
665 pub async fn execute_with_retry_ctx(
666 &self,
667 command: W::Command,
668 ctx: CommandContext,
669 max_attempts: u32,
670 ) -> Result<Vec<EventEnvelope>, EngineError>
671 where
672 W::Command: Clone,
673 {
674 if max_attempts == 0 {
675 return Err(EngineError::store("max_attempts must be >= 1"));
676 }
677 let mut conflict_err: Option<EngineError> = None;
678 for attempt in 0..max_attempts {
679 // Each call to `execute_with` internally calls `fold_stream` from
680 // sequence 0 (or from the most recent snapshot if one is available).
681 // State is always freshly reconstructed from the event log on every
682 // attempt — there is no stale state carried forward between retries.
683 // Do NOT "optimise" this by caching state across attempts; doing so
684 // would allow a winning concurrent writer's events to be invisible
685 // to the retry, producing incorrect decisions and duplicate events.
686 match self.execute_with(command.clone(), ctx.clone()).await {
687 Ok(envs) => return Ok(envs),
688 Err(e) if e.is_version_conflict() => {
689 conflict_err = Some(e);
690 // Brief jittered sleep to reduce thundering-herd under
691 // concurrent ERP commands targeting the same stream.
692 // Delay = uniform random in [0, 10ms * attempt], capped at 80ms.
693 // Uses the OS CSPRNG via rand so every retry gets independent
694 // entropy regardless of stream-ID prefix.
695 if attempt + 1 < max_attempts {
696 let entropy: u64 = rand::random();
697 let window_ms: u64 = (10 * (u64::from(attempt) + 1)).min(80);
698 let jitter_ms = if window_ms == 0 {
699 0
700 } else {
701 entropy % window_ms
702 };
703 tokio::time::sleep(std::time::Duration::from_millis(jitter_ms)).await;
704 }
705 }
706 Err(e) => return Err(e), // non-retriable — propagate immediately
707 }
708 }
709 // At least one attempt ran (max_attempts >= 1), so conflict_err is Some.
710 Err(conflict_err.expect("loop ran at least once"))
711 }
712
713 /// Execute `command` and atomically co-persist any [`PendingOutbox`] messages
714 /// produced by [`Workflow::handle`].
715 ///
716 /// Like [`execute`], but requires `S: AtomicAppend`. When the workflow's
717 /// `handle` returns outbox messages alongside events, both are written to
718 /// storage in a single `WriteBatch`, eliminating the silent message-loss
719 /// window that would exist with separate writes.
720 ///
721 /// When the handle returns no outbox messages, this degenerates to a plain
722 /// `EventStore::append` (no performance cost).
723 ///
724 /// **Use this method instead of [`execute`] in all production code** that
725 /// needs outbox delivery guarantees. Plain `execute` silently drops any
726 /// outbox entries produced by the workflow handler — a crash between
727 /// `execute` and a subsequent manual `OutboxStore::enqueue` call would
728 /// lose the APERAK or UTILMD response permanently.
729 ///
730 /// For long event streams with periodic snapshots use
731 /// [`execute_and_enqueue_snapshot`] to reduce O(n) replay cost to O(k).
732 /// In concurrent environments where `VersionConflict` is expected, use
733 /// [`execute_and_enqueue_with_retry`] to retry automatically.
734 ///
735 /// # Example
736 ///
737 /// ```rust,ignore
738 /// use std::sync::Arc;
739 /// use mako_engine::process::Process;
740 /// use mako_engine::version::WorkflowId;
741 /// use mako_engine::ids::TenantId;
742 ///
743 /// // SlateDbStore implements AtomicAppend — required for execute_and_enqueue.
744 /// let store = Arc::new(SlateDbStore::open_in_memory().await?);
745 /// let tenant_id = TenantId::from_party_id("9904231000007");
746 /// let workflow_id = WorkflowId::new("gpke-supplier-change", fv);
747 ///
748 /// let process = Process::<GpkeSupplierChangeWorkflow, _>::new(
749 /// Arc::clone(&store),
750 /// tenant_id,
751 /// workflow_id,
752 /// );
753 ///
754 /// // The workflow handle emits a PendingOutbox APERAK entry alongside the event.
755 /// // execute_and_enqueue writes both in one WriteBatch — no partial-write window.
756 /// let events = process
757 /// .execute_and_enqueue(GpkeCommand::ReceiveUtilmd { pid: 55001, payload })
758 /// .await?;
759 ///
760 /// assert!(!events.is_empty(), "at least one event was persisted");
761 ///
762 /// // The APERAK outbox entry is now visible to the outbox worker:
763 /// let pending = store.peek_outbox(tenant_id, 10).await?;
764 /// assert_eq!(pending.len(), 1, "APERAK enqueued atomically with the event");
765 /// ```
766 ///
767 /// # Errors
768 ///
769 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
770 /// retry with [`execute_and_enqueue_with_retry`].
771 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
772 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
773 ///
774 /// [`PendingOutbox`]: crate::outbox::PendingOutbox
775 /// [`execute`]: Process::execute
776 /// [`execute_and_enqueue_snapshot`]: Process::execute_and_enqueue_snapshot
777 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
778 pub async fn execute_and_enqueue(
779 &self,
780 command: W::Command,
781 ) -> Result<Vec<EventEnvelope>, EngineError>
782 where
783 S: crate::event_store::AtomicAppend,
784 {
785 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
786 crate::workflow::execute_command_atomic::<W, S>(&self.store, &self.stream_id, command, &ctx)
787 .await
788 }
789
790 /// Like [`execute_and_enqueue`] but uses a snapshot to accelerate replay.
791 ///
792 /// Atomically persists events and outbox entries while starting state
793 /// reconstruction from the most recent snapshot. For long streams with
794 /// periodic snapshots this reduces replay cost from O(n) to O(k).
795 ///
796 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
797 ///
798 /// # Errors
799 ///
800 /// Returns [`EngineError`] on storage or command handling failure.
801 pub async fn execute_and_enqueue_snapshot<Snap>(
802 &self,
803 command: W::Command,
804 snap_store: &Snap,
805 ) -> Result<Vec<EventEnvelope>, EngineError>
806 where
807 W::State: serde::de::DeserializeOwned,
808 S: crate::event_store::AtomicAppend,
809 Snap: SnapshotStore,
810 {
811 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
812 crate::workflow::execute_command_atomic_with_snapshot::<W, S, Snap>(
813 &self.store,
814 snap_store,
815 &self.stream_id,
816 command,
817 &ctx,
818 )
819 .await
820 }
821
822 /// Dispatch the compensation command returned by [`Workflow::on_deadline`].
823 ///
824 /// Reconstructs the current process state, calls
825 /// `W::on_deadline(deadline, &state)`, and — if the hook returns
826 /// `Some(command)` — executes it via [`Process::execute_and_enqueue`],
827 /// which atomically persists events **and** any outbox entries (e.g.
828 /// APERAK Ablehnung) produced by the compensation handler.
829 ///
830 /// Returns `Ok(Some(events))` when compensation fired, `Ok(None)` when
831 /// the hook returned `None` (deadline acknowledged as no-op).
832 ///
833 /// This is the canonical way to wire deadline firings to workflow
834 /// compensation logic. Any [`WorkflowOutput::with_outbox`] entries
835 /// returned by `on_deadline` are guaranteed to be persisted atomically —
836 /// there is no window where the event is stored but the outbox entry is
837 /// lost.
838 ///
839 /// # Example
840 ///
841 /// ```rust,ignore
842 /// // In the deadline worker:
843 /// let overdue = ctx.deadline_store().due_now(50).await?;
844 /// for deadline in overdue {
845 /// let identity = ctx.registry()
846 /// .lookup(deadline.tenant_id(), &RegistryKey::from_process(deadline.process_id()))
847 /// .await?
848 /// .expect("process must be registered");
849 /// let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
850 /// if let Some(events) = process.execute_timeout(&deadline).await? {
851 /// // compensation command was dispatched — APERAK Ablehnung enqueued
852 /// tracing::info!(events = events.len(), "timeout compensation applied");
853 /// }
854 /// ctx.deadline_store().cancel(deadline.deadline_id()).await?;
855 /// }
856 /// ```
857 ///
858 /// # Errors
859 ///
860 /// Propagates [`EngineError::VersionConflict`], [`EngineError::Workflow`],
861 /// and storage errors from `execute_and_enqueue`. Use
862 /// [`execute_timeout_with_retry`] when `VersionConflict` retries are
863 /// required.
864 ///
865 /// [`Workflow::on_deadline`]: crate::workflow::Workflow::on_deadline
866 /// [`WorkflowOutput::with_outbox`]: crate::workflow::WorkflowOutput::with_outbox
867 /// [`execute_timeout_with_retry`]: Process::execute_timeout_with_retry
868 pub async fn execute_timeout(
869 &self,
870 deadline: &crate::deadline::Deadline,
871 ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
872 where
873 S: crate::event_store::AtomicAppend,
874 {
875 let state = self.state().await?;
876 match W::on_deadline(deadline, &state) {
877 None => Ok(None),
878 Some(command) => self.execute_and_enqueue(command).await.map(Some),
879 }
880 }
881
882 /// Like [`execute_timeout`] but retries on [`VersionConflict`] up to
883 /// `max_attempts` times.
884 ///
885 /// Use this in production deadline workers where concurrent event appends
886 /// are expected. Outbox entries (e.g. APERAK Ablehnung) produced by the
887 /// compensation handler are persisted atomically on every attempt.
888 ///
889 /// [`execute_timeout`]: Process::execute_timeout
890 /// [`VersionConflict`]: crate::error::EngineError::VersionConflict
891 ///
892 /// # Errors
893 ///
894 /// Returns [`EngineError`] on storage or command handling failure.
895 ///
896 /// # Panics
897 ///
898 /// Panics if the deadline produces a command but the retry loop somehow
899 /// exhausts without capturing an error (unreachable in practice).
900 pub async fn execute_timeout_with_retry(
901 &self,
902 deadline: &crate::deadline::Deadline,
903 max_attempts: u32,
904 ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
905 where
906 S: crate::event_store::AtomicAppend,
907 W::Command: Clone,
908 {
909 let state = self.state().await?;
910 match W::on_deadline(deadline, &state) {
911 None => Ok(None),
912 Some(command) => self
913 .execute_and_enqueue_with_retry(command, max_attempts)
914 .await
915 .map(Some),
916 }
917 }
918
919 /// Like [`execute_and_enqueue`] but retries on [`crate::error::EngineError::VersionConflict`] up to
920 /// `max_attempts` times.
921 ///
922 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
923 ///
924 /// # Errors
925 ///
926 /// Returns [`EngineError`] on storage or command handling failure.
927 ///
928 /// # Panics
929 ///
930 /// Panics if `max_attempts` is 0 and the guard is bypassed (unreachable).
931 pub async fn execute_and_enqueue_with_retry(
932 &self,
933 command: W::Command,
934 max_attempts: u32,
935 ) -> Result<Vec<EventEnvelope>, EngineError>
936 where
937 S: crate::event_store::AtomicAppend,
938 W::Command: Clone,
939 {
940 if max_attempts == 0 {
941 return Err(EngineError::store("max_attempts must be >= 1"));
942 }
943 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
944 let mut conflict_err: Option<EngineError> = None;
945 for _ in 0..max_attempts {
946 match crate::workflow::execute_command_atomic::<W, S>(
947 &self.store,
948 &self.stream_id,
949 command.clone(),
950 &ctx,
951 )
952 .await
953 {
954 Ok(envs) => return Ok(envs),
955 Err(e) if e.is_version_conflict() => conflict_err = Some(e),
956 Err(e) => return Err(e),
957 }
958 }
959 Err(conflict_err.expect("loop ran at least once"))
960 }
961
962 /// Execute `command` atomically with outbox, then automatically snapshot
963 /// if the event-count threshold is reached.
964 ///
965 /// Combines [`execute_and_enqueue`] with [`take_snapshot`]: after a
966 /// successful write, checks whether `event_count % snapshot_interval == 0`
967 /// and, if so, serialises and saves a snapshot via `snap_store`.
968 ///
969 /// Pass `snapshot_interval = 0` to disable auto-snapshotting; the call
970 /// then behaves identically to [`execute_and_enqueue`].
971 ///
972 /// Returns `(events, snapshot_taken)` where `snapshot_taken` is `true` when
973 /// a snapshot was written this call.
974 ///
975 /// # Errors
976 ///
977 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
978 /// retry with [`execute_and_enqueue_with_retry`].
979 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
980 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
981 /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
982 ///
983 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
984 /// [`take_snapshot`]: Process::take_snapshot
985 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
986 pub async fn execute_and_enqueue_with_snapshot<Snap>(
987 &self,
988 command: W::Command,
989 snap_store: &Snap,
990 snapshot_interval: u64,
991 ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
992 where
993 S: crate::event_store::AtomicAppend,
994 Snap: crate::snapshot::SnapshotStore,
995 W::State: serde::Serialize,
996 {
997 let events = self.execute_and_enqueue(command).await?;
998 let snapped = if snapshot_interval > 0 {
999 self.take_snapshot(snap_store, snapshot_interval).await?
1000 } else {
1001 false
1002 };
1003 Ok((events, snapped))
1004 }
1005
1006 /// Like [`execute_and_enqueue_with_snapshot`] but retries on
1007 /// [`crate::error::EngineError::VersionConflict`] up to `max_attempts` times.
1008 ///
1009 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
1010 ///
1011 /// # Errors
1012 ///
1013 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
1014 /// retry with [`execute_and_enqueue_with_snapshot_and_retry`].
1015 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
1016 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
1017 /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
1018 ///
1019 /// # Panics
1020 ///
1021 /// Panics if `max_attempts` is 0 and the loop guard is bypassed (unreachable).
1022 ///
1023 /// [`execute_and_enqueue_with_snapshot`]: Process::execute_and_enqueue_with_snapshot
1024 /// [`execute_and_enqueue_with_snapshot_and_retry`]: Process::execute_and_enqueue_with_snapshot_and_retry
1025 pub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>(
1026 &self,
1027 command: W::Command,
1028 max_attempts: u32,
1029 snap_store: &Snap,
1030 snapshot_interval: u64,
1031 ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
1032 where
1033 S: crate::event_store::AtomicAppend,
1034 W::Command: Clone,
1035 Snap: crate::snapshot::SnapshotStore,
1036 W::State: serde::Serialize,
1037 {
1038 let events = self
1039 .execute_and_enqueue_with_retry(command, max_attempts)
1040 .await?;
1041 let snapped = if snapshot_interval > 0 {
1042 self.take_snapshot(snap_store, snapshot_interval).await?
1043 } else {
1044 false
1045 };
1046 Ok((events, snapped))
1047 }
1048}
1049
1050impl<W: Workflow, S: EventStore + Clone> Clone for Process<W, S> {
1051 fn clone(&self) -> Self {
1052 Self {
1053 stream_id: self.stream_id.clone(),
1054 process_id: self.process_id,
1055 tenant_id: self.tenant_id,
1056 workflow_id: self.workflow_id.clone(),
1057 store: self.store.clone(),
1058 _phantom: PhantomData,
1059 }
1060 }
1061}
1062
1063impl<W: Workflow, S: EventStore + std::fmt::Debug> std::fmt::Debug for Process<W, S> {
1064 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1065 f.debug_struct("Process")
1066 .field("stream_id", &self.stream_id)
1067 .field("process_id", &self.process_id)
1068 .field("workflow_id", &self.workflow_id)
1069 .finish_non_exhaustive()
1070 }
1071}
1072
1073// ── Unit tests ────────────────────────────────────────────────────────────────
1074
1075#[cfg(test)]
1076mod tests {
1077 use super::*;
1078 use crate::{
1079 envelope::NewEvent,
1080 error::WorkflowError,
1081 event_store::{EventStore, ExpectedVersion, InMemoryEventStore},
1082 ids::{ConversationId, CorrelationId, TenantId},
1083 snapshot::{InMemorySnapshotStore, NoopSnapshotStore},
1084 version::WorkflowId,
1085 workflow::{CommandPayload, EventPayload},
1086 };
1087
1088 // ── Minimal test workflow ─────────────────────────────────────────────────
1089
1090 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1091 enum CounterEvent {
1092 Incremented { by: u32 },
1093 Reset,
1094 }
1095
1096 impl EventPayload for CounterEvent {
1097 fn event_type(&self) -> &'static str {
1098 match self {
1099 Self::Incremented { .. } => "Incremented",
1100 Self::Reset => "Reset",
1101 }
1102 }
1103 }
1104
1105 #[derive(Debug, Clone)]
1106 enum CounterCommand {
1107 Increment { by: u32 },
1108 Reset,
1109 }
1110
1111 impl CommandPayload for CounterCommand {}
1112
1113 #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1114 struct CounterState {
1115 value: u32,
1116 }
1117
1118 struct CounterWorkflow;
1119
1120 impl Workflow for CounterWorkflow {
1121 type State = CounterState;
1122 type Event = CounterEvent;
1123 type Command = CounterCommand;
1124
1125 fn apply(mut state: CounterState, event: &CounterEvent) -> CounterState {
1126 match event {
1127 CounterEvent::Incremented { by } => state.value += by,
1128 CounterEvent::Reset => state.value = 0,
1129 }
1130 state
1131 }
1132
1133 fn handle(
1134 _state: &CounterState,
1135 command: CounterCommand,
1136 ) -> Result<crate::workflow::WorkflowOutput<CounterEvent>, WorkflowError> {
1137 Ok(match command {
1138 CounterCommand::Increment { by } => vec![CounterEvent::Incremented { by }].into(),
1139 CounterCommand::Reset => vec![CounterEvent::Reset].into(),
1140 })
1141 }
1142 }
1143
1144 fn make_process() -> Process<CounterWorkflow, InMemoryEventStore> {
1145 Process::new(
1146 InMemoryEventStore::new(),
1147 TenantId::new(),
1148 WorkflowId::new("counter", "FV2024-10-01"),
1149 )
1150 }
1151
1152 // ── execute + state round-trip ────────────────────────────────────────────
1153
1154 #[tokio::test]
1155 async fn execute_then_state_round_trip() {
1156 let p = make_process();
1157
1158 p.execute(CounterCommand::Increment { by: 3 })
1159 .await
1160 .unwrap();
1161 p.execute(CounterCommand::Increment { by: 7 })
1162 .await
1163 .unwrap();
1164
1165 let state = p.state().await.unwrap();
1166 assert_eq!(state.value, 10);
1167 }
1168
1169 #[tokio::test]
1170 async fn event_count_matches_dispatched_commands() {
1171 let p = make_process();
1172
1173 assert_eq!(p.event_count().await.unwrap(), 0);
1174 p.execute(CounterCommand::Increment { by: 1 })
1175 .await
1176 .unwrap();
1177 assert_eq!(p.event_count().await.unwrap(), 1);
1178 p.execute(CounterCommand::Reset).await.unwrap();
1179 assert_eq!(p.event_count().await.unwrap(), 2);
1180 }
1181
1182 // ── identity round-trip ───────────────────────────────────────────────────
1183
1184 #[tokio::test]
1185 async fn identity_round_trip_via_from_identity() {
1186 let store = InMemoryEventStore::new();
1187 let p1 = Process::<CounterWorkflow, _>::new(
1188 store.clone(),
1189 TenantId::new(),
1190 WorkflowId::new("counter", "FV2024-10-01"),
1191 );
1192
1193 p1.execute(CounterCommand::Increment { by: 5 })
1194 .await
1195 .unwrap();
1196
1197 let identity = p1.identity();
1198 assert_eq!(*identity.stream_id(), *p1.stream_id());
1199 assert_eq!(identity.process_id, p1.process_id());
1200
1201 // Re-attach from identity and confirm state is visible.
1202 let p2 = Process::<CounterWorkflow, _>::from_identity(store, identity);
1203 let state = p2.state().await.unwrap();
1204 assert_eq!(state.value, 5);
1205 }
1206
/// `ProcessIdentity` must survive a JSON round trip with its stream and
/// process ids intact.
#[test]
fn process_identity_is_serializable() {
    let process = make_process();
    let identity = process.identity();

    let encoded =
        serde_json::to_string(&identity).expect("ProcessIdentity must be serializable");
    let decoded: ProcessIdentity = serde_json::from_str(&encoded).unwrap();

    assert_eq!(*decoded.stream_id(), *identity.stream_id());
    assert_eq!(decoded.process_id, identity.process_id);
}
1216
1217 // ── snapshot-accelerated state reconstruction ─────────────────────────────
1218
1219 #[tokio::test]
1220 async fn take_snapshot_and_state_with_snapshot() {
1221 let snap_store = InMemorySnapshotStore::new();
1222 let p = make_process();
1223
1224 // Dispatch 4 commands; the interval is 4.
1225 for i in 1u32..=4 {
1226 p.execute(CounterCommand::Increment { by: i })
1227 .await
1228 .unwrap();
1229 }
1230
1231 let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1232 assert!(took, "snapshot must be taken at event_count = 4");
1233
1234 // Dispatch one more command after the snapshot.
1235 p.execute(CounterCommand::Increment { by: 10 })
1236 .await
1237 .unwrap();
1238
1239 let state = p.state_with_snapshot(&snap_store).await.unwrap();
1240 // 1+2+3+4 = 10, plus the final +10 = 20.
1241 assert_eq!(state.value, 20);
1242 }
1243
1244 #[tokio::test]
1245 async fn state_with_snapshot_falls_back_to_full_replay() {
1246 let p = make_process();
1247 p.execute(CounterCommand::Increment { by: 42 })
1248 .await
1249 .unwrap();
1250
1251 // NoopSnapshotStore always returns None → full replay.
1252 let state = p.state_with_snapshot(&NoopSnapshotStore).await.unwrap();
1253 assert_eq!(state.value, 42);
1254 }
1255
1256 #[tokio::test]
1257 async fn take_snapshot_skipped_between_intervals() {
1258 let snap_store = InMemorySnapshotStore::new();
1259 let p = make_process();
1260
1261 p.execute(CounterCommand::Increment { by: 1 })
1262 .await
1263 .unwrap();
1264 p.execute(CounterCommand::Increment { by: 1 })
1265 .await
1266 .unwrap();
1267 p.execute(CounterCommand::Increment { by: 1 })
1268 .await
1269 .unwrap();
1270
1271 // 3 events, interval = 4 → must not take.
1272 let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1273 assert!(!took);
1274 assert!(snap_store.is_empty().await);
1275 }
1276
1277 /// Regression test for when a persisted snapshot carries a
1278 /// `state_schema_version` that does not match the current workflow's
1279 /// `state_schema_version()`, `state_with_snapshot` must silently discard
1280 /// the stale snapshot and fall back to full event replay.
1281 ///
1282 /// This guards against silent data corruption when state layout changes
1283 /// incompatibly — e.g. after adding a new required field to `CounterState`.
1284 #[tokio::test]
1285 async fn stale_snapshot_schema_version_falls_back_to_full_replay() {
1286 // CounterWorkflow uses state_schema_version() == 1 (the default).
1287 // We simulate a "migrated" workflow by injecting a snapshot whose
1288 // state_schema_version is bumped to 99, representing a schema that the
1289 // current workflow code does not understand.
1290 let snap_store = InMemorySnapshotStore::new();
1291 let p = make_process();
1292
1293 // Dispatch some events so there is something to replay.
1294 p.execute(CounterCommand::Increment { by: 5 })
1295 .await
1296 .unwrap();
1297 p.execute(CounterCommand::Increment { by: 3 })
1298 .await
1299 .unwrap();
1300
1301 // Manually save a stale snapshot with schema_version = 99.
1302 // The state payload is intentionally wrong — it should never be used.
1303 let stale = crate::snapshot::Snapshot::new(
1304 p.stream_id().clone(),
1305 2, // sequence_number after 2 events
1306 99, // ← unknown schema version
1307 serde_json::json!({ "value": 9999 }), // ← wrong value; must not be read
1308 );
1309 snap_store.save(&stale).await.unwrap();
1310
1311 // state_with_snapshot must discard the stale snapshot and replay all
1312 // events from sequence 0, producing the correct state (5+3=8).
1313 let current_state = p.state_with_snapshot(&snap_store).await.unwrap();
1314 assert_eq!(
1315 current_state.value, 8,
1316 "stale snapshot must be discarded; full replay must yield correct state"
1317 );
1318 }
1319
1320 // ── execute_with_retry ────────────────────────────────────────────────────
1321
1322 #[tokio::test]
1323 async fn execute_with_retry_succeeds_on_first_attempt() {
1324 let p = make_process();
1325 let envs = p
1326 .execute_with_retry(CounterCommand::Increment { by: 99 }, 3)
1327 .await
1328 .unwrap();
1329 assert_eq!(envs.len(), 1);
1330 assert_eq!(p.state().await.unwrap().value, 99);
1331 }
1332
1333 #[tokio::test]
1334 async fn execute_with_retry_returns_err_on_zero_attempts() {
1335 let p = make_process();
1336 let err = p
1337 .execute_with_retry(CounterCommand::Increment { by: 1 }, 0)
1338 .await
1339 .unwrap_err();
1340 assert!(
1341 matches!(err, EngineError::Store { ref message, .. } if message.contains("max_attempts")),
1342 "expected Store error about max_attempts, got: {err:?}",
1343 );
1344 }
1345
1346 // ── execute_with (explicit context) ──────────────────────────────────────
1347
1348 #[tokio::test]
1349 async fn execute_with_explicit_context_propagates_ids() {
1350 use crate::ids::{ConversationId, CorrelationId};
1351 let p = make_process();
1352
1353 let corr = CorrelationId::new();
1354 let conv = ConversationId::new();
1355 let ctx = CommandContext::new(p.tenant_id(), p.process_id(), p.workflow_id().clone())
1356 .with_correlation(corr)
1357 .with_conversation(conv);
1358
1359 let envs = p
1360 .execute_with(CounterCommand::Increment { by: 1 }, ctx)
1361 .await
1362 .unwrap();
1363 assert_eq!(envs.len(), 1);
1364 assert_eq!(envs[0].correlation_id, corr);
1365 assert_eq!(envs[0].conversation_id, conv);
1366 }
1367
1368 // ── upcast / schema-migration ─────────────────────────────────────────────
1369 //
1370 // A v2 workflow adds a `label: String` field to its single event.
1371 // Old (v1) events stored without `label` must be migrated by `upcast`.
1372 //
1373 // `#[serde(untagged)]` is used so the serialized payload is the flat
1374 // inner struct `{"count": N, "label": "..."}` rather than the externally-
1375 // tagged `{"Tagged": {"count": N}}` form. This matches the common
1376 // real-world pattern where each `EventPayload::event_type()` discriminant
1377 // IS the variant selector stored in the envelope, and the payload holds
1378 // only the fields.
1379
1380 #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1381 struct TagState {
1382 total: u32,
1383 last_label: String,
1384 }
1385
1386 /// v1 schema (legacy): `{ "count": u32 }` — `label` field absent.
1387 /// v2 schema: `{ "count": u32, "label": String }`.
1388 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
1389 #[serde(untagged)]
1390 enum TagEvent {
1391 Tagged { count: u32, label: String },
1392 }
1393
1394 impl EventPayload for TagEvent {
1395 fn event_type(&self) -> &'static str {
1396 "Tagged"
1397 }
1398 fn schema_version(&self) -> u32 {
1399 2
1400 }
1401 }
1402
1403 #[derive(Debug, Clone)]
1404 struct TagCommand {
1405 count: u32,
1406 label: String,
1407 }
1408 impl CommandPayload for TagCommand {}
1409
1410 struct TagWorkflow;
1411
1412 impl Workflow for TagWorkflow {
1413 type State = TagState;
1414 type Event = TagEvent;
1415 type Command = TagCommand;
1416
1417 fn apply(mut state: TagState, event: &TagEvent) -> TagState {
1418 let TagEvent::Tagged { count, label } = event;
1419 state.total += count;
1420 state.last_label = label.clone();
1421 state
1422 }
1423
1424 fn handle(
1425 _state: &TagState,
1426 cmd: TagCommand,
1427 ) -> Result<crate::workflow::WorkflowOutput<TagEvent>, WorkflowError> {
1428 Ok(vec![TagEvent::Tagged {
1429 count: cmd.count,
1430 label: cmd.label,
1431 }]
1432 .into())
1433 }
1434
1435 /// Migrate v1 `Tagged` events (missing `label`) to v2.
1436 ///
1437 /// v1 payload: `{"count": N}` (no `label` field)
1438 /// v2 payload: `{"count": N, "label": ""}` (default empty string)
1439 ///
1440 /// Because the event uses `#[serde(untagged)]`, the envelope payload
1441 /// is the flat struct — variant discrimination comes from `event_type`.
1442 fn upcast(
1443 event_type: &str,
1444 from_version: u32,
1445 mut payload: serde_json::Value,
1446 ) -> Result<serde_json::Value, EngineError> {
1447 if event_type == "Tagged" && from_version == 1 {
1448 if let Some(obj) = payload.as_object_mut() {
1449 obj.entry("label")
1450 .or_insert_with(|| serde_json::Value::String(String::new()));
1451 }
1452 }
1453 Ok(payload)
1454 }
1455 }
1456
1457 /// Inject a raw v1 event (no `label` field) directly into the store and
1458 /// confirm that `state()` replays it correctly via `upcast`.
1459 #[tokio::test]
1460 async fn upcast_v1_event_adds_default_label() {
1461 let store = InMemoryEventStore::new();
1462 let p = Process::<TagWorkflow, _>::new(
1463 store.clone(), // shares the underlying Arc<RwLock<_>>
1464 TenantId::new(),
1465 WorkflowId::new("tag", "FV2025-10-01"),
1466 );
1467
1468 // v1 payload: flat struct fields, no `label` (untagged serde repr).
1469 let v1_payload = serde_json::json!({ "count": 7 });
1470 let raw = NewEvent {
1471 correlation_id: CorrelationId::new(),
1472 causation_id: None,
1473 conversation_id: ConversationId::new(),
1474 process_id: p.process_id(),
1475 tenant_id: p.tenant_id(),
1476 workflow_id: p.workflow_id().clone(),
1477 event_type: "Tagged".into(),
1478 schema_version: 1, // ← schema_version 1 (old format)
1479 payload: v1_payload,
1480 };
1481 store
1482 .append(p.stream_id(), ExpectedVersion::Any, &[raw])
1483 .await
1484 .expect("inject v1 event");
1485
1486 // Replay via the v2 workflow — `upcast` must fill in `label: ""`.
1487 let state = p.state().await.expect("state must replay without error");
1488 assert_eq!(state.total, 7, "count must be accumulated");
1489 assert_eq!(
1490 state.last_label, "",
1491 "missing v1 label must default to empty string"
1492 );
1493
1494 // Also verify that a normally-executed v2 event round-trips correctly.
1495 p.execute(TagCommand {
1496 count: 3,
1497 label: "hello".into(),
1498 })
1499 .await
1500 .unwrap();
1501 let state2 = p.state().await.unwrap();
1502 assert_eq!(state2.total, 10);
1503 assert_eq!(state2.last_label, "hello");
1504 }
1505}