mako_engine/process.rs
1//! [`Process`] — ergonomic typed handle for a single MaKo process instance.
2//!
3//! Instead of threading `stream_id`, `workflow_id`, `tenant_id`, and a store
4//! reference through every call to the write path, bind them once into a
5//! `Process<W, S>` and call [`execute`] / [`state`] directly.
6//!
7//! # Starting a new process
8//!
9//! ```rust,ignore
10//! use mako_engine::{
11//! event_store::InMemoryEventStore,
12//! ids::TenantId,
13//! process::Process,
14//! version::WorkflowId,
15//! };
16//!
17//! let store = InMemoryEventStore::new();
18//! let process = Process::<MyWorkflow, _>::new(
19//! store,
20//! TenantId::new(),
21//! WorkflowId::new("my-workflow", "FV2024-10-01"),
22//! );
23//!
24//! let envelopes = process.execute(my_command).await?;
25//! let current = process.state().await?;
26//! ```
27//!
28//! # Resuming an existing process
29//!
30//! ```rust,ignore
31//! let process = Process::<MyWorkflow, _>::from_stream(
32//! store, stream_id, process_id, tenant_id, workflow_id,
33//! );
34//! ```
35//!
36//! [`execute`]: Process::execute
37//! [`state`]: Process::state
38
39use std::marker::PhantomData;
40
41use crate::{
42 envelope::EventEnvelope,
43 error::EngineError,
44 event_store::EventStore,
45 ids::{ProcessId, ProcessIdentity, StreamId, TenantId},
46 snapshot::{Snapshot, SnapshotStore},
47 version::WorkflowId,
48 workflow::{
49 CommandContext, Workflow, execute_command, execute_command_and_collect,
50 execute_command_with_snapshot,
51 },
52};
53
54// ── Process ───────────────────────────────────────────────────────────────────
55
56/// An ergonomic typed handle for a single MaKo process instance.
57///
58/// `Process` bundles the [`StreamId`], [`ProcessId`], [`TenantId`],
59/// [`WorkflowId`], and event store into a single owned value so callers do not
60/// need to pass them on every command dispatch.
61///
62/// ## Generic parameters
63///
64/// - `W` — the [`Workflow`] implementation. In practice this is a zero-size
65/// marker struct; the type parameter carries the domain logic as associated
66/// types.
67/// - `S` — the [`EventStore`] backend. [`InMemoryEventStore`] is the default
68/// for tests; production deployments wrap a persistent backend in
69/// [`Arc`][std::sync::Arc] and use `Process<W, Arc<MyStore>>`.
70///
71/// ## Clone semantics
72///
73/// If `S: Clone` (e.g. [`InMemoryEventStore`] or `Arc<…>`), `Process` is also
74/// `Clone` and all clones share the same underlying storage.
75///
76/// [`InMemoryEventStore`]: crate::event_store::InMemoryEventStore
#[allow(clippy::struct_field_names)] // `process_id` and `stream_id` are intentional: they
// describe engine-layer concepts, not redundant prefixes.
pub struct Process<W: Workflow, S: EventStore> {
    /// Identifier of the event stream this handle reads from and appends to.
    stream_id: StreamId,
    /// Stable identifier of the process instance.
    process_id: ProcessId,
    /// Tenant that owns the process.
    tenant_id: TenantId,
    /// Workflow version of the process; cloned into every `CommandContext`
    /// this handle builds.
    workflow_id: WorkflowId,
    /// Event store backend used for all reads and writes issued through
    /// this handle.
    store: S,
    /// Binds the handle to the workflow type `W` without storing a `W`
    /// value (the struct only needs `W`'s associated types and functions).
    _phantom: PhantomData<fn() -> W>,
}
87
88impl<W: Workflow, S: EventStore> Process<W, S> {
89 /// Create a fresh process instance.
90 ///
91 /// Generates a new [`ProcessId`] and derives the [`StreamId`] from
92 /// `tenant_id` and `process_id` (`process/{tenant_id}/{process_id}`).
93 /// Use this when starting a new MaKo process
94 /// (e.g. on receipt of the first inbound UTILMD Lieferbeginn).
95 #[must_use]
96 pub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self {
97 let process_id = ProcessId::new();
98 let stream_id = StreamId::for_process(tenant_id, &process_id);
99 Self {
100 stream_id,
101 process_id,
102 tenant_id,
103 workflow_id,
104 store,
105 _phantom: PhantomData,
106 }
107 }
108
109 /// Attach to an existing process stream.
110 ///
111 /// Use this on service restart or when routing an inbound message to an
112 /// already-running process whose identifiers were previously persisted.
113 #[must_use]
114 pub fn from_stream(
115 store: S,
116 stream_id: StreamId,
117 process_id: ProcessId,
118 tenant_id: TenantId,
119 workflow_id: WorkflowId,
120 ) -> Self {
121 Self {
122 stream_id,
123 process_id,
124 tenant_id,
125 workflow_id,
126 store,
127 _phantom: PhantomData,
128 }
129 }
130
131 /// The event stream identifier for this process.
132 #[must_use]
133 pub fn stream_id(&self) -> &StreamId {
134 &self.stream_id
135 }
136
137 /// The stable process identifier.
138 #[must_use]
139 pub fn process_id(&self) -> ProcessId {
140 self.process_id
141 }
142
143 /// The tenant that owns this process.
144 #[must_use]
145 pub fn tenant_id(&self) -> TenantId {
146 self.tenant_id
147 }
148
149 /// The workflow version under which this process was created.
150 #[must_use]
151 pub fn workflow_id(&self) -> &WorkflowId {
152 &self.workflow_id
153 }
154
155 /// Return a serializable value bundle of all four process identifiers.
156 ///
157 /// Persist this to a routing table (e.g. keyed by `conversation_id` or
158 /// `correlation_id`) so inbound messages can be routed to the correct
159 /// running process without the caller needing to manage four separate
160 /// fields.
161 ///
162 /// Use [`Process::from_identity`] to re-attach to the same process stream
163 /// on a subsequent request.
164 ///
165 /// ```rust,ignore
166 /// let id = process.identity();
167 /// routing_table.insert(conv_id, id.clone());
168 ///
169 /// // Later, on a subsequent inbound message:
170 /// let id = routing_table.get(&conv_id)?;
171 /// let process = Process::<MyWorkflow, _>::from_identity(store, id);
172 /// ```
173 #[must_use]
174 pub fn identity(&self) -> ProcessIdentity {
175 ProcessIdentity::new(self.process_id, self.tenant_id, self.workflow_id.clone())
176 }
177
178 /// Build a [`CommandContext`] for an inbound EDIFACT message dispatch.
179 ///
180 /// Derives a **deterministic** [`CorrelationId`] from `interchange_ref`
181 /// (UUID v5) so repeated dispatches of the same EDIFACT message — e.g.
182 /// AS4 retransmissions or idempotent REST replays — produce the same
183 /// correlation root. This makes EDIFACT-level idempotency observable in
184 /// distributed traces without any extra dedup logic at the engine level.
185 ///
186 /// Use this instead of [`Process::execute`] when you need to propagate
187 /// EDIFACT correlation metadata into the event stream. The returned
188 /// context is passed to [`Process::execute_with`].
189 ///
190 /// # Example
191 ///
192 /// ```rust,ignore
193 /// let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
194 /// let cmd_ctx = process.context_for_inbound(&utilmd_interchange_ref);
195 /// process.execute_with(command, cmd_ctx).await?;
196 /// ```
197 ///
198 /// [`CorrelationId`]: crate::ids::CorrelationId
199 #[must_use]
200 pub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext {
201 CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone())
202 .with_correlation(crate::ids::CorrelationId::from_interchange_ref(
203 interchange_ref,
204 ))
205 }
206
207 /// Attach to an existing process stream from a previously persisted
208 /// [`ProcessIdentity`].
209 ///
210 /// This is the companion to [`Process::identity`]: look up the identity
211 /// from your routing table and call `from_identity` to get a live
212 /// `Process` handle bound to `store`.
213 #[must_use]
214 pub fn from_identity(store: S, identity: ProcessIdentity) -> Self {
215 Self {
216 stream_id: identity.stream_id().clone(),
217 process_id: identity.process_id,
218 tenant_id: identity.tenant_id,
219 workflow_id: identity.workflow_id,
220 store,
221 _phantom: PhantomData,
222 }
223 }
224
225 /// Return the number of events currently in the stream.
226 ///
227 /// Uses [`EventStore::stream_version`] for an efficient O(1) metadata
228 /// query on backends that override it. Falls back to loading all events
229 /// on stores that use the default implementation.
230 ///
231 /// Use this to decide whether to take a snapshot — e.g. with
232 /// [`Snapshot::should_take`]:
233 ///
234 /// ```rust,ignore
235 /// if Snapshot::should_take(process.event_count().await?, 100) {
236 /// process.take_snapshot(&snap_store, 100).await?;
237 /// }
238 /// ```
239 ///
240 /// # Errors
241 ///
242 /// Returns [`EngineError::Store`] on storage failures.
243 ///
244 /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
245 pub async fn event_count(&self) -> Result<u64, EngineError> {
246 self.store.stream_version(&self.stream_id).await
247 }
248
249 /// Dispatch `command` using a freshly generated [`CommandContext`].
250 ///
251 /// A new [`CorrelationId`] and [`ConversationId`] are auto-generated for
252 /// each call. To propagate tracing IDs from an inbound EDIFACT message
253 /// across a multi-step command chain, use [`execute_with`].
254 ///
255 /// # Errors
256 ///
257 /// - [`EngineError::VersionConflict`] when a concurrent writer raced ahead;
258 /// retry by calling `execute` again.
259 /// - [`EngineError::Workflow`] when the workflow rejects the command.
260 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
261 ///
262 /// [`CorrelationId`]: crate::ids::CorrelationId
263 /// [`ConversationId`]: crate::ids::ConversationId
264 /// [`execute_with`]: Process::execute_with
265 #[cfg_attr(
266 feature = "tracing",
267 tracing::instrument(skip(self, command), fields(
268 workflow = %self.workflow_id,
269 process_id = %self.process_id,
270 stream_id = %self.stream_id,
271 ))
272 )]
273 pub async fn execute(&self, command: W::Command) -> Result<Vec<EventEnvelope>, EngineError> {
274 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
275 execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
276 }
277
278 /// Like [`execute`] but also returns the outbox messages produced by
279 /// [`Workflow::handle`], fully stamped with the real IDs from the persisted
280 /// event.
281 ///
282 /// The returned [`OutboxMessage`] entries have their `causation_event_id`
283 /// set to the `event_id` of the first persisted event — identical to what
284 /// `execute_and_enqueue` writes into the [`OutboxStore`] atomically. This
285 /// makes the messages ready to pass directly to the EDIFACT renderer
286 /// without any manual ID stitching.
287 ///
288 /// Use this in E2E and integration tests that need to inspect or render
289 /// outbox messages after a command is persisted, without the awkward
290 /// `handle()` + `execute()` double invocation.
291 ///
292 /// [`execute`]: Process::execute
293 /// [`OutboxMessage`]: crate::outbox::OutboxMessage
294 /// [`OutboxStore`]: crate::outbox::OutboxStore
295 ///
296 /// # Errors
297 ///
298 /// Returns [`EngineError`] on storage or command handling failure.
299 pub async fn execute_and_collect(
300 &self,
301 command: W::Command,
302 ) -> Result<(Vec<EventEnvelope>, Vec<crate::outbox::OutboxMessage>), EngineError> {
303 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
304 let (events, pending) =
305 execute_command_and_collect::<W, S>(&self.store, &self.stream_id, command, &ctx)
306 .await?;
307
308 // Stamp each PendingOutbox with the real IDs from the persisted event.
309 // Using the first event's event_id as causation_event_id mirrors what
310 // execute_and_enqueue writes into the OutboxStore atomically.
311 let causation_event_id = events
312 .first()
313 .map_or_else(crate::ids::EventId::new, |e| e.event_id);
314
315 let outbox = pending
316 .into_iter()
317 .map(|p| {
318 crate::outbox::OutboxMessage::new(
319 self.stream_id.clone(),
320 self.process_id,
321 self.tenant_id,
322 ctx.correlation_id,
323 ctx.conversation_id,
324 causation_event_id,
325 p.message_type,
326 p.recipient,
327 p.payload,
328 )
329 })
330 .collect();
331
332 Ok((events, outbox))
333 }
334
335 /// Dispatch `command` with a caller-supplied [`CommandContext`].
336 ///
337 /// Use this when you need to thread a specific `correlation_id`,
338 /// `conversation_id`, or `causation_id` through the command. For example,
339 /// when dispatching an APERAK in response to a UTILMD, pass the
340 /// `conversation_id` from the UTILMD envelope so both exchanges are
341 /// traceable as a single business conversation.
342 ///
343 /// Build a context with:
344 ///
345 /// ```rust,ignore
346 /// let ctx = CommandContext::new(tenant_id, process_id, workflow_id)
347 /// .with_causation(utilmd_event_id.into()) // From<EventId> for CausationId
348 /// .with_conversation(utilmd_conversation_id);
349 /// process.execute_with(DispatchAperak { .. }, ctx).await?;
350 /// ```
351 ///
352 /// # Errors
353 ///
354 /// See [`Process::execute`] for the error contract.
355 ///
356 /// [`Process::execute`]: Process::execute
357 #[cfg_attr(
358 feature = "tracing",
359 tracing::instrument(skip(self, command, ctx), fields(
360 workflow = %self.workflow_id,
361 process_id = %self.process_id,
362 correlation_id = %ctx.correlation_id,
363 ))
364 )]
365 pub async fn execute_with(
366 &self,
367 command: W::Command,
368 ctx: CommandContext,
369 ) -> Result<Vec<EventEnvelope>, EngineError> {
370 execute_command::<W, S>(&self.store, &self.stream_id, command, &ctx).await
371 }
372
373 /// Dispatch `command` using a snapshot store to accelerate state reconstruction.
374 ///
375 /// Equivalent to [`Process::execute`] but starts replay from the most recent
376 /// snapshot rather than from sequence 0. For streams with thousands of events
377 /// and a snapshot within the last 100 events, this reduces replay cost from
378 /// O(n) to O(k) where k is the tail length since the last snapshot.
379 ///
380 /// When no snapshot exists or the schema version has changed, falls back to
381 /// full O(n) replay — identical in cost to [`Process::execute`].
382 ///
383 /// # Errors
384 ///
385 /// Same contract as [`Process::execute`].
386 pub async fn execute_snapshot<Snap>(
387 &self,
388 command: W::Command,
389 snap_store: &Snap,
390 ) -> Result<Vec<EventEnvelope>, EngineError>
391 where
392 W::State: serde::de::DeserializeOwned,
393 Snap: SnapshotStore,
394 {
395 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
396 execute_command_with_snapshot::<W, S, Snap>(
397 &self.store,
398 snap_store,
399 &self.stream_id,
400 command,
401 &ctx,
402 )
403 .await
404 }
405
406 /// Reconstruct the current workflow state by replaying all persisted events.
407 ///
408 /// This is a **read-only** operation — it loads events but does not
409 /// acquire any write lock or check optimistic concurrency. Use it to:
410 ///
411 /// - Inspect process status in tests without dispatching a command.
412 /// - Build a diagnostic snapshot for observability or health checks.
413 /// - Implement query-side read models that need the full typed state.
414 ///
415 /// For production read models, prefer a [`Projection`] that is updated
416 /// incrementally rather than replaying the full stream on every query.
417 ///
418 /// To accelerate replay for long-lived streams, use
419 /// [`Process::state_with_snapshot`] instead.
420 ///
421 /// # Errors
422 ///
423 /// - [`EngineError::Store`] on storage failures.
424 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded
425 /// into `W::Event` (schema migration required).
426 ///
427 /// [`Projection`]: crate::projection::Projection
428 #[cfg_attr(
429 feature = "tracing",
430 tracing::instrument(skip(self), fields(
431 workflow = %self.workflow_id,
432 stream_id = %self.stream_id,
433 ))
434 )]
435 pub async fn state(&self) -> Result<W::State, EngineError> {
436 self.store
437 .fold_stream(&self.stream_id, 0, W::State::default(), |acc, env| {
438 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
439 let event: W::Event = serde_json::from_value(payload)
440 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
441 Ok(W::apply(acc, &event))
442 })
443 .await
444 }
445
446 // ── Snapshot-aware state reconstruction ──────────────────────────────────
447
448 /// Reconstruct current state using a snapshot as the starting point.
449 ///
450 /// Loads the most recent snapshot for this stream from `snap_store`. If
451 /// one exists, deserializes it into `W::State` and then replays only
452 /// events appended **after** the snapshot's `sequence_number`
453 /// (O(k) instead of O(n)). Falls back to full replay when no snapshot
454 /// exists.
455 ///
456 /// ## When to use
457 ///
458 /// Use this instead of [`Process::state`] for long-lived processes where
459 /// the event count grows large. Pair it with [`Process::take_snapshot`]
460 /// to keep the snapshot store current after each command.
461 ///
462 /// ## Schema version compatibility
463 ///
464 /// Snapshots whose `state` field cannot be deserialized into `W::State`
465 /// (e.g. after a breaking state schema change) will return
466 /// [`EngineError::Deserialization`]. In that case, fall back to
467 /// [`Process::state`] (full replay) and take a fresh snapshot.
468 ///
469 /// # Errors
470 ///
471 /// - [`EngineError::Store`] on snapshot or event storage failures.
472 /// - [`EngineError::Deserialization`] when the snapshot state or a tail
473 /// event cannot be decoded.
474 #[cfg_attr(
475 feature = "tracing",
476 tracing::instrument(skip(self, snap_store), fields(
477 workflow = %self.workflow_id,
478 stream_id = %self.stream_id,
479 ))
480 )]
481 pub async fn state_with_snapshot<Snap: SnapshotStore>(
482 &self,
483 snap_store: &Snap,
484 ) -> Result<W::State, EngineError>
485 where
486 W::State: serde::de::DeserializeOwned,
487 {
488 let maybe_snap = snap_store.load(&self.stream_id).await?;
489
490 let (initial_state, from_sequence) = match maybe_snap {
491 Some(snap) => {
492 if snap.state_schema_version == W::state_schema_version() {
493 let state = serde_json::from_value::<W::State>(snap.state)
494 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
495 (state, snap.sequence_number)
496 } else {
497 // Schema version mismatch: discard the stale snapshot and
498 // fall back to full replay. The caller should take a fresh
499 // snapshot after this reconstruction completes.
500 tracing::warn!(
501 expected = W::state_schema_version(),
502 actual = snap.state_schema_version,
503 stream_id = %self.stream_id,
504 "snapshot schema version mismatch; falling back to full replay"
505 );
506 (W::State::default(), 0)
507 }
508 }
509 None => (W::State::default(), 0),
510 };
511
512 let tail = self
513 .store
514 .fold_stream(&self.stream_id, from_sequence, initial_state, |acc, env| {
515 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
516 let event: W::Event = serde_json::from_value(payload)
517 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
518 Ok(W::apply(acc, &event))
519 })
520 .await?;
521 Ok(tail)
522 }
523
524 /// Reconstruct current state and save a snapshot if the event-count
525 /// threshold is reached.
526 ///
527 /// Checks [`Snapshot::should_take`] with `interval`. When at least
528 /// `interval` new events have accumulated since the last snapshot,
529 /// reconstructs state via full replay, serializes it, and calls
530 /// [`SnapshotStore::save`].
531 ///
532 /// Returns `true` when a snapshot was taken, `false` when the threshold
533 /// was not reached or `interval` is `0`.
534 ///
535 /// ## Integration pattern
536 ///
537 /// ```rust,ignore
538 /// // After every successful command:
539 /// process.execute(command).await?;
540 /// process.take_snapshot(&snap_store, 100).await?;
541 ///
542 /// // On the read path — O(k) instead of O(n):
543 /// let state = process.state_with_snapshot(&snap_store).await?;
544 /// ```
545 ///
546 /// # Errors
547 ///
548 /// - [`EngineError::Store`] on snapshot storage failures.
549 /// - [`EngineError::Serialization`] when the state cannot be JSON-encoded.
550 /// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
551 ///
552 /// [`Snapshot::should_take`]: crate::snapshot::Snapshot::should_take
553 pub async fn take_snapshot<Snap: SnapshotStore>(
554 &self,
555 snap_store: &Snap,
556 interval: u64,
557 ) -> Result<bool, EngineError>
558 where
559 W::State: serde::Serialize,
560 {
561 let count = self.event_count().await?;
562 // Load the last snapshot (if any) to get its sequence number.
563 let last_snap_seq = snap_store
564 .load(&self.stream_id)
565 .await?
566 .map_or(0, |s| s.sequence_number);
567 if !Snapshot::should_take(count, last_snap_seq, interval) {
568 return Ok(false);
569 }
570 let state = self.state().await?;
571 let payload =
572 serde_json::to_value(&state).map_err(|e| EngineError::Serialization(e.to_string()))?;
573 let snap = Snapshot::new(
574 self.stream_id.clone(),
575 count,
576 W::state_schema_version(),
577 payload,
578 );
579 snap_store.save(&snap).await?;
580 Ok(true)
581 }
582
583 // ── Retry ─────────────────────────────────────────────────────────────────
584
585 /// Dispatch `command` with automatic retry on [`EngineError::VersionConflict`].
586 ///
587 /// A version conflict occurs when a concurrent writer appended events
588 /// between this process's read and its append attempt. On each conflict,
589 /// the engine **reloads the complete event stream from the store and
590 /// replays all events** to rebuild fresh state before re-handling the
591 /// command. Stale in-memory state from a previous attempt is never
592 /// carried forward — each retry always starts from a fully-rebuilt snapshot.
593 ///
594 /// Non-conflict errors (storage failures, workflow rejections) are
595 /// returned immediately without retrying.
596 ///
597 /// A freshly-generated [`CommandContext`] is pinned before the first
598 /// attempt and reused across all retries so all events share the same
599 /// correlation root regardless of retry count. Use
600 /// [`execute_with_retry_ctx`] to supply a specific context (e.g. one
601 /// derived from an inbound EDIFACT envelope).
602 ///
603 /// ## When to use
604 ///
605 /// Use for commands where two inbound EDIFACT messages for the same
606 /// process may arrive concurrently — e.g. a UTILMD and its APERAK
607 /// processed on separate async tasks.
608 ///
609 /// ## Command cloning
610 ///
611 /// `W::Command` must implement [`Clone`] so it can be resubmitted on
612 /// each retry without reconstructing it from scratch.
613 ///
614 /// # Errors
615 ///
616 /// - [`EngineError::VersionConflict`] when all `max_attempts` are
617 /// exhausted without a successful append.
618 /// - Any non-conflict [`EngineError`] returned by the workflow or storage.
619 /// - [`EngineError::Store`] when `max_attempts` is `0`.
620 ///
621 /// [`execute_with_retry_ctx`]: Process::execute_with_retry_ctx
622 pub async fn execute_with_retry(
623 &self,
624 command: W::Command,
625 max_attempts: u32,
626 ) -> Result<Vec<EventEnvelope>, EngineError>
627 where
628 W::Command: Clone,
629 {
630 if max_attempts == 0 {
631 return Err(EngineError::store("max_attempts must be >= 1"));
632 }
633 // Pin context before the loop — all retry attempts share the same
634 // correlation root for consistent distributed tracing.
635 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
636 self.execute_with_retry_ctx(command, ctx, max_attempts)
637 .await
638 }
639
640 /// Dispatch `command` with a caller-supplied [`CommandContext`] and
641 /// automatic retry on [`EngineError::VersionConflict`].
642 ///
643 /// Identical to [`execute_with_retry`] but threads the provided `ctx`
644 /// (including its `correlation_id`, `conversation_id`, and `causation_id`)
645 /// through every retry attempt. Use this when you need to propagate
646 /// tracing IDs from an inbound EDIFACT envelope across a retried command.
647 ///
648 /// # Example
649 ///
650 /// ```rust,ignore
651 /// let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
652 /// process.execute_with_retry_ctx(HandleAperak { .. }, ctx, 3).await?;
653 /// ```
654 ///
655 /// # Errors
656 ///
657 /// See [`execute_with_retry`] for the error contract.
658 ///
659 /// # Panics
660 ///
661 /// Panics if `max_attempts` is 0 and the guard at the top of the function
662 /// is somehow bypassed (unreachable in practice).
663 ///
664 /// [`execute_with_retry`]: Process::execute_with_retry
665 pub async fn execute_with_retry_ctx(
666 &self,
667 command: W::Command,
668 ctx: CommandContext,
669 max_attempts: u32,
670 ) -> Result<Vec<EventEnvelope>, EngineError>
671 where
672 W::Command: Clone,
673 {
674 if max_attempts == 0 {
675 return Err(EngineError::store("max_attempts must be >= 1"));
676 }
677 let mut conflict_err: Option<EngineError> = None;
678 for attempt in 0..max_attempts {
679 // Each call to `execute_with` internally calls `fold_stream` from
680 // sequence 0 (or from the most recent snapshot if one is available).
681 // State is always freshly reconstructed from the event log on every
682 // attempt — there is no stale state carried forward between retries.
683 // Do NOT "optimise" this by caching state across attempts; doing so
684 // would allow a winning concurrent writer's events to be invisible
685 // to the retry, producing incorrect decisions and duplicate events.
686 match self.execute_with(command.clone(), ctx.clone()).await {
687 Ok(envs) => return Ok(envs),
688 Err(e) if e.is_version_conflict() => {
689 conflict_err = Some(e);
690 // Brief jittered sleep to reduce thundering-herd under
691 // concurrent ERP commands targeting the same stream.
692 // Delay = uniform random in [0, 10ms * attempt], capped at 80ms.
693 // Uses the OS CSPRNG via rand so every retry gets independent
694 // entropy regardless of stream-ID prefix.
695 if attempt + 1 < max_attempts {
696 let entropy: u64 = rand::random();
697 let window_ms: u64 = (10 * (u64::from(attempt) + 1)).min(80);
698 let jitter_ms = if window_ms == 0 {
699 0
700 } else {
701 entropy % window_ms
702 };
703 tokio::time::sleep(std::time::Duration::from_millis(jitter_ms)).await;
704 }
705 }
706 Err(e) => return Err(e), // non-retriable — propagate immediately
707 }
708 }
709 // At least one attempt ran (max_attempts >= 1), so conflict_err is Some.
710 Err(conflict_err.expect("loop ran at least once"))
711 }
712
713 /// Execute `command` and atomically co-persist any [`PendingOutbox`] messages
714 /// produced by [`Workflow::handle`].
715 ///
716 /// Like [`execute`], but requires `S: AtomicAppend`. When the workflow's
717 /// `handle` returns outbox messages alongside events, both are written to
718 /// storage in a single `WriteBatch`, eliminating the silent message-loss
719 /// window that would exist with separate writes.
720 ///
721 /// When the handle returns no outbox messages, this degenerates to a plain
722 /// `EventStore::append` (no performance cost).
723 ///
724 /// **Use this method instead of [`execute`] in all production code** that
725 /// needs outbox delivery guarantees. Plain `execute` silently drops any
726 /// outbox entries produced by the workflow handler — a crash between
727 /// `execute` and a subsequent manual `OutboxStore::enqueue` call would
728 /// lose the APERAK or UTILMD response permanently.
729 ///
730 /// For long event streams with periodic snapshots use
731 /// [`execute_and_enqueue_snapshot`] to reduce O(n) replay cost to O(k).
732 /// In concurrent environments where `VersionConflict` is expected, use
733 /// [`execute_and_enqueue_with_retry`] to retry automatically.
734 ///
735 /// # Example
736 ///
737 /// ```rust,ignore
738 /// use std::sync::Arc;
739 /// use mako_engine::process::Process;
740 /// use mako_engine::version::WorkflowId;
741 /// use mako_engine::ids::TenantId;
742 ///
743 /// // SlateDbStore implements AtomicAppend — required for execute_and_enqueue.
744 /// let store = Arc::new(SlateDbStore::open_in_memory().await?);
745 /// let tenant_id = TenantId::from_party_id("9904231000007");
746 /// let workflow_id = WorkflowId::new("gpke-supplier-change", fv);
747 ///
748 /// let process = Process::<GpkeSupplierChangeWorkflow, _>::new(
749 /// Arc::clone(&store),
750 /// tenant_id,
751 /// workflow_id,
752 /// );
753 ///
754 /// // The workflow handle emits a PendingOutbox APERAK entry alongside the event.
755 /// // execute_and_enqueue writes both in one WriteBatch — no partial-write window.
756 /// let events = process
757 /// .execute_and_enqueue(GpkeCommand::ReceiveUtilmd { pid: 55001, payload })
758 /// .await?;
759 ///
760 /// assert!(!events.is_empty(), "at least one event was persisted");
761 ///
762 /// // The APERAK outbox entry is now visible to the outbox worker:
763 /// let pending = store.peek_outbox(tenant_id, 10).await?;
764 /// assert_eq!(pending.len(), 1, "APERAK enqueued atomically with the event");
765 /// ```
766 ///
767 /// # Errors
768 ///
769 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
770 /// retry with [`execute_and_enqueue_with_retry`].
771 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
772 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
773 ///
774 /// [`PendingOutbox`]: crate::outbox::PendingOutbox
775 /// [`execute`]: Process::execute
776 /// [`execute_and_enqueue_snapshot`]: Process::execute_and_enqueue_snapshot
777 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
778 pub async fn execute_and_enqueue(
779 &self,
780 command: W::Command,
781 ) -> Result<Vec<EventEnvelope>, EngineError>
782 where
783 S: crate::event_store::AtomicAppend,
784 {
785 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
786 crate::workflow::execute_command_atomic::<W, S>(&self.store, &self.stream_id, command, &ctx)
787 .await
788 }
789
790 /// Like [`execute_and_enqueue`] but co-persists `deadlines` in the same
791 /// atomic write as events and outbox entries.
792 ///
793 /// On [`SlateDbStore`] this writes events, outbox entries, **and** deadlines
794 /// in a single SSI transaction. On in-memory test stores the default
795 /// fallback is used: events and outbox are written atomically, deadlines are
796 /// **not** persisted here and must be registered separately.
797 ///
798 /// Use this method for commands that must register a regulatory deadline
799 /// (GPKE 24h APERAK, WiM 5 WT, GeLi Gas / WiM Gas 10 WT, MABIS 1 WT).
800 ///
801 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
802 /// [`SlateDbStore`]: crate::store_slatedb::SlateDbStore
803 ///
804 /// # Errors
805 ///
806 /// Returns [`EngineError`] on storage or command handling failure.
807 pub async fn execute_and_enqueue_with_deadlines(
808 &self,
809 command: W::Command,
810 deadlines: &[crate::deadline::Deadline],
811 ) -> Result<Vec<EventEnvelope>, EngineError>
812 where
813 S: crate::event_store::AtomicAppend,
814 {
815 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
816 crate::workflow::execute_command_atomic_with_deadlines::<W, S>(
817 &self.store,
818 &self.stream_id,
819 command,
820 &ctx,
821 deadlines,
822 )
823 .await
824 }
825
826 /// Like [`execute_and_enqueue`] but uses a snapshot to accelerate replay.
827 ///
828 /// Atomically persists events and outbox entries while starting state
829 /// reconstruction from the most recent snapshot. For long streams with
830 /// periodic snapshots this reduces replay cost from O(n) to O(k).
831 ///
832 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
833 ///
834 /// # Errors
835 ///
836 /// Returns [`EngineError`] on storage or command handling failure.
837 pub async fn execute_and_enqueue_snapshot<Snap>(
838 &self,
839 command: W::Command,
840 snap_store: &Snap,
841 ) -> Result<Vec<EventEnvelope>, EngineError>
842 where
843 W::State: serde::de::DeserializeOwned,
844 S: crate::event_store::AtomicAppend,
845 Snap: SnapshotStore,
846 {
847 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
848 crate::workflow::execute_command_atomic_with_snapshot::<W, S, Snap>(
849 &self.store,
850 snap_store,
851 &self.stream_id,
852 command,
853 &ctx,
854 )
855 .await
856 }
857
858 /// Dispatch the compensation command returned by [`Workflow::on_deadline`].
859 ///
860 /// Reconstructs the current process state, calls
861 /// `W::on_deadline(deadline, &state)`, and — if the hook returns
862 /// `Some(command)` — executes it via [`Process::execute_and_enqueue`],
863 /// which atomically persists events **and** any outbox entries (e.g.
864 /// APERAK Ablehnung) produced by the compensation handler.
865 ///
866 /// Returns `Ok(Some(events))` when compensation fired, `Ok(None)` when
867 /// the hook returned `None` (deadline acknowledged as no-op).
868 ///
869 /// This is the canonical way to wire deadline firings to workflow
870 /// compensation logic. Any [`WorkflowOutput::with_outbox`] entries
871 /// returned by `on_deadline` are guaranteed to be persisted atomically —
872 /// there is no window where the event is stored but the outbox entry is
873 /// lost.
874 ///
875 /// # Example
876 ///
877 /// ```rust,ignore
878 /// // In the deadline worker:
879 /// let overdue = ctx.deadline_store().due_now(50).await?;
880 /// for deadline in overdue {
881 /// let identity = ctx.registry()
882 /// .lookup(deadline.tenant_id(), &RegistryKey::from_process(deadline.process_id()))
883 /// .await?
884 /// .expect("process must be registered");
885 /// let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
886 /// if let Some(events) = process.execute_timeout(&deadline).await? {
887 /// // compensation command was dispatched — APERAK Ablehnung enqueued
888 /// tracing::info!(events = events.len(), "timeout compensation applied");
889 /// }
890 /// ctx.deadline_store().cancel(deadline.deadline_id()).await?;
891 /// }
892 /// ```
893 ///
894 /// # Errors
895 ///
896 /// Propagates [`EngineError::VersionConflict`], [`EngineError::Workflow`],
897 /// and storage errors from `execute_and_enqueue`. Use
898 /// [`execute_timeout_with_retry`] when `VersionConflict` retries are
899 /// required.
900 ///
901 /// [`Workflow::on_deadline`]: crate::workflow::Workflow::on_deadline
902 /// [`WorkflowOutput::with_outbox`]: crate::workflow::WorkflowOutput::with_outbox
903 /// [`execute_timeout_with_retry`]: Process::execute_timeout_with_retry
904 pub async fn execute_timeout(
905 &self,
906 deadline: &crate::deadline::Deadline,
907 ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
908 where
909 S: crate::event_store::AtomicAppend,
910 {
911 let state = self.state().await?;
912 match W::on_deadline(deadline, &state) {
913 None => Ok(None),
914 Some(command) => self.execute_and_enqueue(command).await.map(Some),
915 }
916 }
917
918 /// Like [`execute_timeout`] but retries on [`VersionConflict`] up to
919 /// `max_attempts` times.
920 ///
921 /// Use this in production deadline workers where concurrent event appends
922 /// are expected. Outbox entries (e.g. APERAK Ablehnung) produced by the
923 /// compensation handler are persisted atomically on every attempt.
924 ///
925 /// [`execute_timeout`]: Process::execute_timeout
926 /// [`VersionConflict`]: crate::error::EngineError::VersionConflict
927 ///
928 /// # Errors
929 ///
930 /// Returns [`EngineError`] on storage or command handling failure.
931 ///
932 /// # Panics
933 ///
934 /// Panics if the deadline produces a command but the retry loop somehow
935 /// exhausts without capturing an error (unreachable in practice).
936 pub async fn execute_timeout_with_retry(
937 &self,
938 deadline: &crate::deadline::Deadline,
939 max_attempts: u32,
940 ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
941 where
942 S: crate::event_store::AtomicAppend,
943 W::Command: Clone,
944 {
945 let state = self.state().await?;
946 match W::on_deadline(deadline, &state) {
947 None => Ok(None),
948 Some(command) => self
949 .execute_and_enqueue_with_retry(command, max_attempts)
950 .await
951 .map(Some),
952 }
953 }
954
955 /// Like [`execute_and_enqueue`] but retries on [`crate::error::EngineError::VersionConflict`] up to
956 /// `max_attempts` times.
957 ///
958 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
959 ///
960 /// # Errors
961 ///
962 /// Returns [`EngineError`] on storage or command handling failure.
963 ///
964 /// # Panics
965 ///
966 /// Panics if `max_attempts` is 0 and the guard is bypassed (unreachable).
967 pub async fn execute_and_enqueue_with_retry(
968 &self,
969 command: W::Command,
970 max_attempts: u32,
971 ) -> Result<Vec<EventEnvelope>, EngineError>
972 where
973 S: crate::event_store::AtomicAppend,
974 W::Command: Clone,
975 {
976 if max_attempts == 0 {
977 return Err(EngineError::store("max_attempts must be >= 1"));
978 }
979 let ctx = CommandContext::new(self.tenant_id, self.process_id, self.workflow_id.clone());
980 let mut conflict_err: Option<EngineError> = None;
981 for _ in 0..max_attempts {
982 match crate::workflow::execute_command_atomic::<W, S>(
983 &self.store,
984 &self.stream_id,
985 command.clone(),
986 &ctx,
987 )
988 .await
989 {
990 Ok(envs) => return Ok(envs),
991 Err(e) if e.is_version_conflict() => conflict_err = Some(e),
992 Err(e) => return Err(e),
993 }
994 }
995 Err(conflict_err.expect("loop ran at least once"))
996 }
997
998 /// Execute `command` atomically with outbox, then automatically snapshot
999 /// if the event-count threshold is reached.
1000 ///
1001 /// Combines [`execute_and_enqueue`] with [`take_snapshot`]: after a
1002 /// successful write, checks whether `event_count % snapshot_interval == 0`
1003 /// and, if so, serialises and saves a snapshot via `snap_store`.
1004 ///
1005 /// Pass `snapshot_interval = 0` to disable auto-snapshotting; the call
1006 /// then behaves identically to [`execute_and_enqueue`].
1007 ///
1008 /// Returns `(events, snapshot_taken)` where `snapshot_taken` is `true` when
1009 /// a snapshot was written this call.
1010 ///
1011 /// # Errors
1012 ///
1013 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
1014 /// retry with [`execute_and_enqueue_with_retry`].
1015 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
1016 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
1017 /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
1018 ///
1019 /// [`execute_and_enqueue`]: Process::execute_and_enqueue
1020 /// [`take_snapshot`]: Process::take_snapshot
1021 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
1022 pub async fn execute_and_enqueue_with_snapshot<Snap>(
1023 &self,
1024 command: W::Command,
1025 snap_store: &Snap,
1026 snapshot_interval: u64,
1027 ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
1028 where
1029 S: crate::event_store::AtomicAppend,
1030 Snap: crate::snapshot::SnapshotStore,
1031 W::State: serde::Serialize,
1032 {
1033 let events = self.execute_and_enqueue(command).await?;
1034 let snapped = if snapshot_interval > 0 {
1035 self.take_snapshot(snap_store, snapshot_interval).await?
1036 } else {
1037 false
1038 };
1039 Ok((events, snapped))
1040 }
1041
1042 /// Like [`execute_and_enqueue_with_snapshot`] but retries on
1043 /// [`crate::error::EngineError::VersionConflict`] up to `max_attempts` times.
1044 ///
1045 /// [`execute_and_enqueue_with_retry`]: Process::execute_and_enqueue_with_retry
1046 ///
1047 /// # Errors
1048 ///
1049 /// - [`EngineError::VersionConflict`] — stream was modified concurrently;
1050 /// retry with [`execute_and_enqueue_with_snapshot_and_retry`].
1051 /// - [`EngineError::Workflow`] — the command was rejected by the workflow.
1052 /// - [`EngineError::Store`] / [`EngineError::Outbox`] — storage failure.
1053 /// - [`EngineError::Serialization`] — state serialisation failed during snapshot.
1054 ///
1055 /// # Panics
1056 ///
1057 /// Panics if `max_attempts` is 0 and the loop guard is bypassed (unreachable).
1058 ///
1059 /// [`execute_and_enqueue_with_snapshot`]: Process::execute_and_enqueue_with_snapshot
1060 /// [`execute_and_enqueue_with_snapshot_and_retry`]: Process::execute_and_enqueue_with_snapshot_and_retry
1061 pub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>(
1062 &self,
1063 command: W::Command,
1064 max_attempts: u32,
1065 snap_store: &Snap,
1066 snapshot_interval: u64,
1067 ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
1068 where
1069 S: crate::event_store::AtomicAppend,
1070 W::Command: Clone,
1071 Snap: crate::snapshot::SnapshotStore,
1072 W::State: serde::Serialize,
1073 {
1074 let events = self
1075 .execute_and_enqueue_with_retry(command, max_attempts)
1076 .await?;
1077 let snapped = if snapshot_interval > 0 {
1078 self.take_snapshot(snap_store, snapshot_interval).await?
1079 } else {
1080 false
1081 };
1082 Ok((events, snapped))
1083 }
1084}
1085
1086impl<W: Workflow, S: EventStore + Clone> Clone for Process<W, S> {
1087 fn clone(&self) -> Self {
1088 Self {
1089 stream_id: self.stream_id.clone(),
1090 process_id: self.process_id,
1091 tenant_id: self.tenant_id,
1092 workflow_id: self.workflow_id.clone(),
1093 store: self.store.clone(),
1094 _phantom: PhantomData,
1095 }
1096 }
1097}
1098
1099impl<W: Workflow, S: EventStore + std::fmt::Debug> std::fmt::Debug for Process<W, S> {
1100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1101 f.debug_struct("Process")
1102 .field("stream_id", &self.stream_id)
1103 .field("process_id", &self.process_id)
1104 .field("workflow_id", &self.workflow_id)
1105 .finish_non_exhaustive()
1106 }
1107}
1108
1109// ── Unit tests ────────────────────────────────────────────────────────────────
1110
1111#[cfg(test)]
1112mod tests {
1113 use super::*;
1114 use crate::{
1115 envelope::NewEvent,
1116 error::WorkflowError,
1117 event_store::{EventStore, ExpectedVersion, InMemoryEventStore},
1118 ids::{ConversationId, CorrelationId, TenantId},
1119 snapshot::{InMemorySnapshotStore, NoopSnapshotStore},
1120 version::WorkflowId,
1121 workflow::{CommandPayload, EventPayload},
1122 };
1123
1124 // ── Minimal test workflow ─────────────────────────────────────────────────
1125
/// Events emitted by the counter test workflow.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
enum CounterEvent {
    /// The counter was increased by `by`.
    Incremented { by: u32 },
    /// The counter was set back to zero.
    Reset,
}

impl EventPayload for CounterEvent {
    /// Returns the variant name as the event-type string.
    fn event_type(&self) -> &'static str {
        match self {
            Self::Incremented { .. } => "Incremented",
            Self::Reset => "Reset",
        }
    }
}
1140
/// Commands accepted by the counter test workflow; each one maps to the
/// equally named [`CounterEvent`].
#[derive(Debug, Clone)]
enum CounterCommand {
    /// Ask for the counter to be increased by `by`.
    Increment { by: u32 },
    /// Ask for the counter to be set back to zero.
    Reset,
}

// Empty impl: no trait items are overridden for the test command.
impl CommandPayload for CounterCommand {}
1148
/// State of the counter test workflow; `Default` starts it at zero.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
struct CounterState {
    /// Current counter value, as accumulated by `CounterWorkflow::apply`.
    value: u32,
}
1153
1154 struct CounterWorkflow;
1155
1156 impl Workflow for CounterWorkflow {
1157 type State = CounterState;
1158 type Event = CounterEvent;
1159 type Command = CounterCommand;
1160
1161 fn apply(mut state: CounterState, event: &CounterEvent) -> CounterState {
1162 match event {
1163 CounterEvent::Incremented { by } => state.value += by,
1164 CounterEvent::Reset => state.value = 0,
1165 }
1166 state
1167 }
1168
1169 fn handle(
1170 _state: &CounterState,
1171 command: CounterCommand,
1172 ) -> Result<crate::workflow::WorkflowOutput<CounterEvent>, WorkflowError> {
1173 Ok(match command {
1174 CounterCommand::Increment { by } => vec![CounterEvent::Incremented { by }].into(),
1175 CounterCommand::Reset => vec![CounterEvent::Reset].into(),
1176 })
1177 }
1178 }
1179
1180 fn make_process() -> Process<CounterWorkflow, InMemoryEventStore> {
1181 Process::new(
1182 InMemoryEventStore::new(),
1183 TenantId::new(),
1184 WorkflowId::new("counter", "FV2024-10-01"),
1185 )
1186 }
1187
1188 // ── execute + state round-trip ────────────────────────────────────────────
1189
1190 #[tokio::test]
1191 async fn execute_then_state_round_trip() {
1192 let p = make_process();
1193
1194 p.execute(CounterCommand::Increment { by: 3 })
1195 .await
1196 .unwrap();
1197 p.execute(CounterCommand::Increment { by: 7 })
1198 .await
1199 .unwrap();
1200
1201 let state = p.state().await.unwrap();
1202 assert_eq!(state.value, 10);
1203 }
1204
1205 #[tokio::test]
1206 async fn event_count_matches_dispatched_commands() {
1207 let p = make_process();
1208
1209 assert_eq!(p.event_count().await.unwrap(), 0);
1210 p.execute(CounterCommand::Increment { by: 1 })
1211 .await
1212 .unwrap();
1213 assert_eq!(p.event_count().await.unwrap(), 1);
1214 p.execute(CounterCommand::Reset).await.unwrap();
1215 assert_eq!(p.event_count().await.unwrap(), 2);
1216 }
1217
1218 // ── identity round-trip ───────────────────────────────────────────────────
1219
1220 #[tokio::test]
1221 async fn identity_round_trip_via_from_identity() {
1222 let store = InMemoryEventStore::new();
1223 let p1 = Process::<CounterWorkflow, _>::new(
1224 store.clone(),
1225 TenantId::new(),
1226 WorkflowId::new("counter", "FV2024-10-01"),
1227 );
1228
1229 p1.execute(CounterCommand::Increment { by: 5 })
1230 .await
1231 .unwrap();
1232
1233 let identity = p1.identity();
1234 assert_eq!(*identity.stream_id(), *p1.stream_id());
1235 assert_eq!(identity.process_id, p1.process_id());
1236
1237 // Re-attach from identity and confirm state is visible.
1238 let p2 = Process::<CounterWorkflow, _>::from_identity(store, identity);
1239 let state = p2.state().await.unwrap();
1240 assert_eq!(state.value, 5);
1241 }
1242
/// `ProcessIdentity` survives a JSON round-trip with its stream and process
/// ids intact.
#[test]
fn process_identity_is_serializable() {
    let identity = make_process().identity();

    let encoded =
        serde_json::to_string(&identity).expect("ProcessIdentity must be serializable");
    let decoded: ProcessIdentity = serde_json::from_str(&encoded).unwrap();

    assert_eq!(*decoded.stream_id(), *identity.stream_id());
    assert_eq!(decoded.process_id, identity.process_id);
}
1252
1253 // ── snapshot-accelerated state reconstruction ─────────────────────────────
1254
1255 #[tokio::test]
1256 async fn take_snapshot_and_state_with_snapshot() {
1257 let snap_store = InMemorySnapshotStore::new();
1258 let p = make_process();
1259
1260 // Dispatch 4 commands; the interval is 4.
1261 for i in 1u32..=4 {
1262 p.execute(CounterCommand::Increment { by: i })
1263 .await
1264 .unwrap();
1265 }
1266
1267 let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1268 assert!(took, "snapshot must be taken at event_count = 4");
1269
1270 // Dispatch one more command after the snapshot.
1271 p.execute(CounterCommand::Increment { by: 10 })
1272 .await
1273 .unwrap();
1274
1275 let state = p.state_with_snapshot(&snap_store).await.unwrap();
1276 // 1+2+3+4 = 10, plus the final +10 = 20.
1277 assert_eq!(state.value, 20);
1278 }
1279
1280 #[tokio::test]
1281 async fn state_with_snapshot_falls_back_to_full_replay() {
1282 let p = make_process();
1283 p.execute(CounterCommand::Increment { by: 42 })
1284 .await
1285 .unwrap();
1286
1287 // NoopSnapshotStore always returns None → full replay.
1288 let state = p.state_with_snapshot(&NoopSnapshotStore).await.unwrap();
1289 assert_eq!(state.value, 42);
1290 }
1291
1292 #[tokio::test]
1293 async fn take_snapshot_skipped_between_intervals() {
1294 let snap_store = InMemorySnapshotStore::new();
1295 let p = make_process();
1296
1297 p.execute(CounterCommand::Increment { by: 1 })
1298 .await
1299 .unwrap();
1300 p.execute(CounterCommand::Increment { by: 1 })
1301 .await
1302 .unwrap();
1303 p.execute(CounterCommand::Increment { by: 1 })
1304 .await
1305 .unwrap();
1306
1307 // 3 events, interval = 4 → must not take.
1308 let took = p.take_snapshot(&snap_store, 4).await.unwrap();
1309 assert!(!took);
1310 assert!(snap_store.is_empty().await);
1311 }
1312
1313 /// Regression test for when a persisted snapshot carries a
1314 /// `state_schema_version` that does not match the current workflow's
1315 /// `state_schema_version()`, `state_with_snapshot` must silently discard
1316 /// the stale snapshot and fall back to full event replay.
1317 ///
1318 /// This guards against silent data corruption when state layout changes
1319 /// incompatibly — e.g. after adding a new required field to `CounterState`.
1320 #[tokio::test]
1321 async fn stale_snapshot_schema_version_falls_back_to_full_replay() {
1322 // CounterWorkflow uses state_schema_version() == 1 (the default).
1323 // We simulate a "migrated" workflow by injecting a snapshot whose
1324 // state_schema_version is bumped to 99, representing a schema that the
1325 // current workflow code does not understand.
1326 let snap_store = InMemorySnapshotStore::new();
1327 let p = make_process();
1328
1329 // Dispatch some events so there is something to replay.
1330 p.execute(CounterCommand::Increment { by: 5 })
1331 .await
1332 .unwrap();
1333 p.execute(CounterCommand::Increment { by: 3 })
1334 .await
1335 .unwrap();
1336
1337 // Manually save a stale snapshot with schema_version = 99.
1338 // The state payload is intentionally wrong — it should never be used.
1339 let stale = crate::snapshot::Snapshot::new(
1340 p.stream_id().clone(),
1341 2, // sequence_number after 2 events
1342 99, // ← unknown schema version
1343 serde_json::json!({ "value": 9999 }), // ← wrong value; must not be read
1344 );
1345 snap_store.save(&stale).await.unwrap();
1346
1347 // state_with_snapshot must discard the stale snapshot and replay all
1348 // events from sequence 0, producing the correct state (5+3=8).
1349 let current_state = p.state_with_snapshot(&snap_store).await.unwrap();
1350 assert_eq!(
1351 current_state.value, 8,
1352 "stale snapshot must be discarded; full replay must yield correct state"
1353 );
1354 }
1355
1356 // ── execute_with_retry ────────────────────────────────────────────────────
1357
1358 #[tokio::test]
1359 async fn execute_with_retry_succeeds_on_first_attempt() {
1360 let p = make_process();
1361 let envs = p
1362 .execute_with_retry(CounterCommand::Increment { by: 99 }, 3)
1363 .await
1364 .unwrap();
1365 assert_eq!(envs.len(), 1);
1366 assert_eq!(p.state().await.unwrap().value, 99);
1367 }
1368
1369 #[tokio::test]
1370 async fn execute_with_retry_returns_err_on_zero_attempts() {
1371 let p = make_process();
1372 let err = p
1373 .execute_with_retry(CounterCommand::Increment { by: 1 }, 0)
1374 .await
1375 .unwrap_err();
1376 assert!(
1377 matches!(err, EngineError::Store { ref message, .. } if message.contains("max_attempts")),
1378 "expected Store error about max_attempts, got: {err:?}",
1379 );
1380 }
1381
1382 // ── execute_with (explicit context) ──────────────────────────────────────
1383
1384 #[tokio::test]
1385 async fn execute_with_explicit_context_propagates_ids() {
1386 use crate::ids::{ConversationId, CorrelationId};
1387 let p = make_process();
1388
1389 let corr = CorrelationId::new();
1390 let conv = ConversationId::new();
1391 let ctx = CommandContext::new(p.tenant_id(), p.process_id(), p.workflow_id().clone())
1392 .with_correlation(corr)
1393 .with_conversation(conv);
1394
1395 let envs = p
1396 .execute_with(CounterCommand::Increment { by: 1 }, ctx)
1397 .await
1398 .unwrap();
1399 assert_eq!(envs.len(), 1);
1400 assert_eq!(envs[0].correlation_id, corr);
1401 assert_eq!(envs[0].conversation_id, conv);
1402 }
1403
1404 // ── upcast / schema-migration ─────────────────────────────────────────────
1405 //
1406 // A v2 workflow adds a `label: String` field to its single event.
1407 // Old (v1) events stored without `label` must be migrated by `upcast`.
1408 //
1409 // `#[serde(untagged)]` is used so the serialized payload is the flat
1410 // inner struct `{"count": N, "label": "..."}` rather than the externally-
1411 // tagged `{"Tagged": {"count": N}}` form. This matches the common
1412 // real-world pattern where each `EventPayload::event_type()` discriminant
1413 // IS the variant selector stored in the envelope, and the payload holds
1414 // only the fields.
1415
/// State of the tag test workflow used for the upcast / schema-migration test.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
struct TagState {
    /// Sum of the `count` fields of all applied `Tagged` events.
    total: u32,
    /// `label` of the most recently applied `Tagged` event.
    last_label: String,
}
1421
/// The single event of the tag test workflow.
///
/// v1 schema (legacy): `{ "count": u32 }` — `label` field absent.
/// v2 schema: `{ "count": u32, "label": String }`.
///
/// `#[serde(untagged)]` keeps the serialized payload flat, i.e. just the
/// variant's fields without a wrapping variant name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(untagged)]
enum TagEvent {
    /// A tagging with its `count` and (since v2) a `label`.
    Tagged { count: u32, label: String },
}

impl EventPayload for TagEvent {
    /// There is only one variant, so the type name is always `"Tagged"`.
    fn event_type(&self) -> &'static str {
        "Tagged"
    }
    /// Newly written events are stamped with schema version 2 (with `label`).
    fn schema_version(&self) -> u32 {
        2
    }
}
1438
/// Command of the tag test workflow; its fields are copied verbatim into the
/// resulting `TagEvent::Tagged`.
#[derive(Debug, Clone)]
struct TagCommand {
    /// Amount to add to the running total.
    count: u32,
    /// Label to record as the latest one.
    label: String,
}
// Empty impl: no trait items are overridden for the test command.
impl CommandPayload for TagCommand {}
1445
1446 struct TagWorkflow;
1447
1448 impl Workflow for TagWorkflow {
1449 type State = TagState;
1450 type Event = TagEvent;
1451 type Command = TagCommand;
1452
1453 fn apply(mut state: TagState, event: &TagEvent) -> TagState {
1454 let TagEvent::Tagged { count, label } = event;
1455 state.total += count;
1456 state.last_label = label.clone();
1457 state
1458 }
1459
1460 fn handle(
1461 _state: &TagState,
1462 cmd: TagCommand,
1463 ) -> Result<crate::workflow::WorkflowOutput<TagEvent>, WorkflowError> {
1464 Ok(vec![TagEvent::Tagged {
1465 count: cmd.count,
1466 label: cmd.label,
1467 }]
1468 .into())
1469 }
1470
1471 /// Migrate v1 `Tagged` events (missing `label`) to v2.
1472 ///
1473 /// v1 payload: `{"count": N}` (no `label` field)
1474 /// v2 payload: `{"count": N, "label": ""}` (default empty string)
1475 ///
1476 /// Because the event uses `#[serde(untagged)]`, the envelope payload
1477 /// is the flat struct — variant discrimination comes from `event_type`.
1478 fn upcast(
1479 event_type: &str,
1480 from_version: u32,
1481 mut payload: serde_json::Value,
1482 ) -> Result<serde_json::Value, EngineError> {
1483 if event_type == "Tagged"
1484 && from_version == 1
1485 && let Some(obj) = payload.as_object_mut()
1486 {
1487 obj.entry("label")
1488 .or_insert_with(|| serde_json::Value::String(String::new()));
1489 }
1490 Ok(payload)
1491 }
1492 }
1493
1494 /// Inject a raw v1 event (no `label` field) directly into the store and
1495 /// confirm that `state()` replays it correctly via `upcast`.
1496 #[tokio::test]
1497 async fn upcast_v1_event_adds_default_label() {
1498 let store = InMemoryEventStore::new();
1499 let p = Process::<TagWorkflow, _>::new(
1500 store.clone(), // shares the underlying Arc<RwLock<_>>
1501 TenantId::new(),
1502 WorkflowId::new("tag", "FV2025-10-01"),
1503 );
1504
1505 // v1 payload: flat struct fields, no `label` (untagged serde repr).
1506 let v1_payload = serde_json::json!({ "count": 7 });
1507 let raw = NewEvent {
1508 correlation_id: CorrelationId::new(),
1509 causation_id: None,
1510 conversation_id: ConversationId::new(),
1511 process_id: p.process_id(),
1512 tenant_id: p.tenant_id(),
1513 workflow_id: p.workflow_id().clone(),
1514 event_type: "Tagged".into(),
1515 schema_version: 1, // ← schema_version 1 (old format)
1516 payload: v1_payload,
1517 };
1518 store
1519 .append(p.stream_id(), ExpectedVersion::Any, &[raw])
1520 .await
1521 .expect("inject v1 event");
1522
1523 // Replay via the v2 workflow — `upcast` must fill in `label: ""`.
1524 let state = p.state().await.expect("state must replay without error");
1525 assert_eq!(state.total, 7, "count must be accumulated");
1526 assert_eq!(
1527 state.last_label, "",
1528 "missing v1 label must default to empty string"
1529 );
1530
1531 // Also verify that a normally-executed v2 event round-trips correctly.
1532 p.execute(TagCommand {
1533 count: 3,
1534 label: "hello".into(),
1535 })
1536 .await
1537 .unwrap();
1538 let state2 = p.state().await.unwrap();
1539 assert_eq!(state2.total, 10);
1540 assert_eq!(state2.last_label, "hello");
1541 }
1542}