pub struct Process<W: Workflow, S: EventStore> { /* private fields */ }Expand description
An ergonomic typed handle for a single MaKo process instance.
Process bundles the StreamId, ProcessId, TenantId,
WorkflowId, and event store into a single owned value so callers do not
need to pass them on every command dispatch.
§Generic parameters
W— theWorkflowimplementation. In practice this is a zero-size marker struct; the type parameter carries the domain logic as associated types.S— theEventStorebackend.InMemoryEventStoreis the default for tests; production deployments wrap a persistent backend inArcand useProcess<W, Arc<MyStore>>.
§Clone semantics
If S: Clone (e.g. InMemoryEventStore or Arc<…>), Process is also
Clone and all clones share the same underlying storage.
Implementations§
Source§impl<W: Workflow, S: EventStore> Process<W, S>
impl<W: Workflow, S: EventStore> Process<W, S>
Sourcepub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self
pub fn new(store: S, tenant_id: TenantId, workflow_id: WorkflowId) -> Self
Sourcepub fn from_stream(
store: S,
stream_id: StreamId,
process_id: ProcessId,
tenant_id: TenantId,
workflow_id: WorkflowId,
) -> Self
pub fn from_stream( store: S, stream_id: StreamId, process_id: ProcessId, tenant_id: TenantId, workflow_id: WorkflowId, ) -> Self
Attach to an existing process stream.
Use this on service restart or when routing an inbound message to an already-running process whose identifiers were previously persisted.
Sourcepub fn process_id(&self) -> ProcessId
pub fn process_id(&self) -> ProcessId
The stable process identifier.
Sourcepub fn workflow_id(&self) -> &WorkflowId
pub fn workflow_id(&self) -> &WorkflowId
The workflow version under which this process was created.
Sourcepub fn identity(&self) -> ProcessIdentity
pub fn identity(&self) -> ProcessIdentity
Return a serializable value bundle of all four process identifiers.
Persist this to a routing table (e.g. keyed by conversation_id or
correlation_id) so inbound messages can be routed to the correct
running process without the caller needing to manage four separate
fields.
Use Process::from_identity to re-attach to the same process stream
on a subsequent request.
let id = process.identity();
routing_table.insert(conv_id, id.clone());
// Later, on a subsequent inbound message:
let id = routing_table.get(&conv_id)?;
let process = Process::<MyWorkflow, _>::from_identity(store, id);Sourcepub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext
pub fn context_for_inbound(&self, interchange_ref: &str) -> CommandContext
Build a CommandContext for an inbound EDIFACT message dispatch.
Derives a deterministic CorrelationId from interchange_ref
(UUID v5) so repeated dispatches of the same EDIFACT message — e.g.
AS4 retransmissions or idempotent REST replays — produce the same
correlation root. This makes EDIFACT-level idempotency observable in
distributed traces without any extra dedup logic at the engine level.
Use this instead of Process::execute when you need to propagate
EDIFACT correlation metadata into the event stream. The returned
context is passed to Process::execute_with.
§Example
let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
let cmd_ctx = process.context_for_inbound(&utilmd_interchange_ref);
process.execute_with(command, cmd_ctx).await?;Sourcepub fn from_identity(store: S, identity: ProcessIdentity) -> Self
pub fn from_identity(store: S, identity: ProcessIdentity) -> Self
Attach to an existing process stream from a previously persisted
ProcessIdentity.
This is the companion to Process::identity: look up the identity
from your routing table and call from_identity to get a live
Process handle bound to store.
Sourcepub async fn event_count(&self) -> Result<u64, EngineError>
pub async fn event_count(&self) -> Result<u64, EngineError>
Return the number of events currently in the stream.
Uses EventStore::stream_version for an efficient O(1) metadata
query on backends that override it. Falls back to loading all events
on stores that use the default implementation.
Use this to decide whether to take a snapshot — e.g. with
Snapshot::should_take:
if Snapshot::should_take(process.event_count().await?, 100) {
process.take_snapshot(&snap_store, 100).await?;
}§Errors
Returns EngineError::Store on storage failures.
Sourcepub async fn execute(
&self,
command: W::Command,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute( &self, command: W::Command, ) -> Result<Vec<EventEnvelope>, EngineError>
Dispatch command using a freshly generated CommandContext.
A new CorrelationId and ConversationId are auto-generated for
each call. To propagate tracing IDs from an inbound EDIFACT message
across a multi-step command chain, use execute_with.
§Errors
EngineError::VersionConflictwhen a concurrent writer raced ahead; retry by callingexecuteagain.EngineError::Workflowwhen the workflow rejects the command.EngineError::Deserializationwhen a stored event cannot be decoded.
Sourcepub async fn execute_and_collect(
&self,
command: W::Command,
) -> Result<(Vec<EventEnvelope>, Vec<OutboxMessage>), EngineError>
pub async fn execute_and_collect( &self, command: W::Command, ) -> Result<(Vec<EventEnvelope>, Vec<OutboxMessage>), EngineError>
Like execute but also returns the outbox messages produced by
Workflow::handle, fully stamped with the real IDs from the persisted
event.
The returned OutboxMessage entries have their causation_event_id
set to the event_id of the first persisted event — identical to what
execute_and_enqueue writes into the OutboxStore atomically. This
makes the messages ready to pass directly to the EDIFACT renderer
without any manual ID stitching.
Use this in E2E and integration tests that need to inspect or render
outbox messages after a command is persisted, without the awkward
handle() + execute() double invocation.
§Errors
Returns EngineError on storage or command handling failure.
Sourcepub async fn execute_with(
&self,
command: W::Command,
ctx: CommandContext,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_with( &self, command: W::Command, ctx: CommandContext, ) -> Result<Vec<EventEnvelope>, EngineError>
Dispatch command with a caller-supplied CommandContext.
Use this when you need to thread a specific correlation_id,
conversation_id, or causation_id through the command. For example,
when dispatching an APERAK in response to a UTILMD, pass the
conversation_id from the UTILMD envelope so both exchanges are
traceable as a single business conversation.
Build a context with:
let ctx = CommandContext::new(tenant_id, process_id, workflow_id)
.with_causation(utilmd_event_id.into()) // From<EventId> for CausationId
.with_conversation(utilmd_conversation_id);
process.execute_with(DispatchAperak { .. }, ctx).await?;§Errors
See Process::execute for the error contract.
Sourcepub async fn execute_snapshot<Snap>(
&self,
command: W::Command,
snap_store: &Snap,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_snapshot<Snap>( &self, command: W::Command, snap_store: &Snap, ) -> Result<Vec<EventEnvelope>, EngineError>
Dispatch command using a snapshot store to accelerate state reconstruction.
Equivalent to Process::execute but starts replay from the most recent
snapshot rather than from sequence 0. For streams with thousands of events
and a snapshot within the last 100 events, this reduces replay cost from
O(n) to O(k) where k is the tail length since the last snapshot.
When no snapshot exists or the schema version has changed, falls back to
full O(n) replay — identical in cost to Process::execute.
§Errors
Same contract as Process::execute.
Sourcepub async fn state(&self) -> Result<W::State, EngineError>
pub async fn state(&self) -> Result<W::State, EngineError>
Reconstruct the current workflow state by replaying all persisted events.
This is a read-only operation — it loads events but does not acquire any write lock or check optimistic concurrency. Use it to:
- Inspect process status in tests without dispatching a command.
- Build a diagnostic snapshot for observability or health checks.
- Implement query-side read models that need the full typed state.
For production read models, prefer a Projection that is updated
incrementally rather than replaying the full stream on every query.
To accelerate replay for long-lived streams, use
Process::state_with_snapshot instead.
§Errors
EngineError::Storeon storage failures.EngineError::Deserializationwhen a stored event cannot be decoded intoW::Event(schema migration required).
Sourcepub async fn state_with_snapshot<Snap: SnapshotStore>(
&self,
snap_store: &Snap,
) -> Result<W::State, EngineError>where
W::State: DeserializeOwned,
pub async fn state_with_snapshot<Snap: SnapshotStore>(
&self,
snap_store: &Snap,
) -> Result<W::State, EngineError>where
W::State: DeserializeOwned,
Reconstruct current state using a snapshot as the starting point.
Loads the most recent snapshot for this stream from snap_store. If
one exists, deserializes it into W::State and then replays only
events appended after the snapshot’s sequence_number
(O(k) instead of O(n)). Falls back to full replay when no snapshot
exists.
§When to use
Use this instead of Process::state for long-lived processes where
the event count grows large. Pair it with Process::take_snapshot
to keep the snapshot store current after each command.
§Schema version compatibility
Snapshots whose state field cannot be deserialized into W::State
(e.g. after a breaking state schema change) will return
EngineError::Deserialization. In that case, fall back to
Process::state (full replay) and take a fresh snapshot.
§Errors
EngineError::Storeon snapshot or event storage failures.EngineError::Deserializationwhen the snapshot state or a tail event cannot be decoded.
Sourcepub async fn take_snapshot<Snap: SnapshotStore>(
&self,
snap_store: &Snap,
interval: u64,
) -> Result<bool, EngineError>
pub async fn take_snapshot<Snap: SnapshotStore>( &self, snap_store: &Snap, interval: u64, ) -> Result<bool, EngineError>
Reconstruct current state and save a snapshot if the event-count threshold is reached.
Checks Snapshot::should_take with interval. When at least
interval new events have accumulated since the last snapshot,
reconstructs state via full replay, serializes it, and calls
SnapshotStore::save.
Returns true when a snapshot was taken, false when the threshold
was not reached or interval is 0.
§Integration pattern
// After every successful command:
process.execute(command).await?;
process.take_snapshot(&snap_store, 100).await?;
// On the read path — O(k) instead of O(n):
let state = process.state_with_snapshot(&snap_store).await?;§Errors
EngineError::Storeon snapshot storage failures.EngineError::Serializationwhen the state cannot be JSON-encoded.EngineError::Deserializationwhen a stored event cannot be decoded.
Sourcepub async fn execute_with_retry(
&self,
command: W::Command,
max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_with_retry( &self, command: W::Command, max_attempts: u32, ) -> Result<Vec<EventEnvelope>, EngineError>
Dispatch command with automatic retry on EngineError::VersionConflict.
A version conflict occurs when a concurrent writer appended events between this process’s read and its append attempt. On each conflict, the engine reloads the complete event stream from the store and replays all events to rebuild fresh state before re-handling the command. Stale in-memory state from a previous attempt is never carried forward — each retry always starts from a fully-rebuilt snapshot.
Non-conflict errors (storage failures, workflow rejections) are returned immediately without retrying.
A freshly-generated CommandContext is pinned before the first
attempt and reused across all retries so all events share the same
correlation root regardless of retry count. Use
execute_with_retry_ctx to supply a specific context (e.g. one
derived from an inbound EDIFACT envelope).
§When to use
Use for commands where two inbound EDIFACT messages for the same process may arrive concurrently — e.g. a UTILMD and its APERAK processed on separate async tasks.
§Command cloning
W::Command must implement Clone so it can be resubmitted on
each retry without reconstructing it from scratch.
§Errors
EngineError::VersionConflictwhen allmax_attemptsare exhausted without a successful append.- Any non-conflict
EngineErrorreturned by the workflow or storage. EngineError::Storewhenmax_attemptsis0.
Sourcepub async fn execute_with_retry_ctx(
&self,
command: W::Command,
ctx: CommandContext,
max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_with_retry_ctx( &self, command: W::Command, ctx: CommandContext, max_attempts: u32, ) -> Result<Vec<EventEnvelope>, EngineError>
Dispatch command with a caller-supplied CommandContext and
automatic retry on EngineError::VersionConflict.
Identical to execute_with_retry but threads the provided ctx
(including its correlation_id, conversation_id, and causation_id)
through every retry attempt. Use this when you need to propagate
tracing IDs from an inbound EDIFACT envelope across a retried command.
§Example
let ctx = CommandContext::from_envelope(&utilmd_envelope, workflow_id);
process.execute_with_retry_ctx(HandleAperak { .. }, ctx, 3).await?;§Errors
See execute_with_retry for the error contract.
§Panics
Panics if max_attempts is 0 and the guard at the top of the function
is somehow bypassed (unreachable in practice).
Sourcepub async fn execute_and_enqueue(
&self,
command: W::Command,
) -> Result<Vec<EventEnvelope>, EngineError>where
S: AtomicAppend,
pub async fn execute_and_enqueue(
&self,
command: W::Command,
) -> Result<Vec<EventEnvelope>, EngineError>where
S: AtomicAppend,
Execute command and atomically co-persist any PendingOutbox messages
produced by Workflow::handle.
Like execute, but requires S: AtomicAppend. When the workflow’s
handle returns outbox messages alongside events, both are written to
storage in a single WriteBatch, eliminating the silent message-loss
window that would exist with separate writes.
When the handle returns no outbox messages, this degenerates to a plain
EventStore::append (no performance cost).
Use this method instead of execute in all production code that
needs outbox delivery guarantees. Plain execute silently drops any
outbox entries produced by the workflow handler — a crash between
execute and a subsequent manual OutboxStore::enqueue call would
lose the APERAK or UTILMD response permanently.
For long event streams with periodic snapshots use
execute_and_enqueue_snapshot to reduce O(n) replay cost to O(k).
In concurrent environments where VersionConflict is expected, use
execute_and_enqueue_with_retry to retry automatically.
§Example
use std::sync::Arc;
use mako_engine::process::Process;
use mako_engine::version::WorkflowId;
use mako_engine::ids::TenantId;
// SlateDbStore implements AtomicAppend — required for execute_and_enqueue.
let store = Arc::new(SlateDbStore::open_in_memory().await?);
let tenant_id = TenantId::from_party_id("9904231000007");
let workflow_id = WorkflowId::new("gpke-supplier-change", fv);
let process = Process::<GpkeSupplierChangeWorkflow, _>::new(
Arc::clone(&store),
tenant_id,
workflow_id,
);
// The workflow handle emits a PendingOutbox APERAK entry alongside the event.
// execute_and_enqueue writes both in one WriteBatch — no partial-write window.
let events = process
.execute_and_enqueue(GpkeCommand::ReceiveUtilmd { pid: 55001, payload })
.await?;
assert!(!events.is_empty(), "at least one event was persisted");
// The APERAK outbox entry is now visible to the outbox worker:
let pending = store.peek_outbox(tenant_id, 10).await?;
assert_eq!(pending.len(), 1, "APERAK enqueued atomically with the event");§Errors
EngineError::VersionConflict— stream was modified concurrently; retry withexecute_and_enqueue_with_retry.EngineError::Workflow— the command was rejected by the workflow.EngineError::Store/EngineError::Outbox— storage failure.
Sourcepub async fn execute_and_enqueue_snapshot<Snap>(
&self,
command: W::Command,
snap_store: &Snap,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_and_enqueue_snapshot<Snap>( &self, command: W::Command, snap_store: &Snap, ) -> Result<Vec<EventEnvelope>, EngineError>
Like execute_and_enqueue but uses a snapshot to accelerate replay.
Atomically persists events and outbox entries while starting state reconstruction from the most recent snapshot. For long streams with periodic snapshots this reduces replay cost from O(n) to O(k).
§Errors
Returns EngineError on storage or command handling failure.
Sourcepub async fn execute_timeout(
&self,
deadline: &Deadline,
) -> Result<Option<Vec<EventEnvelope>>, EngineError>where
S: AtomicAppend,
pub async fn execute_timeout(
&self,
deadline: &Deadline,
) -> Result<Option<Vec<EventEnvelope>>, EngineError>where
S: AtomicAppend,
Dispatch the compensation command returned by Workflow::on_deadline.
Reconstructs the current process state, calls
W::on_deadline(deadline, &state), and — if the hook returns
Some(command) — executes it via Process::execute_and_enqueue,
which atomically persists events and any outbox entries (e.g.
APERAK Ablehnung) produced by the compensation handler.
Returns Ok(Some(events)) when compensation fired, Ok(None) when
the hook returned None (deadline acknowledged as no-op).
This is the canonical way to wire deadline firings to workflow
compensation logic. Any WorkflowOutput::with_outbox entries
returned by on_deadline are guaranteed to be persisted atomically —
there is no window where the event is stored but the outbox entry is
lost.
§Example
// In the deadline worker:
let overdue = ctx.deadline_store().due_now(50).await?;
for deadline in overdue {
let identity = ctx.registry()
.lookup(deadline.tenant_id(), &RegistryKey::from_process(deadline.process_id()))
.await?
.expect("process must be registered");
let process = ctx.resume::<GpkeSupplierChangeWorkflow>(identity);
if let Some(events) = process.execute_timeout(&deadline).await? {
// compensation command was dispatched — APERAK Ablehnung enqueued
tracing::info!(events = events.len(), "timeout compensation applied");
}
ctx.deadline_store().cancel(deadline.deadline_id()).await?;
}§Errors
Propagates EngineError::VersionConflict, EngineError::Workflow,
and storage errors from execute_and_enqueue. Use
execute_timeout_with_retry when VersionConflict retries are
required.
Sourcepub async fn execute_timeout_with_retry(
&self,
deadline: &Deadline,
max_attempts: u32,
) -> Result<Option<Vec<EventEnvelope>>, EngineError>
pub async fn execute_timeout_with_retry( &self, deadline: &Deadline, max_attempts: u32, ) -> Result<Option<Vec<EventEnvelope>>, EngineError>
Like execute_timeout but retries on VersionConflict up to
max_attempts times.
Use this in production deadline workers where concurrent event appends are expected. Outbox entries (e.g. APERAK Ablehnung) produced by the compensation handler are persisted atomically on every attempt.
§Errors
Returns EngineError on storage or command handling failure.
§Panics
Panics if the deadline produces a command but the retry loop somehow exhausts without capturing an error (unreachable in practice).
Sourcepub async fn execute_and_enqueue_with_retry(
&self,
command: W::Command,
max_attempts: u32,
) -> Result<Vec<EventEnvelope>, EngineError>
pub async fn execute_and_enqueue_with_retry( &self, command: W::Command, max_attempts: u32, ) -> Result<Vec<EventEnvelope>, EngineError>
Like execute_and_enqueue but retries on crate::error::EngineError::VersionConflict up to
max_attempts times.
§Errors
Returns EngineError on storage or command handling failure.
§Panics
Panics if max_attempts is 0 and the guard is bypassed (unreachable).
Sourcepub async fn execute_and_enqueue_with_snapshot<Snap>(
&self,
command: W::Command,
snap_store: &Snap,
snapshot_interval: u64,
) -> Result<(Vec<EventEnvelope>, bool), EngineError>
pub async fn execute_and_enqueue_with_snapshot<Snap>( &self, command: W::Command, snap_store: &Snap, snapshot_interval: u64, ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
Execute command atomically with outbox, then automatically snapshot
if the event-count threshold is reached.
Combines execute_and_enqueue with take_snapshot: after a
successful write, checks whether event_count % snapshot_interval == 0
and, if so, serialises and saves a snapshot via snap_store.
Pass snapshot_interval = 0 to disable auto-snapshotting; the call
then behaves identically to execute_and_enqueue.
Returns (events, snapshot_taken) where snapshot_taken is true when
a snapshot was written this call.
§Errors
EngineError::VersionConflict— stream was modified concurrently; retry withexecute_and_enqueue_with_retry.EngineError::Workflow— the command was rejected by the workflow.EngineError::Store/EngineError::Outbox— storage failure.EngineError::Serialization— state serialisation failed during snapshot.
Sourcepub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>(
&self,
command: W::Command,
max_attempts: u32,
snap_store: &Snap,
snapshot_interval: u64,
) -> Result<(Vec<EventEnvelope>, bool), EngineError>
pub async fn execute_and_enqueue_with_snapshot_and_retry<Snap>( &self, command: W::Command, max_attempts: u32, snap_store: &Snap, snapshot_interval: u64, ) -> Result<(Vec<EventEnvelope>, bool), EngineError>
Like execute_and_enqueue_with_snapshot but retries on
crate::error::EngineError::VersionConflict up to max_attempts times.
§Errors
EngineError::VersionConflict— stream was modified concurrently; retry withexecute_and_enqueue_with_snapshot_and_retry.EngineError::Workflow— the command was rejected by the workflow.EngineError::Store/EngineError::Outbox— storage failure.EngineError::Serialization— state serialisation failed during snapshot.
§Panics
Panics if max_attempts is 0 and the loop guard is bypassed (unreachable).