Skip to main content

mako_engine/
workflow.rs

1//! [`Workflow`] trait, [`EventPayload`], [`CommandPayload`], and [`CommandContext`].
2//!
3//! # Design contract
4//!
5//! Workflows are **pure state machines**:
6//!
7//! - [`Workflow::apply`] folds a domain event into the current state.
8//! - [`Workflow::handle`] validates a command against the current state and
9//!   returns the events to emit. It has no I/O, no side effects, and no
10//!   clock access. The same state + command always produce the same events.
11//!
12//! All I/O (parsing raw bytes, calling external services) must happen
13//! **before** the command is constructed and passed to the write path.
14//! This keeps workflows deterministic and trivially replayable.
15//!
16//! # Serialization boundary
17//!
18//! Domain events must implement [`serde::Serialize`] and
19//! [`serde::de::DeserializeOwned`] so the engine can persist them as JSON
20//! inside [`EventEnvelope::payload`]. The [`EventPayload`] trait adds a
21//! stable `event_type` discriminant for projection routing.
22//!
23//! # Write path
24//!
25//! The public write path is [`Process::execute`] / [`Process::execute_with`].
26//! These delegate to the crate-internal `execute_command` function. Direct
27//! use of `execute_command` is intentionally not part of the public API;
28//! use [`Process`] instead.
29//!
30//! [`Process`]: crate::process::Process
31//! [`Process::execute`]: crate::process::Process::execute
32//! [`Process::execute_with`]: crate::process::Process::execute_with
33
34use crate::{
35    deadline::Deadline,
36    envelope::{EventEnvelope, NewEvent},
37    error::{EngineError, WorkflowError},
38    event_store::{EventStore, ExpectedVersion},
39    ids::{CausationId, ConversationId, CorrelationId, ProcessId, TenantId},
40    outbox::PendingOutbox,
41    version::{WorkflowId, WorkflowVersionPolicy},
42};
43
44// ── WorkflowOutput ────────────────────────────────────────────────────────────
45
46/// The combined output of [`Workflow::handle`]: domain events and optional
47/// outbox messages to be atomically co-persisted.
48///
49/// Use [`WorkflowOutput::events`] or `From<Vec<E>>` when the command produces
50/// only events (no outbox messages). This keeps existing `handle`
51/// implementations concise: `Ok(vec![event].into())`.
52///
53/// When the command must also send an EDIFACT message, add the corresponding
54/// [`PendingOutbox`] entries to `outbox`. The engine materialises them into
55/// fully-typed [`OutboxMessage`] values with correct `causation_event_id` links
56/// inside [`Process::execute_and_enqueue`].
57///
58/// [`OutboxMessage`]: crate::outbox::OutboxMessage
59/// [`Process::execute_and_enqueue`]: crate::process::Process::execute_and_enqueue
60#[derive(Debug, Clone)]
61pub struct WorkflowOutput<E: EventPayload> {
62    /// Domain events to persist in the event stream.
63    pub events: Vec<E>,
64    /// Outbox messages to enqueue atomically alongside the events.
65    ///
66    /// Empty in the vast majority of commands. Only non-empty when the command
67    /// needs to trigger an outbound EDIFACT message (e.g. `DispatchAperak`).
68    pub outbox: Vec<PendingOutbox>,
69}
70
71impl<E: EventPayload> WorkflowOutput<E> {
72    /// Construct an output with events and no outbox messages.
73    ///
74    /// Equivalent to `events.into()`.
75    #[must_use]
76    pub fn events(events: Vec<E>) -> Self {
77        Self {
78            events,
79            outbox: Vec::new(),
80        }
81    }
82
83    /// Construct an output with both events and outbox messages.
84    #[must_use]
85    pub fn with_outbox(events: Vec<E>, outbox: Vec<PendingOutbox>) -> Self {
86        Self { events, outbox }
87    }
88}
89
90impl<E: EventPayload> From<Vec<E>> for WorkflowOutput<E> {
91    /// Convert a plain event list into a `WorkflowOutput` with no outbox.
92    ///
93    /// Allows `handle` implementations to write `Ok(vec![…].into())` without
94    /// constructing a `WorkflowOutput` explicitly.
95    fn from(events: Vec<E>) -> Self {
96        Self::events(events)
97    }
98}
99
100impl<E: EventPayload> std::ops::Deref for WorkflowOutput<E> {
101    type Target = [E];
102
103    /// Deref to the events slice so callers can use `.len()`, indexing, and
104    /// iteration on `WorkflowOutput` without destructuring.
105    fn deref(&self) -> &Self::Target {
106        &self.events
107    }
108}
109
110impl<E: EventPayload> IntoIterator for WorkflowOutput<E> {
111    type Item = E;
112    type IntoIter = std::vec::IntoIter<E>;
113
114    fn into_iter(self) -> Self::IntoIter {
115        self.events.into_iter()
116    }
117}
118
119impl<'a, E: EventPayload> IntoIterator for &'a WorkflowOutput<E> {
120    type Item = &'a E;
121    type IntoIter = std::slice::Iter<'a, E>;
122
123    fn into_iter(self) -> Self::IntoIter {
124        self.events.iter()
125    }
126}
127
128// ── EventPayload ──────────────────────────────────────────────────────────────
129
130/// Marker trait for domain event types.
131///
132/// Implementors must be JSON-serializable and carry a stable `event_type`
133/// string that the engine stores in [`EventEnvelope::event_type`] for
134/// projection routing and observability.
135///
136/// # Example
137///
138/// ```rust,ignore
139/// use mako_engine::workflow::EventPayload;
140///
141/// #[derive(serde::Serialize, serde::Deserialize)]
142/// enum MyEvent { Created { name: String }, Closed }
143///
144/// impl EventPayload for MyEvent {
145///     fn event_type(&self) -> &'static str {
146///         match self {
147///             Self::Created { .. } => "MyCreated",
148///             Self::Closed       => "MyClosed",
149///         }
150///     }
151/// }
152/// ```
153pub trait EventPayload:
154    serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
155{
156    /// A stable, unique name for this event variant.
157    ///
158    /// Used in [`EventEnvelope::event_type`]. Choose names that survive
159    /// refactors (e.g. `"SupplierChangeInitiated"`, not `"Initiated"`).
160    fn event_type(&self) -> &'static str;
161
162    /// Schema version of this event's payload layout.
163    ///
164    /// Increment when the serialized payload structure changes in a
165    /// backward-incompatible way. The engine stamps this value into
166    /// [`EventEnvelope::schema_version`] so replay and upcasting tooling
167    /// can identify which decoder to use.
168    ///
169    /// Defaults to `1`.
170    fn schema_version(&self) -> u32 {
171        1
172    }
173}
174
175// ── CommandPayload ────────────────────────────────────────────────────────────
176
/// Trait implemented by every domain command type.
///
/// A command is transient and is never persisted, which is why the only
/// requirements are `Send + 'static` — no serialization bounds.
pub trait CommandPayload: 'static + Send {}
182
183// ── Workflow ──────────────────────────────────────────────────────────────────
184
185/// A versioned, deterministic domain workflow.
186///
187/// Workflows are the unit of business logic in the engine. Each BDEW process
188/// variant (e.g. GPKE Lieferbeginn, WiM Gerätewechsel) is a separate
189/// `Workflow` implementation in its domain crate.
190///
191/// ## State reconstruction
192///
193/// Before handling a command, the engine calls [`Workflow::apply`] on every
194/// event in the stream to reconstruct the current state. This is the only
195/// path to reading state — there is no "load current state" API.
196///
197/// ## Determinism
198///
199/// `handle` and `apply` must be deterministic and free of side effects. Do not
200/// access clocks, RNGs, network, or file system inside them.
201pub trait Workflow: Send + Sync + 'static {
202    /// Domain-specific process state, reconstructed by replaying events.
203    type State: Default + Clone + Send + Sync + 'static;
204
205    /// Domain event type emitted by this workflow.
206    type Event: EventPayload;
207
208    /// Command type handled by this workflow.
209    type Command: CommandPayload;
210
211    /// Fold a domain event into the current state.
212    ///
213    /// This function must be total (no panics, no errors) and must produce a
214    /// deterministic result.
215    fn apply(state: Self::State, event: &Self::Event) -> Self::State;
216
217    /// Validate `command` against `state` and return the events to emit.
218    ///
219    /// Return an empty [`WorkflowOutput`] (or `vec![].into()`) when the
220    /// command is a no-op (already processed).
221    ///
222    /// Outbox messages in the returned [`WorkflowOutput::outbox`] will be
223    /// atomically co-persisted with the events when the command is dispatched
224    /// via [`Process::execute_and_enqueue`]. If dispatched via
225    /// [`Process::execute`], the outbox field is silently ignored.
226    ///
227    /// # Errors
228    ///
229    /// Return a [`WorkflowError`] when the command is invalid for the current
230    /// state or when domain validation fails.
231    ///
232    /// [`Process::execute`]: crate::process::Process::execute
233    /// [`Process::execute_and_enqueue`]: crate::process::Process::execute_and_enqueue
234    fn handle(
235        state: &Self::State,
236        command: Self::Command,
237    ) -> Result<WorkflowOutput<Self::Event>, WorkflowError>;
238
239    /// Schema version for serialized `Workflow::State` payloads.
240    ///
241    /// The engine stores this value in every [`Snapshot`] taken via
242    /// [`Process::take_snapshot`]. Increment it when the serialized state
243    /// layout changes in a backward-incompatible way, and add a migration
244    /// arm to your snapshot loader.
245    ///
246    /// Defaults to `1`.
247    ///
248    /// [`Snapshot`]: crate::snapshot::Snapshot
249    /// [`Process::take_snapshot`]: crate::process::Process::take_snapshot
250    #[must_use]
251    fn state_schema_version() -> u32 {
252        1
253    }
254
255    /// Upcast a stored event payload from an older schema version.
256    ///
257    /// The engine calls this during state reconstruction for every loaded
258    /// event, *before* deserializing the payload into `Self::Event`. The
259    /// returned [`serde_json::Value`] is passed to the standard JSON
260    /// deserializer.
261    ///
262    /// Override this when you bump [`EventPayload::schema_version`] on a
263    /// variant — return a `Value` compatible with the new schema so old
264    /// events replay correctly without a data migration.
265    ///
266    /// # Example
267    ///
268    /// ```rust,ignore
269    /// fn upcast(
270    ///     event_type: &str,
271    ///     from_version: u32,
272    ///     mut payload: serde_json::Value,
273    /// ) -> Result<serde_json::Value, EngineError> {
274    ///     // v2 of SupplierChangeInitiated added a `document_type` field.
275    ///     if event_type == "SupplierChangeInitiated" && from_version == 1 {
276    ///         payload["document_type"] = serde_json::json!("E01");
277    ///     }
278    ///     Ok(payload)
279    /// }
280    /// ```
281    ///
282    /// # Errors
283    ///
284    /// Return [`EngineError::Deserialization`] when the payload cannot be
285    /// migrated to the current schema.
286    fn upcast(
287        _event_type: &str,
288        _from_version: u32,
289        payload: serde_json::Value,
290    ) -> Result<serde_json::Value, EngineError> {
291        Ok(payload)
292    }
293
294    /// Declares which BDEW format versions this workflow accepts for in-flight
295    /// processes.
296    ///
297    /// The engine uses this policy to validate that an incoming message's
298    /// format version is acceptable *before* constructing the command, surfacing
299    /// missing adapter coverage at dispatch time rather than during runtime
300    /// deserialization.
301    ///
302    /// The default returns [`WorkflowVersionPolicy::ForwardCompatible`] —
303    /// accept messages in any format version.  This is the safe default for
304    /// the majority of BDEW market-communication processes, which routinely
305    /// span annual release boundaries (e.g. a GPKE Lieferbeginn process
306    /// started in September may still receive APERAK replies in November under
307    /// the new October FV).
308    ///
309    /// Override to `Pinned` only for strictly short-lived workflows that are
310    /// guaranteed to complete within a single BDEW release cycle.
311    ///
312    /// # Example
313    ///
314    /// ```rust,ignore
315    /// use mako_engine::version::WorkflowVersionPolicy;
316    ///
317    /// // Override to Pinned for a workflow with a 24h wall-clock SLA:
318    /// fn version_policy() -> WorkflowVersionPolicy {
319    ///     WorkflowVersionPolicy::Pinned
320    /// }
321    /// ```
322    #[must_use]
323    fn version_policy() -> WorkflowVersionPolicy {
324        WorkflowVersionPolicy::ForwardCompatible
325    }
326
327    /// Map a fired deadline to a compensating command.
328    ///
329    /// Called by [`Process::execute_timeout`] when a registered deadline
330    /// for this workflow's process becomes overdue. Return `Some(command)`
331    /// to trigger a compensating action; return `None` to acknowledge the
332    /// deadline as a no-op.
333    ///
334    /// This method must be **pure**: no I/O, no clock access, no global state.
335    /// The same `(deadline, state)` must always produce the same `Option<Command>`.
336    ///
337    /// The full [`Deadline`] is provided (not just the label) so implementations
338    /// can construct commands that require `deadline_id` (e.g. `TimeoutExpired`).
339    ///
340    /// # Why a dedicated hook instead of a normal command?
341    ///
342    /// A synthetic `TimeoutFired` command variant works but couples the workflow
343    /// enum to infrastructure concerns. `on_deadline` keeps the domain command
344    /// type clean and makes compensation logic explicit and testable in isolation:
345    ///
346    /// ```rust,ignore
347    /// fn on_deadline(
348    ///     deadline: &Deadline,
349    ///     state: &Self::State,
350    /// ) -> Option<Self::Command> {
351    ///     match (deadline.label(), state) {
352    ///         ("aperak-window", SupplierChangeState::Initiated(_) | SupplierChangeState::ValidationPassed(_)) => {
353    ///             Some(SupplierChangeCommand::TimeoutExpired {
354    ///                 deadline_id: deadline.deadline_id(),
355    ///                 label: deadline.label().into(),
356    ///             })
357    ///         }
358    ///         _ => None,
359    ///     }
360    /// }
361    /// ```
362    ///
363    /// # Default
364    ///
365    /// Returns `None` for all deadlines — no automatic compensation. Override in
366    /// any workflow that has deadline-triggered compensation requirements.
367    ///
368    /// [`Process::execute_timeout`]: crate::process::Process::execute_timeout
369    fn on_deadline(_deadline: &Deadline, _state: &Self::State) -> Option<Self::Command> {
370        None
371    }
372}
373
374// ── CommandContext ─────────────────────────────────────────────────────────────
375
376/// Contextual metadata attached to every command dispatch.
377///
378/// The engine stamps this information onto every event produced by the command.
379/// Callers provide the process identity; the engine generates correlation IDs
380/// automatically unless provided explicitly.
381#[derive(Debug, Clone)]
382pub struct CommandContext {
383    /// See [`CorrelationId`].
384    pub correlation_id: CorrelationId,
385    /// See [`ConversationId`].
386    pub conversation_id: ConversationId,
387    /// The MaKo process instance this command targets.
388    pub process_id: ProcessId,
389    /// The tenant that issued this command.
390    pub tenant_id: TenantId,
391    /// The workflow version to use for processing.
392    pub workflow_id: WorkflowId,
393    /// The immediate cause of this command, if driven by a prior event.
394    pub causation_id: Option<CausationId>,
395}
396
397impl CommandContext {
398    /// Construct a context with auto-generated correlation and conversation IDs.
399    #[must_use]
400    pub fn new(tenant_id: TenantId, process_id: ProcessId, workflow_id: WorkflowId) -> Self {
401        Self {
402            correlation_id: CorrelationId::new(),
403            conversation_id: ConversationId::new(),
404            process_id,
405            tenant_id,
406            workflow_id,
407            causation_id: None,
408        }
409    }
410
411    /// Set an explicit causation ID (e.g. the ID of the event that triggered
412    /// this command).
413    #[must_use]
414    pub fn with_causation(mut self, id: CausationId) -> Self {
415        self.causation_id = Some(id);
416        self
417    }
418
419    /// Override the auto-generated correlation ID.
420    ///
421    /// Use this to propagate a correlation ID from an inbound EDIFACT message
422    /// so all resulting events share the same root correlation.
423    #[must_use]
424    pub fn with_correlation(mut self, id: CorrelationId) -> Self {
425        self.correlation_id = id;
426        self
427    }
428
429    /// Override the auto-generated conversation ID.
430    ///
431    /// Use this to link the outbound APERAK to the same conversation as the
432    /// UTILMD that triggered it, so the full message exchange is traceable as
433    /// a unit.
434    #[must_use]
435    pub fn with_conversation(mut self, id: ConversationId) -> Self {
436        self.conversation_id = id;
437        self
438    }
439
440    /// Build a context that is causally linked to a prior persisted event.
441    ///
442    /// Propagates `correlation_id`, `conversation_id`, `process_id`, and
443    /// `tenant_id` from the envelope and sets the envelope's `event_id` as
444    /// the `causation_id`. This is the canonical constructor for all commands
445    /// that are triggered by a prior event (e.g. dispatching an APERAK in
446    /// response to a received UTILMD).
447    ///
448    /// # Example
449    ///
450    /// ```rust,ignore
451    /// let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
452    /// process.execute_with(DispatchAperak { positive: true, reason: None }, ctx).await?;
453    /// ```
454    #[must_use]
455    pub fn from_envelope(env: &EventEnvelope, workflow_id: WorkflowId) -> Self {
456        Self {
457            correlation_id: env.correlation_id,
458            conversation_id: env.conversation_id,
459            process_id: env.process_id,
460            tenant_id: env.tenant_id,
461            workflow_id,
462            causation_id: Some(env.event_id.into()),
463        }
464    }
465
466    /// Build a context for a deadline-triggered command.
467    ///
468    /// Propagates `process_id` and `tenant_id` from the deadline. Generates
469    /// fresh `correlation_id` and `conversation_id` (deadline firings start
470    /// a new tracing root).
471    ///
472    /// # Example
473    ///
474    /// ```rust,ignore
475    /// let ctx = CommandContext::from_deadline(&overdue_deadline, workflow_id);
476    /// process.execute_with(HandleTimeout { label: overdue_deadline.label().into() }, ctx).await?;
477    /// ```
478    #[must_use]
479    pub fn from_deadline(deadline: &crate::deadline::Deadline, workflow_id: WorkflowId) -> Self {
480        Self::new(deadline.tenant_id(), deadline.process_id(), workflow_id)
481    }
482
483    /// Build a [`NewEvent`] from this context and a domain event payload.
484    ///
485    /// This is the canonical way to construct a `NewEvent` inside a transport
486    /// adapter or test helper — it eliminates the nine-argument [`NewEvent::new`]
487    /// call and ensures that correlation metadata is always propagated correctly.
488    ///
489    /// # Errors
490    ///
491    /// Returns [`EngineError::Serialization`] when the event payload cannot be
492    /// serialized to JSON.
493    ///
494    /// # Example
495    ///
496    /// ```rust,ignore
497    /// // Inside a MessageAdapter or test:
498    /// let new_event = ctx.new_event(&SupplierChangeEvent::Activated)?;
499    /// store.append(&stream_id, ExpectedVersion::Any, &[new_event]).await?;
500    /// ```
501    pub fn new_event<E: EventPayload>(&self, event: &E) -> Result<NewEvent, EngineError> {
502        let payload =
503            serde_json::to_value(event).map_err(|e| EngineError::Serialization(e.to_string()))?;
504        Ok(NewEvent {
505            correlation_id: self.correlation_id,
506            causation_id: self.causation_id,
507            conversation_id: self.conversation_id,
508            process_id: self.process_id,
509            tenant_id: self.tenant_id,
510            workflow_id: self.workflow_id.clone(),
511            event_type: event.event_type().into(),
512            schema_version: event.schema_version(),
513            payload,
514        })
515    }
516}
517
518// ── EventEnvelope convenience ──────────────────────────────────────────────────
519
520impl EventEnvelope {
521    /// Build a [`NewEvent`] causally linked to this envelope.
522    ///
523    /// Propagates `correlation_id`, `conversation_id`, `process_id`, and
524    /// `tenant_id` from the envelope and sets `envelope.event_id` as the
525    /// `causation_id` of the new event. Useful when generating a follow-up
526    /// event (e.g. an APERAK trigger event) that must be traceable back to
527    /// the UTILMD envelope that caused it.
528    ///
529    /// # Errors
530    ///
531    /// Returns [`EngineError::Serialization`] when the event payload cannot be
532    /// serialized to JSON.
533    ///
534    /// # Example
535    ///
536    /// ```rust,ignore
537    /// // After persisting a UTILMD receive event, trigger an APERAK:
538    /// let aperak_new = utilmd_envelope.new_caused_event(
539    ///     workflow_id,
540    ///     &SupplierChangeEvent::AperakDispatched { positive: true, reason: None },
541    /// )?;
542    /// ```
543    pub fn new_caused_event<E: EventPayload>(
544        &self,
545        workflow_id: WorkflowId,
546        event: &E,
547    ) -> Result<NewEvent, EngineError> {
548        let payload =
549            serde_json::to_value(event).map_err(|e| EngineError::Serialization(e.to_string()))?;
550        Ok(NewEvent {
551            correlation_id: self.correlation_id,
552            causation_id: Some(self.event_id.into()),
553            conversation_id: self.conversation_id,
554            process_id: self.process_id,
555            tenant_id: self.tenant_id,
556            workflow_id,
557            event_type: event.event_type().into(),
558            schema_version: event.schema_version(),
559            payload,
560        })
561    }
562}
563
564// ── execute_command ───────────────────────────────────────────────────────────
565
566/// Dispatch a command through a workflow and persist the resulting events.
567///
568/// This is the crate-internal write-path entry point. The public API is
569/// [`Process::execute`] / [`Process::execute_with`].
570///
571/// It performs, in order:
572///
573/// 1. **Load** all events from `stream_id` via `store`.
574/// 2. **Reconstruct state** by folding events through [`Workflow::apply`].
575/// 3. **Handle** the command via [`Workflow::handle`] (pure, no I/O).
576/// 4. **Build** [`NewEvent`] values from each domain event + `ctx`.
577/// 5. **Append** atomically with optimistic concurrency
578///    (`ExpectedVersion::Exact(current_sequence)`).
579///
580/// Returns the persisted envelopes (with store-assigned IDs and sequence
581/// numbers). Returns an empty `Vec` when the workflow produced no events.
582///
583/// # Errors
584///
585/// - [`EngineError::VersionConflict`] when a concurrent writer raced ahead.
586/// - [`EngineError::Workflow`] when the workflow rejects the command.
587/// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
588///
589/// [`Process::execute`]: crate::process::Process::execute
590/// [`Process::execute_with`]: crate::process::Process::execute_with
591pub(crate) async fn execute_command<W, S>(
592    store: &S,
593    stream_id: &crate::ids::StreamId,
594    command: W::Command,
595    ctx: &CommandContext,
596) -> Result<Vec<EventEnvelope>, EngineError>
597where
598    W: Workflow,
599    S: EventStore,
600{
601    execute_command_and_collect::<W, S>(store, stream_id, command, ctx)
602        .await
603        .map(|(envelopes, _outbox)| envelopes)
604}
605
606/// Like [`execute_command`] but also returns the [`PendingOutbox`] entries
607/// produced by [`Workflow::handle`].
608///
609/// Use this when the caller needs to inspect or render the outbox messages
610/// produced by the command — for example, in E2E tests that render EDIFACT
611/// wire bytes from the workflow's outbox.  Avoids calling `handle()` a second
612/// time just to recover the outbox that `execute_command` silently discards.
613///
614/// [`PendingOutbox`]: crate::outbox::PendingOutbox
615pub(crate) async fn execute_command_and_collect<W, S>(
616    store: &S,
617    stream_id: &crate::ids::StreamId,
618    command: W::Command,
619    ctx: &CommandContext,
620) -> Result<(Vec<EventEnvelope>, Vec<PendingOutbox>), EngineError>
621where
622    W: Workflow,
623    S: EventStore,
624{
625    // ── 1 + 2. Stream-fold: reconstruct state without materialising a Vec ─────
626    //
627    // `fold_stream` feeds `EventEnvelope` values one-at-a-time; the engine
628    // never holds more than one envelope in memory during replay.  Because
629    // the envelope is owned, `env.payload` is moved into `W::upcast` without
630    // a clone — no extra heap allocation per event.
631    let (state, current_sequence) = store
632        .fold_stream(
633            stream_id,
634            0,
635            (W::State::default(), 0u64),
636            |(acc, _), env| {
637                let seq = env.sequence_number;
638                // env.payload is moved here — no clone required.
639                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
640                let event: W::Event = serde_json::from_value(payload)
641                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
642                Ok((W::apply(acc, &event), seq))
643            },
644        )
645        .await?;
646
647    // ── 3. Handle command (pure) ──────────────────────────────────────────────
648    let output = W::handle(&state, command)?;
649
650    if output.events.is_empty() {
651        return Ok((Vec::new(), output.outbox));
652    }
653
654    // ── 4. Build NewEvent values (caller-known metadata) ──────────────────────
655    let new_events: Result<Vec<NewEvent>, EngineError> = output
656        .events
657        .iter()
658        .map(|event| ctx.new_event(event))
659        .collect();
660    let new_events = new_events?;
661
662    // ── 5. Persist with optimistic concurrency ────────────────────────────────
663    // The store assigns event_id, sequence_number, stream_id, and timestamp.
664    // Outbox messages (output.outbox) are intentionally ignored here — use
665    // execute_command_atomic when atomic dual-writes are required.
666    let result = store
667        .append(
668            stream_id,
669            ExpectedVersion::Exact(current_sequence),
670            &new_events,
671        )
672        .await?;
673
674    Ok((result.events, output.outbox))
675}
676
677/// Like [`execute_command`] but atomically co-persists any [`PendingOutbox`]
678/// messages produced by [`Workflow::handle`].
679///
680/// Requires `S: AtomicAppend`.  All internal logic is identical to
681/// `execute_command`; the only difference is the persistence call at the end.
682pub(crate) async fn execute_command_atomic<W, S>(
683    store: &S,
684    stream_id: &crate::ids::StreamId,
685    command: W::Command,
686    ctx: &CommandContext,
687) -> Result<Vec<EventEnvelope>, EngineError>
688where
689    W: Workflow,
690    S: crate::event_store::AtomicAppend,
691{
692    // ── 1 + 2. Stream-fold: reconstruct state without materialising a Vec ─────
693    let (state, current_sequence) = store
694        .fold_stream(
695            stream_id,
696            0,
697            (W::State::default(), 0u64),
698            |(acc, _), env| {
699                let seq = env.sequence_number;
700                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
701                let event: W::Event = serde_json::from_value(payload)
702                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
703                Ok((W::apply(acc, &event), seq))
704            },
705        )
706        .await?;
707
708    // ── 3. Handle command (pure) ──────────────────────────────────────────────
709    let output = W::handle(&state, command)?;
710
711    if output.events.is_empty() {
712        return Ok(Vec::new());
713    }
714
715    // ── 4. Build NewEvent values ──────────────────────────────────────────────
716    let new_events: Result<Vec<NewEvent>, EngineError> = output
717        .events
718        .iter()
719        .map(|event| ctx.new_event(event))
720        .collect();
721    let new_events = new_events?;
722
723    // ── 5. Persist events + outbox atomically ─────────────────────────────────
724    let result = store
725        .append_with_outbox(
726            stream_id,
727            ExpectedVersion::Exact(current_sequence),
728            &new_events,
729            &output.outbox,
730        )
731        .await?;
732
733    Ok(result.events)
734}
735
736/// Reconstruct `(W::State, current_sequence)` using an optional snapshot as a
737/// starting point.
738///
739/// When a snapshot with matching schema version exists, replay starts from
740/// `snap.sequence_number` (O(k) tail scan). Otherwise falls back to full replay.
741async fn reconstruct_with_snapshot<W, S, Snap>(
742    store: &S,
743    snap_store: &Snap,
744    stream_id: &crate::ids::StreamId,
745) -> Result<(W::State, u64), EngineError>
746where
747    W: Workflow,
748    W::State: serde::de::DeserializeOwned,
749    S: EventStore,
750    Snap: crate::snapshot::SnapshotStore,
751{
752    let maybe_snap = snap_store.load(stream_id).await?;
753    let (initial_state, from_sequence) = match &maybe_snap {
754        Some(snap) if snap.state_schema_version == W::state_schema_version() => {
755            let state = serde_json::from_value::<W::State>(snap.state.clone())
756                .map_err(|e| EngineError::Deserialization(e.to_string()))?;
757            (state, snap.sequence_number)
758        }
759        #[allow(unused_variables)]
760        Some(snap) => {
761            #[cfg(feature = "tracing")]
762            tracing::warn!(
763                expected = W::state_schema_version(),
764                actual   = snap.state_schema_version,
765                stream_id = %stream_id,
766                "snapshot schema version mismatch; falling back to full replay"
767            );
768            (W::State::default(), 0)
769        }
770        None => (W::State::default(), 0),
771    };
772    store
773        .fold_stream(
774            stream_id,
775            from_sequence,
776            (initial_state, from_sequence),
777            |(acc, _), env| {
778                let seq = env.sequence_number;
779                let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
780                let event: W::Event = serde_json::from_value(payload)
781                    .map_err(|e| EngineError::Deserialization(e.to_string()))?;
782                Ok((W::apply(acc, &event), seq))
783            },
784        )
785        .await
786}
787
788/// Like [`execute_command`] but uses a snapshot store to skip full replay.
789///
790/// When a valid snapshot exists, only tail events since the snapshot are
791/// replayed — O(k) instead of O(n). Falls back to full replay when no snapshot
792/// exists or the schema version has changed.
793pub(crate) async fn execute_command_with_snapshot<W, S, Snap>(
794    store: &S,
795    snap_store: &Snap,
796    stream_id: &crate::ids::StreamId,
797    command: W::Command,
798    ctx: &CommandContext,
799) -> Result<Vec<EventEnvelope>, EngineError>
800where
801    W: Workflow,
802    W::State: serde::de::DeserializeOwned,
803    S: EventStore,
804    Snap: crate::snapshot::SnapshotStore,
805{
806    let (state, current_sequence) =
807        reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;
808
809    let output = W::handle(&state, command)?;
810    if output.events.is_empty() {
811        return Ok(Vec::new());
812    }
813    let new_events: Result<Vec<NewEvent>, EngineError> = output
814        .events
815        .iter()
816        .map(|event| ctx.new_event(event))
817        .collect();
818    let new_events = new_events?;
819    let result = store
820        .append(
821            stream_id,
822            ExpectedVersion::Exact(current_sequence),
823            &new_events,
824        )
825        .await?;
826    Ok(result.events)
827}
828
829/// Like [`execute_command_atomic`] but uses a snapshot store to skip full replay.
830///
831/// Atomically co-persists outbox messages alongside events while using a
832/// snapshot as the starting point for state reconstruction.
833pub(crate) async fn execute_command_atomic_with_snapshot<W, S, Snap>(
834    store: &S,
835    snap_store: &Snap,
836    stream_id: &crate::ids::StreamId,
837    command: W::Command,
838    ctx: &CommandContext,
839) -> Result<Vec<EventEnvelope>, EngineError>
840where
841    W: Workflow,
842    W::State: serde::de::DeserializeOwned,
843    S: crate::event_store::AtomicAppend,
844    Snap: crate::snapshot::SnapshotStore,
845{
846    let (state, current_sequence) =
847        reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;
848
849    let output = W::handle(&state, command)?;
850    if output.events.is_empty() {
851        return Ok(Vec::new());
852    }
853    let new_events: Result<Vec<NewEvent>, EngineError> = output
854        .events
855        .iter()
856        .map(|event| ctx.new_event(event))
857        .collect();
858    let new_events = new_events?;
859    let result = store
860        .append_with_outbox(
861            stream_id,
862            ExpectedVersion::Exact(current_sequence),
863            &new_events,
864            &output.outbox,
865        )
866        .await?;
867    Ok(result.events)
868}