mako_engine/workflow.rs
1//! [`Workflow`] trait, [`EventPayload`], [`CommandPayload`], and [`CommandContext`].
2//!
3//! # Design contract
4//!
5//! Workflows are **pure state machines**:
6//!
7//! - [`Workflow::apply`] folds a domain event into the current state.
8//! - [`Workflow::handle`] validates a command against the current state and
9//! returns the events to emit. It has no I/O, no side effects, and no
10//! clock access. The same state + command always produce the same events.
11//!
12//! All I/O (parsing raw bytes, calling external services) must happen
13//! **before** the command is constructed and passed to the write path.
14//! This keeps workflows deterministic and trivially replayable.
15//!
16//! # Serialization boundary
17//!
18//! Domain events must implement [`serde::Serialize`] and
19//! [`serde::de::DeserializeOwned`] so the engine can persist them as JSON
20//! inside [`EventEnvelope::payload`]. The [`EventPayload`] trait adds a
21//! stable `event_type` discriminant for projection routing.
22//!
23//! # Write path
24//!
25//! The public write path is [`Process::execute`] / [`Process::execute_with`].
26//! These delegate to the crate-internal `execute_command` function. Direct
27//! use of `execute_command` is intentionally not part of the public API;
28//! use [`Process`] instead.
29//!
30//! [`Process`]: crate::process::Process
31//! [`Process::execute`]: crate::process::Process::execute
32//! [`Process::execute_with`]: crate::process::Process::execute_with
33
34use crate::{
35 deadline::Deadline,
36 envelope::{EventEnvelope, NewEvent},
37 error::{EngineError, WorkflowError},
38 event_store::{EventStore, ExpectedVersion},
39 ids::{CausationId, ConversationId, CorrelationId, ProcessId, TenantId},
40 outbox::PendingOutbox,
41 version::{WorkflowId, WorkflowVersionPolicy},
42};
43
44// ── WorkflowOutput ────────────────────────────────────────────────────────────
45
46/// The combined output of [`Workflow::handle`]: domain events and optional
47/// outbox messages to be atomically co-persisted.
48///
49/// Use [`WorkflowOutput::events`] or `From<Vec<E>>` when the command produces
50/// only events (no outbox messages). This keeps existing `handle`
51/// implementations concise: `Ok(vec![event].into())`.
52///
53/// When the command must also send an EDIFACT message, add the corresponding
54/// [`PendingOutbox`] entries to `outbox`. The engine materialises them into
55/// fully-typed [`OutboxMessage`] values with correct `causation_event_id` links
56/// inside [`Process::execute_and_enqueue`].
57///
58/// [`OutboxMessage`]: crate::outbox::OutboxMessage
59/// [`Process::execute_and_enqueue`]: crate::process::Process::execute_and_enqueue
60#[derive(Debug, Clone)]
61pub struct WorkflowOutput<E: EventPayload> {
62 /// Domain events to persist in the event stream.
63 pub events: Vec<E>,
64 /// Outbox messages to enqueue atomically alongside the events.
65 ///
66 /// Empty in the vast majority of commands. Only non-empty when the command
67 /// needs to trigger an outbound EDIFACT message (e.g. `DispatchAperak`).
68 pub outbox: Vec<PendingOutbox>,
69}
70
71impl<E: EventPayload> WorkflowOutput<E> {
72 /// Construct an output with events and no outbox messages.
73 ///
74 /// Equivalent to `events.into()`.
75 #[must_use]
76 pub fn events(events: Vec<E>) -> Self {
77 Self {
78 events,
79 outbox: Vec::new(),
80 }
81 }
82
83 /// Construct an output with both events and outbox messages.
84 #[must_use]
85 pub fn with_outbox(events: Vec<E>, outbox: Vec<PendingOutbox>) -> Self {
86 Self { events, outbox }
87 }
88}
89
90impl<E: EventPayload> From<Vec<E>> for WorkflowOutput<E> {
91 /// Convert a plain event list into a `WorkflowOutput` with no outbox.
92 ///
93 /// Allows `handle` implementations to write `Ok(vec![…].into())` without
94 /// constructing a `WorkflowOutput` explicitly.
95 fn from(events: Vec<E>) -> Self {
96 Self::events(events)
97 }
98}
99
100impl<E: EventPayload> std::ops::Deref for WorkflowOutput<E> {
101 type Target = [E];
102
103 /// Deref to the events slice so callers can use `.len()`, indexing, and
104 /// iteration on `WorkflowOutput` without destructuring.
105 fn deref(&self) -> &Self::Target {
106 &self.events
107 }
108}
109
110impl<E: EventPayload> IntoIterator for WorkflowOutput<E> {
111 type Item = E;
112 type IntoIter = std::vec::IntoIter<E>;
113
114 fn into_iter(self) -> Self::IntoIter {
115 self.events.into_iter()
116 }
117}
118
119impl<'a, E: EventPayload> IntoIterator for &'a WorkflowOutput<E> {
120 type Item = &'a E;
121 type IntoIter = std::slice::Iter<'a, E>;
122
123 fn into_iter(self) -> Self::IntoIter {
124 self.events.iter()
125 }
126}
127
128// ── EventPayload ──────────────────────────────────────────────────────────────
129
130/// Marker trait for domain event types.
131///
132/// Implementors must be JSON-serializable and carry a stable `event_type`
133/// string that the engine stores in [`EventEnvelope::event_type`] for
134/// projection routing and observability.
135///
136/// # Example
137///
138/// ```rust,ignore
139/// use mako_engine::workflow::EventPayload;
140///
141/// #[derive(serde::Serialize, serde::Deserialize)]
142/// enum MyEvent { Created { name: String }, Closed }
143///
144/// impl EventPayload for MyEvent {
145/// fn event_type(&self) -> &'static str {
146/// match self {
147/// Self::Created { .. } => "MyCreated",
148/// Self::Closed => "MyClosed",
149/// }
150/// }
151/// }
152/// ```
153pub trait EventPayload:
154 serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
155{
156 /// A stable, unique name for this event variant.
157 ///
158 /// Used in [`EventEnvelope::event_type`]. Choose names that survive
159 /// refactors (e.g. `"SupplierChangeInitiated"`, not `"Initiated"`).
160 fn event_type(&self) -> &'static str;
161
162 /// Schema version of this event's payload layout.
163 ///
164 /// Increment when the serialized payload structure changes in a
165 /// backward-incompatible way. The engine stamps this value into
166 /// [`EventEnvelope::schema_version`] so replay and upcasting tooling
167 /// can identify which decoder to use.
168 ///
169 /// Defaults to `1`.
170 fn schema_version(&self) -> u32 {
171 1
172 }
173}
174
175// ── CommandPayload ────────────────────────────────────────────────────────────
176
/// Trait that tags a type as a domain command.
///
/// Commands are transient and never persisted, so the only requirements are
/// `Send + 'static`; no serialization bounds are imposed.
pub trait CommandPayload: 'static + Send {}
182
183// ── Workflow ──────────────────────────────────────────────────────────────────
184
185/// A versioned, deterministic domain workflow.
186///
187/// Workflows are the unit of business logic in the engine. Each BDEW process
188/// variant (e.g. GPKE Lieferbeginn, WiM Gerätewechsel) is a separate
189/// `Workflow` implementation in its domain crate.
190///
191/// ## State reconstruction
192///
193/// Before handling a command, the engine calls [`Workflow::apply`] on every
194/// event in the stream to reconstruct the current state. This is the only
195/// path to reading state — there is no "load current state" API.
196///
197/// ## Determinism
198///
199/// `handle` and `apply` must be deterministic and free of side effects. Do not
200/// access clocks, RNGs, network, or file system inside them.
201pub trait Workflow: Send + Sync + 'static {
202 /// Domain-specific process state, reconstructed by replaying events.
203 type State: Default + Clone + Send + Sync + 'static;
204
205 /// Domain event type emitted by this workflow.
206 type Event: EventPayload;
207
208 /// Command type handled by this workflow.
209 type Command: CommandPayload;
210
211 /// Fold a domain event into the current state.
212 ///
213 /// This function must be total (no panics, no errors) and must produce a
214 /// deterministic result.
215 fn apply(state: Self::State, event: &Self::Event) -> Self::State;
216
217 /// Validate `command` against `state` and return the events to emit.
218 ///
219 /// Return an empty [`WorkflowOutput`] (or `vec![].into()`) when the
220 /// command is a no-op (already processed).
221 ///
222 /// Outbox messages in the returned [`WorkflowOutput::outbox`] will be
223 /// atomically co-persisted with the events when the command is dispatched
224 /// via [`Process::execute_and_enqueue`]. If dispatched via
225 /// [`Process::execute`], the outbox field is silently ignored.
226 ///
227 /// # Errors
228 ///
229 /// Return a [`WorkflowError`] when the command is invalid for the current
230 /// state or when domain validation fails.
231 ///
232 /// [`Process::execute`]: crate::process::Process::execute
233 /// [`Process::execute_and_enqueue`]: crate::process::Process::execute_and_enqueue
234 fn handle(
235 state: &Self::State,
236 command: Self::Command,
237 ) -> Result<WorkflowOutput<Self::Event>, WorkflowError>;
238
239 /// Schema version for serialized `Workflow::State` payloads.
240 ///
241 /// The engine stores this value in every [`Snapshot`] taken via
242 /// [`Process::take_snapshot`]. Increment it when the serialized state
243 /// layout changes in a backward-incompatible way, and add a migration
244 /// arm to your snapshot loader.
245 ///
246 /// Defaults to `1`.
247 ///
248 /// [`Snapshot`]: crate::snapshot::Snapshot
249 /// [`Process::take_snapshot`]: crate::process::Process::take_snapshot
250 #[must_use]
251 fn state_schema_version() -> u32 {
252 1
253 }
254
255 /// Upcast a stored event payload from an older schema version.
256 ///
257 /// The engine calls this during state reconstruction for every loaded
258 /// event, *before* deserializing the payload into `Self::Event`. The
259 /// returned [`serde_json::Value`] is passed to the standard JSON
260 /// deserializer.
261 ///
262 /// Override this when you bump [`EventPayload::schema_version`] on a
263 /// variant — return a `Value` compatible with the new schema so old
264 /// events replay correctly without a data migration.
265 ///
266 /// # Example
267 ///
268 /// ```rust,ignore
269 /// fn upcast(
270 /// event_type: &str,
271 /// from_version: u32,
272 /// mut payload: serde_json::Value,
273 /// ) -> Result<serde_json::Value, EngineError> {
274 /// // v2 of SupplierChangeInitiated added a `document_type` field.
275 /// if event_type == "SupplierChangeInitiated" && from_version == 1 {
276 /// payload["document_type"] = serde_json::json!("E01");
277 /// }
278 /// Ok(payload)
279 /// }
280 /// ```
281 ///
282 /// # Errors
283 ///
284 /// Return [`EngineError::Deserialization`] when the payload cannot be
285 /// migrated to the current schema.
286 fn upcast(
287 _event_type: &str,
288 _from_version: u32,
289 payload: serde_json::Value,
290 ) -> Result<serde_json::Value, EngineError> {
291 Ok(payload)
292 }
293
294 /// Declares which BDEW format versions this workflow accepts for in-flight
295 /// processes.
296 ///
297 /// The engine uses this policy to validate that an incoming message's
298 /// format version is acceptable *before* constructing the command, surfacing
299 /// missing adapter coverage at dispatch time rather than during runtime
300 /// deserialization.
301 ///
302 /// The default returns [`WorkflowVersionPolicy::ForwardCompatible`] —
303 /// accept messages in any format version. This is the safe default for
304 /// the majority of BDEW market-communication processes, which routinely
305 /// span annual release boundaries (e.g. a GPKE Lieferbeginn process
306 /// started in September may still receive APERAK replies in November under
307 /// the new October FV).
308 ///
309 /// Override to `Pinned` only for strictly short-lived workflows that are
310 /// guaranteed to complete within a single BDEW release cycle.
311 ///
312 /// # Example
313 ///
314 /// ```rust,ignore
315 /// use mako_engine::version::WorkflowVersionPolicy;
316 ///
317 /// // Override to Pinned for a workflow with a 24h wall-clock SLA:
318 /// fn version_policy() -> WorkflowVersionPolicy {
319 /// WorkflowVersionPolicy::Pinned
320 /// }
321 /// ```
322 #[must_use]
323 fn version_policy() -> WorkflowVersionPolicy {
324 WorkflowVersionPolicy::ForwardCompatible
325 }
326
327 /// Map a fired deadline to a compensating command.
328 ///
329 /// Called by [`Process::execute_timeout`] when a registered deadline
330 /// for this workflow's process becomes overdue. Return `Some(command)`
331 /// to trigger a compensating action; return `None` to acknowledge the
332 /// deadline as a no-op.
333 ///
334 /// This method must be **pure**: no I/O, no clock access, no global state.
335 /// The same `(deadline, state)` must always produce the same `Option<Command>`.
336 ///
337 /// The full [`Deadline`] is provided (not just the label) so implementations
338 /// can construct commands that require `deadline_id` (e.g. `TimeoutExpired`).
339 ///
340 /// # Why a dedicated hook instead of a normal command?
341 ///
342 /// A synthetic `TimeoutFired` command variant works but couples the workflow
343 /// enum to infrastructure concerns. `on_deadline` keeps the domain command
344 /// type clean and makes compensation logic explicit and testable in isolation:
345 ///
346 /// ```rust,ignore
347 /// fn on_deadline(
348 /// deadline: &Deadline,
349 /// state: &Self::State,
350 /// ) -> Option<Self::Command> {
351 /// match (deadline.label(), state) {
352 /// ("aperak-window", SupplierChangeState::Initiated(_) | SupplierChangeState::ValidationPassed(_)) => {
353 /// Some(SupplierChangeCommand::TimeoutExpired {
354 /// deadline_id: deadline.deadline_id(),
355 /// label: deadline.label().into(),
356 /// })
357 /// }
358 /// _ => None,
359 /// }
360 /// }
361 /// ```
362 ///
363 /// # Default
364 ///
365 /// Returns `None` for all deadlines — no automatic compensation. Override in
366 /// any workflow that has deadline-triggered compensation requirements.
367 ///
368 /// [`Process::execute_timeout`]: crate::process::Process::execute_timeout
369 fn on_deadline(_deadline: &Deadline, _state: &Self::State) -> Option<Self::Command> {
370 None
371 }
372}
373
374// ── CommandContext ─────────────────────────────────────────────────────────────
375
376/// Contextual metadata attached to every command dispatch.
377///
378/// The engine stamps this information onto every event produced by the command.
379/// Callers provide the process identity; the engine generates correlation IDs
380/// automatically unless provided explicitly.
381#[derive(Debug, Clone)]
382pub struct CommandContext {
383 /// See [`CorrelationId`].
384 pub correlation_id: CorrelationId,
385 /// See [`ConversationId`].
386 pub conversation_id: ConversationId,
387 /// The MaKo process instance this command targets.
388 pub process_id: ProcessId,
389 /// The tenant that issued this command.
390 pub tenant_id: TenantId,
391 /// The workflow version to use for processing.
392 pub workflow_id: WorkflowId,
393 /// The immediate cause of this command, if driven by a prior event.
394 pub causation_id: Option<CausationId>,
395}
396
397impl CommandContext {
398 /// Construct a context with auto-generated correlation and conversation IDs.
399 #[must_use]
400 pub fn new(tenant_id: TenantId, process_id: ProcessId, workflow_id: WorkflowId) -> Self {
401 Self {
402 correlation_id: CorrelationId::new(),
403 conversation_id: ConversationId::new(),
404 process_id,
405 tenant_id,
406 workflow_id,
407 causation_id: None,
408 }
409 }
410
411 /// Set an explicit causation ID (e.g. the ID of the event that triggered
412 /// this command).
413 #[must_use]
414 pub fn with_causation(mut self, id: CausationId) -> Self {
415 self.causation_id = Some(id);
416 self
417 }
418
419 /// Override the auto-generated correlation ID.
420 ///
421 /// Use this to propagate a correlation ID from an inbound EDIFACT message
422 /// so all resulting events share the same root correlation.
423 #[must_use]
424 pub fn with_correlation(mut self, id: CorrelationId) -> Self {
425 self.correlation_id = id;
426 self
427 }
428
429 /// Override the auto-generated conversation ID.
430 ///
431 /// Use this to link the outbound APERAK to the same conversation as the
432 /// UTILMD that triggered it, so the full message exchange is traceable as
433 /// a unit.
434 #[must_use]
435 pub fn with_conversation(mut self, id: ConversationId) -> Self {
436 self.conversation_id = id;
437 self
438 }
439
440 /// Build a context that is causally linked to a prior persisted event.
441 ///
442 /// Propagates `correlation_id`, `conversation_id`, `process_id`, and
443 /// `tenant_id` from the envelope and sets the envelope's `event_id` as
444 /// the `causation_id`. This is the canonical constructor for all commands
445 /// that are triggered by a prior event (e.g. dispatching an APERAK in
446 /// response to a received UTILMD).
447 ///
448 /// # Example
449 ///
450 /// ```rust,ignore
451 /// let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
452 /// process.execute_with(DispatchAperak { positive: true, reason: None }, ctx).await?;
453 /// ```
454 #[must_use]
455 pub fn from_envelope(env: &EventEnvelope, workflow_id: WorkflowId) -> Self {
456 Self {
457 correlation_id: env.correlation_id,
458 conversation_id: env.conversation_id,
459 process_id: env.process_id,
460 tenant_id: env.tenant_id,
461 workflow_id,
462 causation_id: Some(env.event_id.into()),
463 }
464 }
465
466 /// Build a context for a deadline-triggered command.
467 ///
468 /// Propagates `process_id` and `tenant_id` from the deadline. Generates
469 /// fresh `correlation_id` and `conversation_id` (deadline firings start
470 /// a new tracing root).
471 ///
472 /// # Example
473 ///
474 /// ```rust,ignore
475 /// let ctx = CommandContext::from_deadline(&overdue_deadline, workflow_id);
476 /// process.execute_with(HandleTimeout { label: overdue_deadline.label().into() }, ctx).await?;
477 /// ```
478 #[must_use]
479 pub fn from_deadline(deadline: &crate::deadline::Deadline, workflow_id: WorkflowId) -> Self {
480 Self::new(deadline.tenant_id(), deadline.process_id(), workflow_id)
481 }
482
483 /// Build a [`NewEvent`] from this context and a domain event payload.
484 ///
485 /// This is the canonical way to construct a `NewEvent` inside a transport
486 /// adapter or test helper — it eliminates the nine-argument [`NewEvent::new`]
487 /// call and ensures that correlation metadata is always propagated correctly.
488 ///
489 /// # Errors
490 ///
491 /// Returns [`EngineError::Serialization`] when the event payload cannot be
492 /// serialized to JSON.
493 ///
494 /// # Example
495 ///
496 /// ```rust,ignore
497 /// // Inside a MessageAdapter or test:
498 /// let new_event = ctx.new_event(&SupplierChangeEvent::Activated)?;
499 /// store.append(&stream_id, ExpectedVersion::Any, &[new_event]).await?;
500 /// ```
501 pub fn new_event<E: EventPayload>(&self, event: &E) -> Result<NewEvent, EngineError> {
502 let payload =
503 serde_json::to_value(event).map_err(|e| EngineError::Serialization(e.to_string()))?;
504 Ok(NewEvent {
505 correlation_id: self.correlation_id,
506 causation_id: self.causation_id,
507 conversation_id: self.conversation_id,
508 process_id: self.process_id,
509 tenant_id: self.tenant_id,
510 workflow_id: self.workflow_id.clone(),
511 event_type: event.event_type().into(),
512 schema_version: event.schema_version(),
513 payload,
514 })
515 }
516}
517
518// ── EventEnvelope convenience ──────────────────────────────────────────────────
519
520impl EventEnvelope {
521 /// Build a [`NewEvent`] causally linked to this envelope.
522 ///
523 /// Propagates `correlation_id`, `conversation_id`, `process_id`, and
524 /// `tenant_id` from the envelope and sets `envelope.event_id` as the
525 /// `causation_id` of the new event. Useful when generating a follow-up
526 /// event (e.g. an APERAK trigger event) that must be traceable back to
527 /// the UTILMD envelope that caused it.
528 ///
529 /// # Errors
530 ///
531 /// Returns [`EngineError::Serialization`] when the event payload cannot be
532 /// serialized to JSON.
533 ///
534 /// # Example
535 ///
536 /// ```rust,ignore
537 /// // After persisting a UTILMD receive event, trigger an APERAK:
538 /// let aperak_new = utilmd_envelope.new_caused_event(
539 /// workflow_id,
540 /// &SupplierChangeEvent::AperakDispatched { positive: true, reason: None },
541 /// )?;
542 /// ```
543 pub fn new_caused_event<E: EventPayload>(
544 &self,
545 workflow_id: WorkflowId,
546 event: &E,
547 ) -> Result<NewEvent, EngineError> {
548 let payload =
549 serde_json::to_value(event).map_err(|e| EngineError::Serialization(e.to_string()))?;
550 Ok(NewEvent {
551 correlation_id: self.correlation_id,
552 causation_id: Some(self.event_id.into()),
553 conversation_id: self.conversation_id,
554 process_id: self.process_id,
555 tenant_id: self.tenant_id,
556 workflow_id,
557 event_type: event.event_type().into(),
558 schema_version: event.schema_version(),
559 payload,
560 })
561 }
562}
563
564// ── execute_command ───────────────────────────────────────────────────────────
565
566/// Dispatch a command through a workflow and persist the resulting events.
567///
568/// This is the crate-internal write-path entry point. The public API is
569/// [`Process::execute`] / [`Process::execute_with`].
570///
571/// It performs, in order:
572///
573/// 1. **Load** all events from `stream_id` via `store`.
574/// 2. **Reconstruct state** by folding events through [`Workflow::apply`].
575/// 3. **Handle** the command via [`Workflow::handle`] (pure, no I/O).
576/// 4. **Build** [`NewEvent`] values from each domain event + `ctx`.
577/// 5. **Append** atomically with optimistic concurrency
578/// (`ExpectedVersion::Exact(current_sequence)`).
579///
580/// Returns the persisted envelopes (with store-assigned IDs and sequence
581/// numbers). Returns an empty `Vec` when the workflow produced no events.
582///
583/// # Errors
584///
585/// - [`EngineError::VersionConflict`] when a concurrent writer raced ahead.
586/// - [`EngineError::Workflow`] when the workflow rejects the command.
587/// - [`EngineError::Deserialization`] when a stored event cannot be decoded.
588///
589/// [`Process::execute`]: crate::process::Process::execute
590/// [`Process::execute_with`]: crate::process::Process::execute_with
591pub(crate) async fn execute_command<W, S>(
592 store: &S,
593 stream_id: &crate::ids::StreamId,
594 command: W::Command,
595 ctx: &CommandContext,
596) -> Result<Vec<EventEnvelope>, EngineError>
597where
598 W: Workflow,
599 S: EventStore,
600{
601 execute_command_and_collect::<W, S>(store, stream_id, command, ctx)
602 .await
603 .map(|(envelopes, _outbox)| envelopes)
604}
605
606/// Like [`execute_command`] but also returns the [`PendingOutbox`] entries
607/// produced by [`Workflow::handle`].
608///
609/// Use this when the caller needs to inspect or render the outbox messages
610/// produced by the command — for example, in E2E tests that render EDIFACT
611/// wire bytes from the workflow's outbox. Avoids calling `handle()` a second
612/// time just to recover the outbox that `execute_command` silently discards.
613///
614/// [`PendingOutbox`]: crate::outbox::PendingOutbox
615pub(crate) async fn execute_command_and_collect<W, S>(
616 store: &S,
617 stream_id: &crate::ids::StreamId,
618 command: W::Command,
619 ctx: &CommandContext,
620) -> Result<(Vec<EventEnvelope>, Vec<PendingOutbox>), EngineError>
621where
622 W: Workflow,
623 S: EventStore,
624{
625 // ── 1 + 2. Stream-fold: reconstruct state without materialising a Vec ─────
626 //
627 // `fold_stream` feeds `EventEnvelope` values one-at-a-time; the engine
628 // never holds more than one envelope in memory during replay. Because
629 // the envelope is owned, `env.payload` is moved into `W::upcast` without
630 // a clone — no extra heap allocation per event.
631 let (state, current_sequence) = store
632 .fold_stream(
633 stream_id,
634 0,
635 (W::State::default(), 0u64),
636 |(acc, _), env| {
637 let seq = env.sequence_number;
638 // env.payload is moved here — no clone required.
639 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
640 let event: W::Event = serde_json::from_value(payload)
641 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
642 Ok((W::apply(acc, &event), seq))
643 },
644 )
645 .await?;
646
647 // ── 3. Handle command (pure) ──────────────────────────────────────────────
648 let output = W::handle(&state, command)?;
649
650 if output.events.is_empty() {
651 return Ok((Vec::new(), output.outbox));
652 }
653
654 // ── 4. Build NewEvent values (caller-known metadata) ──────────────────────
655 let new_events: Result<Vec<NewEvent>, EngineError> = output
656 .events
657 .iter()
658 .map(|event| ctx.new_event(event))
659 .collect();
660 let new_events = new_events?;
661
662 // ── 5. Persist with optimistic concurrency ────────────────────────────────
663 // The store assigns event_id, sequence_number, stream_id, and timestamp.
664 // Outbox messages (output.outbox) are intentionally ignored here — use
665 // execute_command_atomic when atomic dual-writes are required.
666 let result = store
667 .append(
668 stream_id,
669 ExpectedVersion::Exact(current_sequence),
670 &new_events,
671 )
672 .await?;
673
674 Ok((result.events, output.outbox))
675}
676
677/// Like [`execute_command`] but atomically co-persists any [`PendingOutbox`]
678/// messages produced by [`Workflow::handle`].
679///
680/// Requires `S: AtomicAppend`. All internal logic is identical to
681/// `execute_command`; the only difference is the persistence call at the end.
682pub(crate) async fn execute_command_atomic<W, S>(
683 store: &S,
684 stream_id: &crate::ids::StreamId,
685 command: W::Command,
686 ctx: &CommandContext,
687) -> Result<Vec<EventEnvelope>, EngineError>
688where
689 W: Workflow,
690 S: crate::event_store::AtomicAppend,
691{
692 // ── 1 + 2. Stream-fold: reconstruct state without materialising a Vec ─────
693 let (state, current_sequence) = store
694 .fold_stream(
695 stream_id,
696 0,
697 (W::State::default(), 0u64),
698 |(acc, _), env| {
699 let seq = env.sequence_number;
700 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
701 let event: W::Event = serde_json::from_value(payload)
702 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
703 Ok((W::apply(acc, &event), seq))
704 },
705 )
706 .await?;
707
708 // ── 3. Handle command (pure) ──────────────────────────────────────────────
709 let output = W::handle(&state, command)?;
710
711 if output.events.is_empty() {
712 return Ok(Vec::new());
713 }
714
715 // ── 4. Build NewEvent values ──────────────────────────────────────────────
716 let new_events: Result<Vec<NewEvent>, EngineError> = output
717 .events
718 .iter()
719 .map(|event| ctx.new_event(event))
720 .collect();
721 let new_events = new_events?;
722
723 // ── 5. Persist events + outbox atomically ─────────────────────────────────
724 let result = store
725 .append_with_outbox(
726 stream_id,
727 ExpectedVersion::Exact(current_sequence),
728 &new_events,
729 &output.outbox,
730 )
731 .await?;
732
733 Ok(result.events)
734}
735
736/// Reconstruct `(W::State, current_sequence)` using an optional snapshot as a
737/// starting point.
738///
739/// When a snapshot with matching schema version exists, replay starts from
740/// `snap.sequence_number` (O(k) tail scan). Otherwise falls back to full replay.
741async fn reconstruct_with_snapshot<W, S, Snap>(
742 store: &S,
743 snap_store: &Snap,
744 stream_id: &crate::ids::StreamId,
745) -> Result<(W::State, u64), EngineError>
746where
747 W: Workflow,
748 W::State: serde::de::DeserializeOwned,
749 S: EventStore,
750 Snap: crate::snapshot::SnapshotStore,
751{
752 let maybe_snap = snap_store.load(stream_id).await?;
753 let (initial_state, from_sequence) = match &maybe_snap {
754 Some(snap) if snap.state_schema_version == W::state_schema_version() => {
755 let state = serde_json::from_value::<W::State>(snap.state.clone())
756 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
757 (state, snap.sequence_number)
758 }
759 #[allow(unused_variables)]
760 Some(snap) => {
761 #[cfg(feature = "tracing")]
762 tracing::warn!(
763 expected = W::state_schema_version(),
764 actual = snap.state_schema_version,
765 stream_id = %stream_id,
766 "snapshot schema version mismatch; falling back to full replay"
767 );
768 (W::State::default(), 0)
769 }
770 None => (W::State::default(), 0),
771 };
772 store
773 .fold_stream(
774 stream_id,
775 from_sequence,
776 (initial_state, from_sequence),
777 |(acc, _), env| {
778 let seq = env.sequence_number;
779 let payload = W::upcast(&env.event_type, env.schema_version, env.payload)?;
780 let event: W::Event = serde_json::from_value(payload)
781 .map_err(|e| EngineError::Deserialization(e.to_string()))?;
782 Ok((W::apply(acc, &event), seq))
783 },
784 )
785 .await
786}
787
788/// Like [`execute_command`] but uses a snapshot store to skip full replay.
789///
790/// When a valid snapshot exists, only tail events since the snapshot are
791/// replayed — O(k) instead of O(n). Falls back to full replay when no snapshot
792/// exists or the schema version has changed.
793pub(crate) async fn execute_command_with_snapshot<W, S, Snap>(
794 store: &S,
795 snap_store: &Snap,
796 stream_id: &crate::ids::StreamId,
797 command: W::Command,
798 ctx: &CommandContext,
799) -> Result<Vec<EventEnvelope>, EngineError>
800where
801 W: Workflow,
802 W::State: serde::de::DeserializeOwned,
803 S: EventStore,
804 Snap: crate::snapshot::SnapshotStore,
805{
806 let (state, current_sequence) =
807 reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;
808
809 let output = W::handle(&state, command)?;
810 if output.events.is_empty() {
811 return Ok(Vec::new());
812 }
813 let new_events: Result<Vec<NewEvent>, EngineError> = output
814 .events
815 .iter()
816 .map(|event| ctx.new_event(event))
817 .collect();
818 let new_events = new_events?;
819 let result = store
820 .append(
821 stream_id,
822 ExpectedVersion::Exact(current_sequence),
823 &new_events,
824 )
825 .await?;
826 Ok(result.events)
827}
828
829/// Like [`execute_command_atomic`] but uses a snapshot store to skip full replay.
830///
831/// Atomically co-persists outbox messages alongside events while using a
832/// snapshot as the starting point for state reconstruction.
833pub(crate) async fn execute_command_atomic_with_snapshot<W, S, Snap>(
834 store: &S,
835 snap_store: &Snap,
836 stream_id: &crate::ids::StreamId,
837 command: W::Command,
838 ctx: &CommandContext,
839) -> Result<Vec<EventEnvelope>, EngineError>
840where
841 W: Workflow,
842 W::State: serde::de::DeserializeOwned,
843 S: crate::event_store::AtomicAppend,
844 Snap: crate::snapshot::SnapshotStore,
845{
846 let (state, current_sequence) =
847 reconstruct_with_snapshot::<W, S, Snap>(store, snap_store, stream_id).await?;
848
849 let output = W::handle(&state, command)?;
850 if output.events.is_empty() {
851 return Ok(Vec::new());
852 }
853 let new_events: Result<Vec<NewEvent>, EngineError> = output
854 .events
855 .iter()
856 .map(|event| ctx.new_event(event))
857 .collect();
858 let new_events = new_events?;
859 let result = store
860 .append_with_outbox(
861 stream_id,
862 ExpectedVersion::Exact(current_sequence),
863 &new_events,
864 &output.outbox,
865 )
866 .await?;
867 Ok(result.events)
868}