Struct Process

pub struct Process<W: Workflow, S: EventStore> { /* private fields */ }

An ergonomic typed handle for a single MaKo process instance.

Process bundles the StreamId, ProcessId, TenantId, WorkflowId, and event store into a single owned value so callers do not need to pass them on every command dispatch.

§Generic parameters

  • W is the Workflow implementation. In practice this is a zero-sized marker struct; the type parameter carries the domain logic as associated types.
  • S is the EventStore backend. InMemoryEventStore is the default for tests; production deployments wrap a persistent backend in Arc and use Process<W, Arc<MyStore>>.
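To make the marker-struct idea concrete, here is a minimal sketch. The `Workflow` trait below is a simplified, hypothetical stand-in (the crate's real trait is likely richer), and `CounterWorkflow`, its state, command, and event types are invented for illustration only.

```rust
/// Hypothetical, simplified stand-in for the crate's `Workflow` trait.
trait Workflow {
    type State: Default;
    type Command;
    type Event;

    /// Decide which events a command produces in the given state.
    fn handle(state: &Self::State, command: Self::Command) -> Vec<Self::Event>;

    /// Fold one event into the state.
    fn apply(state: Self::State, event: &Self::Event) -> Self::State;
}

/// Zero-sized marker: it holds no data and only names the implementation.
struct CounterWorkflow;

#[derive(Debug, Default, PartialEq)]
struct CounterState {
    total: u64,
}

enum CounterCommand {
    Add(u64),
}

enum CounterEvent {
    Added(u64),
}

impl Workflow for CounterWorkflow {
    type State = CounterState;
    type Command = CounterCommand;
    type Event = CounterEvent;

    fn handle(_state: &CounterState, command: CounterCommand) -> Vec<CounterEvent> {
        match command {
            CounterCommand::Add(n) => vec![CounterEvent::Added(n)],
        }
    }

    fn apply(state: CounterState, event: &CounterEvent) -> CounterState {
        match event {
            CounterEvent::Added(n) => CounterState { total: state.total + n },
        }
    }
}

/// The marker occupies no memory; all behaviour is reached through the type.
fn marker_size() -> usize {
    std::mem::size_of::<CounterWorkflow>()
}

/// Run one command against a default state, generic over the marker type.
fn run_once<W: Workflow>(command: W::Command) -> W::State {
    let state = W::State::default();
    let events = W::handle(&state, command);
    events.iter().fold(state, |s, e| W::apply(s, e))
}
```

Because the marker carries no fields, a handle that is generic over `W` pays nothing at runtime for the type parameter; it only selects which associated types and functions are used.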

§Clone semantics

If S: Clone (e.g. InMemoryEventStore or Arc<…>), Process is also Clone and all clones share the same underlying storage.
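The sharing behaviour can be illustrated with a small sketch. `SharedStore` and `Handle` are hypothetical stand-ins, not the crate's types; the point is only that cloning a handle whose store wraps an `Arc` copies the pointer, not the data.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical in-memory store; cloning copies the `Arc`, not the events.
#[derive(Clone, Default)]
struct SharedStore {
    events: Arc<Mutex<Vec<String>>>,
}

impl SharedStore {
    fn append(&self, event: String) {
        self.events.lock().unwrap().push(event);
    }

    fn len(&self) -> usize {
        self.events.lock().unwrap().len()
    }
}

/// Hypothetical handle that is `Clone` whenever its store is `Clone`.
#[derive(Clone)]
struct Handle<S> {
    store: S,
    stream_id: String,
}

impl Handle<SharedStore> {
    fn record(&self, event: &str) {
        self.store.append(format!("{}#{}", self.stream_id, event));
    }
}

/// Write through the clone, then read the length through both handles.
fn lengths_after_clone() -> (usize, usize) {
    let original = Handle {
        store: SharedStore::default(),
        stream_id: "process/t1/p1".to_string(),
    };
    let copy = original.clone();
    copy.record("ProcessStarted");
    (original.store.len(), copy.store.len())
}
```

An event written through the clone is visible through the original, which is the property the paragraph above describes.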

Implementations§

impl<W: Workflow, S: EventStore> Process<W, S>

pub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self

Create a fresh process instance.

Generates a new ProcessId and derives the StreamId from tenant_id and process_id (process/{tenant_id}/{process_id}). Use this when starting a new MaKo process (e.g. on receipt of the first inbound UTILMD Lieferbeginn).
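The stream naming scheme can be sketched as follows. The numeric newtypes are assumptions made to keep the example self-contained; the crate's `TenantId` and `ProcessId` may well be UUID-based, and the real `StreamId` is a dedicated type rather than a `String`.

```rust
use std::fmt;

/// Hypothetical stand-in for the crate's `TenantId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct TenantId(u64);

/// Hypothetical stand-in for the crate's `ProcessId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ProcessId(u64);

impl fmt::Display for TenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl fmt::Display for ProcessId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Build the stream name in the documented `process/{tenant_id}/{process_id}` shape.
fn derive_stream_id(tenant_id: TenantId, process_id: ProcessId) -> String {
    format!("process/{}/{}", tenant_id, process_id)
}
```

Since the name is a pure function of the two identifiers, the same pair always maps to the same stream.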

pub fn from_stream(
    store: S,
    stream_id: StreamId,
    process_id: ProcessId,
    tenant_id: TenantId,
    workflow_id: WorkflowId,
) -> Self

Attach to an existing process stream.

Use this on service restart or when routing an inbound message to an already-running process whose identifiers were previously persisted.

pub fn stream_id(&self) -> &StreamId

The event stream identifier for this process.

pub fn process_id(&self) -> ProcessId

The stable process identifier.

pub fn tenant_id(&self) -> TenantId

The tenant that owns this process.

pub fn workflow_id(&self) -> &WorkflowId

The workflow version under which this process was created.

pub fn identity(&self) -> ProcessIdentity

Return a serializable value bundle of all four process identifiers.

Persist this to a routing table (e.g. keyed by conversation_id or correlation_id) so inbound messages can be routed to the correct running process without the caller needing to manage four separate fields.

Use Process::from_identity to re-attach to the same process stream on a subsequent request.

let id = process.identity();
routing_table.insert(conv_id, id.clone());

// Later, on a subsequent inbound message:
let id = routing_table.get(&conv_id)?;
let process = Process::<MyWorkflow, _>::from_identity(store, id);

pub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext

Build a CommandContext for an inbound EDIFACT message dispatch.

Derives a deterministic CorrelationId from interchange_ref (UUID v5), so repeated dispatches of the same EDIFACT message (for example AS4 retransmissions or idempotent REST replays) produce the same correlation root. This makes EDIFACT-level idempotency observable in distributed traces without any extra dedup logic at the engine level.

Use this together with Process::execute_with, rather than calling Process::execute, when you need to propagate EDIFACT correlation metadata into the event stream: pass the returned context to Process::execute_with.
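The determinism property can be sketched without any dependencies. Note the swap: the engine derives a UUID v5 (SHA-1 based name hashing), whereas this sketch uses 64-bit FNV-1a purely as a simple deterministic stand-in. `CorrelationId`, `NAMESPACE`, and `correlation_for_inbound` are hypothetical names for illustration.

```rust
/// 64-bit FNV-1a. Used here only as a deterministic stand-in for the
/// SHA-1 based name hashing that UUID v5 performs.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Hypothetical stand-in for the crate's `CorrelationId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CorrelationId(u64);

/// Hypothetical namespace; UUID v5 likewise hashes a namespace plus a name.
const NAMESPACE: &str = "mako/interchange";

/// Same interchange reference in, same correlation ID out.
fn correlation_for_inbound(interchange_ref: &str) -> CorrelationId {
    let mut name = Vec::with_capacity(NAMESPACE.len() + 1 + interchange_ref.len());
    name.extend_from_slice(NAMESPACE.as_bytes());
    name.push(0); // separator between namespace and name
    name.extend_from_slice(interchange_ref.as_bytes());
    CorrelationId(fnv1a64(&name))
}
```

A retransmitted message with the same interchange reference therefore lands on the same correlation root, while a different reference yields a different one.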

§Example
let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
let cmd_ctx = process.context_for_inbound(&utilmd_interchange_ref);
process.execute_with(command, cmd_ctx).await?;

pub fn from_identity(store: S, identity: ProcessIdentity) -> Self

Attach to an existing process stream from a previously persisted ProcessIdentity.

This is the companion to Process::identity: look up the identity from your routing table and call from_identity to get a live Process handle bound to store.

pub async fn event_count(&self) -> Result<u64, EngineError>

Return the number of events currently in the stream.

Uses EventStore::stream_version for an efficient O(1) metadata query on backends that override it. Falls back to loading all events on stores that use the default implementation.

Use this to decide whether to take a snapshot — e.g. with Snapshot::should_take:

if Snapshot::should_take(process.event_count().await?, 100) {
    process.take_snapshot(&snap_store, 100).await?;
}
§Errors

Returns EngineError::Store on storage failures.

pub async fn execute(
    &self,
    command: W::Command,
) -> Result<Vec<EventEnvelope>, EngineError>

Dispatch command using a freshly generated CommandContext.

A new CorrelationId and ConversationId are auto-generated for each call. To propagate tracing IDs from an inbound EDIFACT message across a multi-step command chain, use execute_with.

§Errors

pub async fn execute_and_collect(
    &self,
    command: W::Command,
) -> Result<(Vec<EventEnvelope>, Vec<OutboxMessage>), EngineError>

Like execute but also returns the outbox messages produced by Workflow::handle, fully stamped with the real IDs from the persisted event.

The returned OutboxMessage entries have their causation_event_id set to the event_id of the first persisted event — identical to what execute_and_enqueue writes into the OutboxStore atomically. This makes the messages ready to pass directly to the EDIFACT renderer without any manual ID stitching.

Use this in E2E and integration tests that need to inspect or render outbox messages after a command is persisted, without the awkward handle() + execute() double invocation.

§Errors

Returns EngineError on storage or command handling failure.

pub async fn execute_with(
    &self,
    command: W::Command,
    ctx: CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>

Dispatch command with a caller-supplied CommandContext.

Use this when you need to thread a specific correlation_id, conversation_id, or causation_id through the command. For example, when dispatching an APERAK in response to a UTILMD, pass the conversation_id from the UTILMD envelope so both exchanges are traceable as a single business conversation.

Build a context with:

let ctx = CommandContext::new(tenant_id, process_id, workflow_id)
    .with_causation(utilmd_event_id.into())  // From<EventId> for CausationId
    .with_conversation(utilmd_conversation_id);
process.execute_with(DispatchAperak { .. }, ctx).await?;
§Errors

See Process::execute for the error contract.

pub async fn execute_snapshot<Snap>(
    &self,
    command: W::Command,
    snap_store: &Snap,
) -> Result<Vec<EventEnvelope>, EngineError>

Dispatch command using a snapshot store to accelerate state reconstruction.

Equivalent to Process::execute but starts replay from the most recent snapshot rather than from sequence 0. For streams with thousands of events and a snapshot within the last 100 events, this reduces replay cost from O(n) to O(k) where k is the tail length since the last snapshot.

When no snapshot exists or the schema version has changed, falls back to full O(n) replay — identical in cost to Process::execute.

§Errors

Same contract as Process::execute.

pub async fn state(&self) -> Result<W::State, EngineError>

Reconstruct the current workflow state by replaying all persisted events.

This is a read-only operation — it loads events but does not acquire any write lock or check optimistic concurrency. Use it to:

  • Inspect process status in tests without dispatching a command.
  • Build a diagnostic snapshot for observability or health checks.
  • Implement query-side read models that need the full typed state.

For production read models, prefer a Projection that is updated incrementally rather than replaying the full stream on every query.

To accelerate replay for long-lived streams, use Process::state_with_snapshot instead.

§Errors

pub async fn state_with_snapshot<Snap: SnapshotStore>(
    &self,
    snap_store: &Snap,
) -> Result<W::State, EngineError>

Reconstruct current state using a snapshot as the starting point.

Loads the most recent snapshot for this stream from snap_store. If one exists, deserializes it into W::State and then replays only events appended after the snapshot’s sequence_number (O(k) instead of O(n)). Falls back to full replay when no snapshot exists.
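The difference between full replay and snapshot-plus-tail replay can be sketched as below. All types are hypothetical simplifications, and the sketch assumes that a snapshot's `sequence_number` names the last event already folded into its state, with sequences starting at 0.

```rust
/// Hypothetical, simplified state: a running total.
#[derive(Clone, Debug, Default, PartialEq)]
struct State {
    total: i64,
}

/// Hypothetical event carrying its position in the stream.
struct Event {
    sequence_number: u64,
    delta: i64,
}

/// Hypothetical snapshot: the state as of `sequence_number` (inclusive).
struct Snapshot {
    sequence_number: u64,
    state: State,
}

fn apply(state: State, event: &Event) -> State {
    State { total: state.total + event.delta }
}

/// Rebuild state and report how many events had to be replayed.
fn rebuild(events: &[Event], snapshot: Option<&Snapshot>) -> (State, usize) {
    let (mut state, first_needed) = match snapshot {
        Some(s) => (s.state.clone(), s.sequence_number + 1),
        None => (State::default(), 0),
    };
    let mut replayed = 0;
    // A real store would seek directly to `first_needed`; filtering the
    // slice here only models which events end up being applied.
    for event in events.iter().filter(|e| e.sequence_number >= first_needed) {
        state = apply(state, event);
        replayed += 1;
    }
    (state, replayed)
}
```

Both paths arrive at the same state; with a snapshot only the tail after it is applied, which is where the O(k) figure comes from.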

§When to use

Use this instead of Process::state for long-lived processes where the event count grows large. Pair it with Process::take_snapshot to keep the snapshot store current after each command.

§Schema version compatibility

Snapshots whose state field cannot be deserialized into W::State (e.g. after a breaking state schema change) will return EngineError::Deserialization. In that case, fall back to Process::state (full replay) and take a fresh snapshot.

§Errors

pub async fn take_snapshot<Snap: SnapshotStore>(
    &self,
    snap_store: &Snap,
    interval: u64,
) -> Result<bool, EngineError>
where W::State: Serialize,

Reconstruct current state and save a snapshot if the event-count threshold is reached.

Checks Snapshot::should_take with interval. When at least interval new events have accumulated since the last snapshot, reconstructs state via full replay, serializes it, and calls SnapshotStore::save.

Returns true when a snapshot was taken, false when the threshold was not reached or interval is 0.
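One possible reading of the threshold check is sketched here. It assumes the modulo rule mentioned for execute_and_enqueue_with_snapshot (`event_count % interval == 0`) and treats an interval of 0 as "never"; the actual Snapshot::should_take may track events since the last snapshot instead. Both function names are local to this sketch.

```rust
/// Assumed rule: snapshot when the event count is a non-zero multiple of
/// `interval`. An interval of 0 disables snapshotting.
fn should_take(event_count: u64, interval: u64) -> bool {
    interval != 0 && event_count != 0 && event_count % interval == 0
}

/// List the event counts at which a snapshot would fire while a stream
/// grows one event at a time up to `total_events`.
fn snapshot_points(total_events: u64, interval: u64) -> Vec<u64> {
    (1..=total_events)
        .filter(|&count| should_take(count, interval))
        .collect()
}
```

Under this rule a stream of 250 events with an interval of 100 would be snapshotted twice, at counts 100 and 200.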

§Integration pattern
// After every successful command:
process.execute(command).await?;
process.take_snapshot(&snap_store, 100).await?;

// On the read path — O(k) instead of O(n):
let state = process.state_with_snapshot(&snap_store).await?;
§Errors

pub async fn execute_with_retry(
    &self,
    command: W::Command,
    max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
where W::Command: Clone,

Dispatch command with automatic retry on EngineError::VersionConflict.

A version conflict occurs when a concurrent writer appended events between this process’s read and its append attempt. On each conflict, the engine reloads the complete event stream from the store and replays all events to rebuild fresh state before re-handling the command. Stale in-memory state from a previous attempt is never carried forward: each retry starts from freshly rebuilt state.

Non-conflict errors (storage failures, workflow rejections) are returned immediately without retrying.

A freshly-generated CommandContext is pinned before the first attempt and reused across all retries so all events share the same correlation root regardless of retry count. Use execute_with_retry_ctx to supply a specific context (e.g. one derived from an inbound EDIFACT envelope).
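The retry policy described above can be sketched as a synchronous loop. `EngineError` here is a reduced, hypothetical version of the crate's error type, and `retry_on_conflict` is an illustrative helper, not the engine's implementation. Returning `VersionConflict` when `max_attempts` is 0 is a choice made for this sketch only.

```rust
/// Reduced, hypothetical version of the crate's error type.
#[derive(Debug, PartialEq)]
enum EngineError {
    VersionConflict,
    Workflow(String),
}

/// Call `attempt` up to `max_attempts` times, retrying only on
/// `VersionConflict`. Any other outcome is returned immediately.
/// The closure receives the 1-based attempt number and is expected to
/// reload state from the store itself on every call.
fn retry_on_conflict<T, F>(max_attempts: u32, mut attempt: F) -> Result<T, EngineError>
where
    F: FnMut(u32) -> Result<T, EngineError>,
{
    for n in 1..=max_attempts {
        match attempt(n) {
            Err(EngineError::VersionConflict) => continue,
            other => return other,
        }
    }
    // All attempts conflicted (or none were allowed).
    Err(EngineError::VersionConflict)
}
```

Keeping the state reload inside the closure is what guarantees that no attempt sees data from a previous one.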

§When to use

Use for commands where two inbound EDIFACT messages for the same process may arrive concurrently — e.g. a UTILMD and its APERAK processed on separate async tasks.

§Command cloning

W::Command must implement Clone so it can be resubmitted on each retry without reconstructing it from scratch.

§Errors

pub async fn execute_with_retry_ctx(
    &self,
    command: W::Command,
    ctx: CommandContext,
    max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
where W::Command: Clone,

Dispatch command with a caller-supplied CommandContext and automatic retry on EngineError::VersionConflict.

Identical to execute_with_retry but threads the provided ctx (including its correlation_id, conversation_id, and causation_id) through every retry attempt. Use this when you need to propagate tracing IDs from an inbound EDIFACT envelope across a retried command.

§Example
let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
process.execute_with_retry_ctx(HandleAperak { .. }, ctx, 3).await?;
§Errors

See execute_with_retry for the error contract.

§Panics

Panics if max_attempts is 0 and the guard at the top of the function is somehow bypassed (unreachable in practice).

pub async fn execute_and_enqueue(
    &self,
    command: W::Command,
) -> Result<Vec<EventEnvelope>, EngineError>
where S: AtomicAppend,

Execute command and atomically co-persist any PendingOutbox messages produced by Workflow::handle.

Like execute, but requires S: AtomicAppend. When the workflow’s handle returns outbox messages alongside events, both are written to storage in a single WriteBatch, eliminating the silent message-loss window that would exist with separate writes.

When the handle returns no outbox messages, this degenerates to a plain EventStore::append, so the atomic path adds no overhead.

Use this method instead of execute in all production code that needs outbox delivery guarantees. Plain execute silently drops any outbox entries produced by the workflow handler, and enqueuing them afterwards with a manual OutboxStore::enqueue call leaves a gap: a crash between the two writes would lose the APERAK or UTILMD response permanently.

For long event streams with periodic snapshots use execute_and_enqueue_snapshot to reduce O(n) replay cost to O(k). In concurrent environments where VersionConflict is expected, use execute_and_enqueue_with_retry to retry automatically.
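The all-or-nothing property can be sketched with an in-memory store that guards both collections with one lock. `AtomicStore` and `append_with_outbox` are hypothetical; a persistent backend would use a single write batch rather than a mutex, and the `String` error stands in for a proper error type.

```rust
use std::sync::Mutex;

#[derive(Default)]
struct Inner {
    events: Vec<String>,
    outbox: Vec<String>,
}

/// Hypothetical store where events and outbox entries share one lock.
#[derive(Default)]
struct AtomicStore {
    inner: Mutex<Inner>,
}

impl AtomicStore {
    /// Append events and outbox entries together, or reject both when the
    /// caller's expected stream version is stale.
    fn append_with_outbox(
        &self,
        expected_version: usize,
        events: Vec<String>,
        outbox: Vec<String>,
    ) -> Result<usize, String> {
        let mut inner = self.inner.lock().unwrap();
        if inner.events.len() != expected_version {
            return Err(format!(
                "version conflict: expected {}, found {}",
                expected_version,
                inner.events.len()
            ));
        }
        inner.events.extend(events);
        inner.outbox.extend(outbox);
        Ok(inner.events.len())
    }

    /// Number of stored events and outbox entries.
    fn counts(&self) -> (usize, usize) {
        let inner = self.inner.lock().unwrap();
        (inner.events.len(), inner.outbox.len())
    }
}
```

Because the version check and both writes happen under the same lock, no reader can observe the event without its outbox entry, and a rejected write changes neither collection.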

§Example
use std::sync::Arc;
use mako_engine::process::Process;
use mako_engine::version::WorkflowId;
use mako_engine::ids::TenantId;

// SlateDbStore implements AtomicAppend, which execute_and_enqueue requires.
// `fv` and `payload` are assumed to be defined by the surrounding code.
let store = Arc::new(SlateDbStore::open_in_memory().await?);
let tenant_id = TenantId::from_party_id("9904231000007");
let workflow_id = WorkflowId::new("gpke-supplier-change", fv);

let process = Process::<GpkeSupplierChangeWorkflow, _>::new(
    Arc::clone(&store),
    tenant_id,
    workflow_id,
);

// The workflow handle emits a PendingOutbox APERAK entry alongside the event.
// execute_and_enqueue writes both in one WriteBatch — no partial-write window.
let events = process
    .execute_and_enqueue(GpkeCommand::ReceiveUtilmd { pid: 55001, payload })
    .await?;

assert!(!events.is_empty(), "at least one event was persisted");

// The APERAK outbox entry is now visible to the outbox worker:
let pending = store.peek_outbox(tenant_id, 10).await?;
assert_eq!(pending.len(), 1, "APERAK enqueued atomically with the event");
§Errors

pub async fn execute_and_enqueue_snapshot<Snap>(
    &self,
    command: W::Command,
    snap_store: &Snap,
) -> Result<Vec<EventEnvelope>, EngineError>

Like execute_and_enqueue but uses a snapshot to accelerate replay.

Atomically persists events and outbox entries while starting state reconstruction from the most recent snapshot. For long streams with periodic snapshots this reduces replay cost from O(n) to O(k).

§Errors

Returns EngineError on storage or command handling failure.

pub async fn execute_timeout(
    &self,
    deadline: &Deadline,
) -> Result<Option<Vec<EventEnvelope>>, EngineError>
where S: AtomicAppend,

Dispatch the compensation command returned by Workflow::on_deadline.

Reconstructs the current process state, calls W::on_deadline(deadline, &state), and — if the hook returns Some(command) — executes it via Process::execute_and_enqueue, which atomically persists events and any outbox entries (e.g. APERAK Ablehnung) produced by the compensation handler.

Returns Ok(Some(events)) when compensation fired, Ok(None) when the hook returned None (deadline acknowledged as no-op).

This is the canonical way to wire deadline firings to workflow compensation logic. Any WorkflowOutput::with_outbox entries returned by on_deadline are guaranteed to be persisted atomically — there is no window where the event is stored but the outbox entry is lost.
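The control flow of the deadline hook can be sketched as follows. The state, command, and deadline types are hypothetical, events are modelled as strings, and the `Result` wrapper of the real method is left out to keep the example short.

```rust
/// Hypothetical process state.
#[derive(Debug, PartialEq)]
enum State {
    AwaitingResponse,
    Completed,
}

/// Hypothetical compensation command.
#[derive(Debug, PartialEq)]
enum Command {
    RejectOnTimeout { deadline_id: u32 },
}

/// Hypothetical deadline record.
struct Deadline {
    id: u32,
}

/// The hook: decide whether a fired deadline still requires compensation.
fn on_deadline(deadline: &Deadline, state: &State) -> Option<Command> {
    match state {
        State::AwaitingResponse => Some(Command::RejectOnTimeout { deadline_id: deadline.id }),
        // The process already finished, so the deadline is a no-op.
        State::Completed => None,
    }
}

/// Dispatch the compensation command if the hook produced one.
/// `Some(events)` means compensation fired; `None` means nothing to do.
fn execute_timeout(deadline: &Deadline, state: &State) -> Option<Vec<String>> {
    on_deadline(deadline, state).map(|command| match command {
        Command::RejectOnTimeout { deadline_id } => {
            vec![format!("TimedOut({})", deadline_id)]
        }
    })
}
```

The `Option` in the return type lets the deadline worker distinguish a deadline that triggered compensation from one that was acknowledged without any effect.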

§Example
// In the deadline worker:
let overdue = ctx.deadline_store().due_now(50).await?;
for deadline in overdue {
    let identity = ctx.registry()
        .lookup(deadline.tenant_id(), &RegistryKey::from_process(deadline.process_id()))
        .await?
        .expect("process must be registered");
    let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
    if let Some(events) = process.execute_timeout(&deadline).await? {
        // compensation command was dispatched — APERAK Ablehnung enqueued
        tracing::info!(events = events.len(), "timeout compensation applied");
    }
    ctx.deadline_store().cancel(deadline.deadline_id()).await?;
}
§Errors

Propagates EngineError::VersionConflict, EngineError::Workflow, and storage errors from execute_and_enqueue. Use execute_timeout_with_retry when VersionConflict retries are required.

pub async fn execute_timeout_with_retry(
    &self,
    deadline: &Deadline,
    max_attempts: u32,
) -> Result<Option<Vec<EventEnvelope>>, EngineError>
where S: AtomicAppend, W::Command: Clone,

Like execute_timeout but retries on VersionConflict up to max_attempts times.

Use this in production deadline workers where concurrent event appends are expected. Outbox entries (e.g. APERAK Ablehnung) produced by the compensation handler are persisted atomically on every attempt.

§Errors

Returns EngineError on storage or command handling failure.

§Panics

Panics if the deadline produces a command but the retry loop somehow exhausts without capturing an error (unreachable in practice).

pub async fn execute_and_enqueue_with_retry(
    &self,
    command: W::Command,
    max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
where S: AtomicAppend, W::Command: Clone,

Like execute_and_enqueue but retries on EngineError::VersionConflict up to max_attempts times.

§Errors

Returns EngineError on storage or command handling failure.

§Panics

Panics if max_attempts is 0 and the guard is bypassed (unreachable).

pub async fn execute_and_enqueue_with_snapshot<Snap>(
    &self,
    command: W::Command,
    snap_store: &Snap,
    snapshot_interval: u64,
) -> Result<(Vec<EventEnvelope>, bool), EngineError>

Execute command atomically with outbox, then automatically snapshot if the event-count threshold is reached.

Combines execute_and_enqueue with take_snapshot: after a successful write, checks whether event_count % snapshot_interval == 0 and, if so, serializes and saves a snapshot via snap_store.

Pass snapshot_interval = 0 to disable auto-snapshotting; the call then behaves identically to execute_and_enqueue.

Returns (events, snapshot_taken) where snapshot_taken is true when a snapshot was written this call.

§Errors

pub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>(
    &self,
    command: W::Command,
    max_attempts: u32,
    snap_store: &Snap,
    snapshot_interval: u64,
) -> Result<(Vec<EventEnvelope>, bool), EngineError>

Like execute_and_enqueue_with_snapshot but retries on EngineError::VersionConflict up to max_attempts times.

§Errors
§Panics

Panics if max_attempts is 0 and the loop guard is bypassed (unreachable).

Trait Implementations§

impl<W: Workflow, S: EventStore + Clone> Clone for Process<W, S>

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<W: Workflow, S: EventStore + Debug> Debug for Process<W, S>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<W, S> Freeze for Process<W, S>
where S: Freeze,

impl<W, S> RefUnwindSafe for Process<W, S>
where S: RefUnwindSafe,

impl<W, S> Send for Process<W, S>

impl<W, S> Sync for Process<W, S>

impl<W, S> Unpin for Process<W, S>
where S: Unpin,

impl<W, S> UnsafeUnpin for Process<W, S>
where S: UnsafeUnpin,

impl<W, S> UnwindSafe for Process<W, S>
where S: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.