Trait Workflow 

pub trait Workflow: Send + Sync + 'static {
    type State: Default + Clone + Send + Sync + 'static;
    type Event: EventPayload;
    type Command: CommandPayload;

    // Required methods
    fn apply(state: Self::State, event: &Self::Event) -> Self::State;
    fn handle(
        state: &Self::State,
        command: Self::Command,
    ) -> Result<WorkflowOutput<Self::Event>, WorkflowError>;

    // Provided methods
    fn state_schema_version() -> u32 { ... }
    fn upcast(
        _event_type: &str,
        _from_version: u32,
        payload: Value,
    ) -> Result<Value, EngineError> { ... }
    fn version_policy() -> WorkflowVersionPolicy { ... }
    fn on_deadline(
        _deadline: &Deadline,
        _state: &Self::State,
    ) -> Option<Self::Command> { ... }
}

A versioned, deterministic domain workflow.

Workflows are the unit of business logic in the engine. Each BDEW process variant (e.g. GPKE Lieferbeginn, WiM Gerätewechsel) is a separate Workflow implementation in its domain crate.

§State reconstruction

Before handling a command, the engine calls Workflow::apply on every event in the stream to reconstruct the current state. This is the only path to reading state — there is no “load current state” API.
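The replay described above can be pictured as a left fold over the event stream, starting from the default state. The following is a minimal sketch under stated assumptions, not the engine's implementation: CounterState, CounterEvent and replay are hypothetical stand-ins, and only the shape of apply follows the trait.

```rust
// Hypothetical stand-in for a workflow's `State` type.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct CounterState {
    pub opened: bool,
    pub total: u64,
}

// Hypothetical stand-in for a workflow's `Event` type.
#[derive(Debug, Clone)]
pub enum CounterEvent {
    Opened,
    Added(u64),
}

// Same shape as `Workflow::apply`: take the old state by value, borrow the event.
pub fn apply(state: CounterState, event: &CounterEvent) -> CounterState {
    match event {
        CounterEvent::Opened => CounterState { opened: true, ..state },
        CounterEvent::Added(n) => CounterState {
            total: state.total.saturating_add(*n),
            ..state
        },
    }
}

// Hypothetical helper: rebuild the state by folding every event, in order,
// starting from `Default::default()`.
pub fn replay(events: &[CounterEvent]) -> CounterState {
    events
        .iter()
        .fold(CounterState::default(), |state, event| apply(state, event))
}
```

Because the state is derived only from the events, replaying the same stream always yields the same state.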

§Determinism

handle and apply must be deterministic and free of side effects. Do not access clocks, RNGs, network, or file system inside them.
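One way to satisfy this rule is to have the caller sample the clock and pass the value in as command data, so the decision depends only on its arguments. The sketch below assumes hypothetical names (SubmitReading, ReadingEvent, decide) and is not part of the engine's API.

```rust
// Hypothetical command: timestamps are captured by the caller, outside the
// workflow, and travel with the command as plain data.
pub struct SubmitReading {
    pub received_at_unix_secs: u64,
    pub window_closes_at_unix_secs: u64,
}

// Hypothetical event type.
#[derive(Debug, Clone, PartialEq)]
pub enum ReadingEvent {
    ReadingAccepted,
    ReadingRejectedLate,
}

// Deterministic decision: no clock, RNG, network, or file system access.
// The same command always produces the same event.
pub fn decide(command: &SubmitReading) -> ReadingEvent {
    if command.received_at_unix_secs <= command.window_closes_at_unix_secs {
        ReadingEvent::ReadingAccepted
    } else {
        ReadingEvent::ReadingRejectedLate
    }
}
```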

Required Associated Types§

type State: Default + Clone + Send + Sync + 'static

Domain-specific process state, reconstructed by replaying events.

type Event: EventPayload

Domain event type emitted by this workflow.

type Command: CommandPayload

Command type handled by this workflow.

Required Methods§

fn apply(state: Self::State, event: &Self::Event) -> Self::State

Fold a domain event into the current state.

This function must be total (no panics, no errors) and must produce a deterministic result.
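A total fold has to return a state for every (state, event) pair, including pairs that should never occur. The sketch below shows one possible policy, assumed here for illustration only: events that do not fit the current state leave it unchanged, and arithmetic saturates instead of overflowing. SketchState and SketchEvent are hypothetical types.

```rust
// Hypothetical state and event types for illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchState {
    Idle,
    Initiated { attempts: u32 },
    Completed,
}

pub enum SketchEvent {
    Initiated,
    RetryRecorded,
    Completed,
}

// Total: every combination yields a state; nothing panics or returns an error.
pub fn apply(state: SketchState, event: &SketchEvent) -> SketchState {
    match (state, event) {
        (SketchState::Idle, SketchEvent::Initiated) => SketchState::Initiated { attempts: 1 },
        (SketchState::Initiated { attempts }, SketchEvent::RetryRecorded) => {
            // Saturating add avoids an overflow panic in debug builds.
            SketchState::Initiated { attempts: attempts.saturating_add(1) }
        }
        (SketchState::Initiated { .. }, SketchEvent::Completed) => SketchState::Completed,
        // Assumed policy: an event that does not fit the current state is ignored.
        (state, _) => state,
    }
}
```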

fn handle( state: &Self::State, command: Self::Command, ) -> Result<WorkflowOutput<Self::Event>, WorkflowError>

Validate command against state and return the events to emit.

Return an empty WorkflowOutput (or vec![].into()) when the command is a no-op (already processed).

Outbox messages in the returned WorkflowOutput::outbox are persisted atomically with the events when the command is dispatched via Process::execute_and_enqueue. When the command is dispatched via Process::execute, the outbox field is silently ignored.

§Errors

Return a WorkflowError when the command is invalid for the current state or when domain validation fails.
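The three outcomes of handle (new events, an empty no-op output, and a domain error) can be sketched as follows. SketchOutput and SketchError are simplified stand-ins written for this example; they only imitate the roles of WorkflowOutput and WorkflowError and do not reproduce the engine's definitions. The state, command, and event enums are likewise hypothetical.

```rust
// Simplified stand-in for `WorkflowOutput<E>`; fields are assumptions.
#[derive(Debug, PartialEq)]
pub struct SketchOutput<E> {
    pub events: Vec<E>,
    pub outbox: Vec<String>,
}

// Lets a plain `Vec` of events convert into an output with an empty outbox.
impl<E> From<Vec<E>> for SketchOutput<E> {
    fn from(events: Vec<E>) -> Self {
        SketchOutput { events, outbox: Vec::new() }
    }
}

// Simplified stand-in for `WorkflowError`.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    InvalidState(&'static str),
}

#[derive(Debug, Clone, PartialEq)]
pub enum State { New, Initiated, Closed }

pub enum Command { Initiate, Close }

#[derive(Debug, PartialEq)]
pub enum Event { Initiated, Closed }

// Same shape as `Workflow::handle`: borrow the state, consume the command.
pub fn handle(state: &State, command: Command) -> Result<SketchOutput<Event>, SketchError> {
    match (state, command) {
        (State::New, Command::Initiate) => Ok(vec![Event::Initiated].into()),
        (State::Initiated, Command::Close) => Ok(vec![Event::Closed].into()),
        // Already processed: succeed with no events so a retry is harmless.
        (State::Initiated, Command::Initiate) | (State::Closed, Command::Close) => {
            Ok(Vec::new().into())
        }
        // Invalid for the current state: report a domain error.
        (State::New, Command::Close) => {
            Err(SketchError::InvalidState("cannot close before initiation"))
        }
        (State::Closed, Command::Initiate) => {
            Err(SketchError::InvalidState("process is already closed"))
        }
    }
}
```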

Provided Methods§

fn state_schema_version() -> u32

Schema version for serialized Workflow::State payloads.

The engine stores this value in every Snapshot taken via Process::take_snapshot. Increment it when the serialized state layout changes in a backward-incompatible way, and add a migration arm to your snapshot loader.

Defaults to 1.
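A migration arm in a snapshot loader might look like the sketch below. Everything in it is hypothetical: the snapshot is modelled as a list of already-split string fields rather than the engine's Snapshot type, and CURRENT_STATE_VERSION stands for the value an overridden state_schema_version would return.

```rust
// Hypothetical current version, i.e. what `state_schema_version()` would return.
pub const CURRENT_STATE_VERSION: u32 = 2;

// Hypothetical state layout at version 2.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchState {
    pub location_id: String,
    // Added in version 2; absent from version 1 snapshots.
    pub retries: u32,
}

// Hypothetical loader: dispatch on the version stored with the snapshot.
pub fn load_snapshot(stored_version: u32, fields: &[&str]) -> Result<SketchState, String> {
    match (stored_version, fields) {
        // Migration arm: version 1 had no `retries`, so fill in a default.
        (1, [location_id]) => Ok(SketchState {
            location_id: location_id.to_string(),
            retries: 0,
        }),
        // Current layout: read both fields.
        (2, [location_id, retries]) => {
            let retries = retries
                .parse::<u32>()
                .map_err(|e| format!("invalid retries field: {}", e))?;
            Ok(SketchState { location_id: location_id.to_string(), retries })
        }
        // Written by a newer build than this one understands.
        (v, _) if v > CURRENT_STATE_VERSION => Err(format!(
            "snapshot version {} is newer than supported version {}",
            v, CURRENT_STATE_VERSION
        )),
        (v, _) => Err(format!("malformed snapshot for version {}", v)),
    }
}
```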

fn upcast( _event_type: &str, _from_version: u32, payload: Value, ) -> Result<Value, EngineError>

Upcast a stored event payload from an older schema version.

The engine calls this during state reconstruction for every loaded event, before deserializing the payload into Self::Event. The returned serde_json::Value is passed to the standard JSON deserializer.

Override this when you bump EventPayload::schema_version on a variant — return a Value compatible with the new schema so old events replay correctly without a data migration.

§Example
fn upcast(
    event_type: &str,
    from_version: u32,
    mut payload: serde_json::Value,
) -> Result<serde_json::Value, EngineError> {
    // v2 of SupplierChangeInitiated added a `document_type` field.
    if event_type == "SupplierChangeInitiated" && from_version == 1 {
        payload["document_type"] = serde_json::json!("E01");
    }
    Ok(payload)
}
§Errors

Return EngineError::Deserialization when the payload cannot be migrated to the current schema.

fn version_policy() -> WorkflowVersionPolicy

Declares which BDEW format versions this workflow accepts for in-flight processes.

The engine uses this policy to validate that an incoming message’s format version is acceptable before constructing the command, surfacing missing adapter coverage at dispatch time rather than during runtime deserialization.

The default returns WorkflowVersionPolicy::ForwardCompatible, which accepts messages in any format version. This is the safe default for most BDEW market-communication processes, which routinely span annual release boundaries (e.g. a GPKE Lieferbeginn process started in September may still receive APERAK replies in November under the new October format version).

Override to Pinned only for strictly short-lived workflows that are guaranteed to complete within a single BDEW release cycle.

§Example
use mako_engine::version::WorkflowVersionPolicy;

// Override to Pinned for a workflow with a 24h wall-clock SLA:
fn version_policy() -> WorkflowVersionPolicy {
    WorkflowVersionPolicy::Pinned
}

fn on_deadline( _deadline: &Deadline, _state: &Self::State, ) -> Option<Self::Command>

Map a fired deadline to a compensating command.

Called by Process::execute_timeout when a registered deadline for this workflow’s process becomes overdue. Return Some(command) to trigger a compensating action; return None to acknowledge the deadline as a no-op.

This method must be pure: no I/O, no clock access, no global state. The same (deadline, state) must always produce the same Option<Command>.

The full Deadline is provided (not just the label) so implementations can construct commands that require deadline_id (e.g. TimeoutExpired).

§Why a dedicated hook instead of a normal command?

A synthetic TimeoutFired command variant works but couples the workflow enum to infrastructure concerns. on_deadline keeps the domain command type clean and makes compensation logic explicit and testable in isolation:

fn on_deadline(
    deadline: &Deadline,
    state: &Self::State,
) -> Option<Self::Command> {
    match (deadline.label(), state) {
        // Only this label, in these two states, triggers compensation.
        (
            "aperak-window",
            SupplierChangeState::Initiated(_) | SupplierChangeState::ValidationPassed(_),
        ) => Some(SupplierChangeCommand::TimeoutExpired {
            deadline_id: deadline.deadline_id(),
            label: deadline.label().into(),
        }),
        // Any other deadline or state: acknowledge without compensation.
        _ => None,
    }
}
§Default

Returns None for all deadlines — no automatic compensation. Override in any workflow that has deadline-triggered compensation requirements.
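Because the hook is pure, it can be exercised without an engine or an event store. The sketch below assumes simplified stand-ins: SketchDeadline only imitates the label and deadline_id accessors mentioned above (the u64 id type is an assumption), and SketchState and SketchCommand are hypothetical.

```rust
// Simplified stand-in for `Deadline`; field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchDeadline {
    id: u64,
    label: String,
}

impl SketchDeadline {
    pub fn new(id: u64, label: &str) -> Self {
        SketchDeadline { id, label: label.to_string() }
    }
    pub fn deadline_id(&self) -> u64 {
        self.id
    }
    pub fn label(&self) -> &str {
        &self.label
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum SketchState { Initiated, Completed }

#[derive(Debug, PartialEq)]
pub enum SketchCommand {
    TimeoutExpired { deadline_id: u64, label: String },
}

// Pure mapping from (deadline, state) to an optional compensating command.
pub fn on_deadline(deadline: &SketchDeadline, state: &SketchState) -> Option<SketchCommand> {
    match (deadline.label(), state) {
        ("aperak-window", SketchState::Initiated) => Some(SketchCommand::TimeoutExpired {
            deadline_id: deadline.deadline_id(),
            label: deadline.label().into(),
        }),
        // Unknown labels and finished processes need no compensation.
        _ => None,
    }
}
```

A unit test can then assert on the returned Option directly, with no timers or storage involved.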

Dyn Compatibility§

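This trait is not dyn compatible: every method is an associated function without a self receiver, so none of them can be called through a trait object.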

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§