pub struct EngineContext<ES, SS = NoopSnapshotStore, OS = NoopOutboxStore, DS = NoopDeadlineStore, PR = NoopProcessRegistry> {
pub dead_letter_sink: Arc<dyn DeadLetterSink>,
/* private fields */
}Expand description
Assembled engine infrastructure returned by EngineBuilder::build.
EngineContext bundles all stores and the process registry into a single
value. It is the root dependency for:
- Spawning new processes (
spawn) - Resuming existing processes (
resume) - Running outbox delivery workers (
outbox_store.pending_now(…)) - Driving the deadline scheduler (
deadline_store.due_now(…))
§Generic parameters
| Param | Role | Default |
|---|---|---|
ES | EventStore backend | — (required) |
SS | SnapshotStore backend | NoopSnapshotStore |
OS | OutboxStore backend | NoopOutboxStore |
DS | DeadlineStore backend | NoopDeadlineStore |
PR | ProcessRegistry backend | NoopProcessRegistry |
In most codebases all type parameters are inferred from the builder calls.
Fields§
§dead_letter_sink: Arc<dyn DeadLetterSink>Dead-letter sink for unroutable or unprocessable inbound messages.
Stored as Arc<dyn DeadLetterSink> so callers can share it across
tasks without an extra type parameter on EngineContext.
Implementations§
Source§impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>where
ES: EventStore,
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>where
ES: EventStore,
Sourcepub fn spawn<W: Workflow>(
&self,
tenant_id: TenantId,
workflow_id: WorkflowId,
) -> Process<W, Arc<ES>>
pub fn spawn<W: Workflow>( &self, tenant_id: TenantId, workflow_id: WorkflowId, ) -> Process<W, Arc<ES>>
Spawn a new process and return a typed Process<W, Arc<ES>> handle.
No ES: Clone bound is required — the engine stores the event store
behind an Arc so spawning is always a cheap pointer clone.
let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
p.execute(ReceiveUtilmd { .. }).await?;Sourcepub fn resume<W: Workflow>(
&self,
identity: ProcessIdentity,
) -> Process<W, Arc<ES>>
pub fn resume<W: Workflow>( &self, identity: ProcessIdentity, ) -> Process<W, Arc<ES>>
Resume an existing process from a ProcessIdentity.
let identity = ctx.registry()
.lookup(tenant_id, &conv_id.to_string())
.await?
.ok_or(EngineError::Registry("unknown conversation".into()))?;
let p = ctx.resume::<SupplierChangeWorkflow>(identity);
p.execute(HandleAperak { .. }).await?;Sourcepub fn registered_modules(&self) -> &[&'static str]
pub fn registered_modules(&self) -> &[&'static str]
Names of all domain modules registered with the builder, in registration order.
Sourcepub fn registered_workflows(&self) -> &[&'static str]
pub fn registered_workflows(&self) -> &[&'static str]
Workflow names declared by all registered modules, in registration order.
Use this in the deadline scheduler dispatch function to detect unknown workflow names at startup. If a deadline fires for a workflow name that is not in this list, the scheduler’s dispatch function should emit an error rather than silently dropping the deadline:
let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
let scheduler = ctx.run_deadline_scheduler(
move |deadline| {
let wf = deadline.workflow_id().name.as_ref();
if !known.contains(wf) {
tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
return Box::pin(async { Ok(()) });
}
// dispatch by workflow name …
Box::pin(async { Ok(()) })
},
100,
Duration::from_secs(30),
);Sourcepub fn event_store(&self) -> &Arc<ES>
pub fn event_store(&self) -> &Arc<ES>
The event store backend (behind an Arc).
Sourcepub fn snapshot_store(&self) -> &SS
pub fn snapshot_store(&self) -> &SS
The snapshot store backend.
Sourcepub fn outbox_store(&self) -> &OS
pub fn outbox_store(&self) -> &OS
The outbox store backend.
Poll outbox_store().pending_now(limit) in a background task to drain
the delivery queue.
Sourcepub fn deadline_store(&self) -> &DS
pub fn deadline_store(&self) -> &DS
The deadline store backend.
Poll deadline_store().due_now(limit) in a background scheduler to
fire overdue process timers.
Sourcepub fn registry(&self) -> &PR
pub fn registry(&self) -> &PR
The process routing registry.
Register a ProcessIdentity under a (tenant_id, key) pair at
process creation, then lookup it when routing inbound messages.
Sourcepub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink>
pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink>
The dead-letter sink for unroutable or unprocessable messages.
Call DeadLetterSink::reject when an inbound message cannot be
dispatched to any workflow. The default sink emits tracing::warn!
so rejections are always visible in the log output.
Sourcepub fn assert_production_stores(&self)
pub fn assert_production_stores(&self)
Assert that no Noop store is active — call this during production startup.
Checks the type names of OS, DS, and PR against the string "Noop".
Panics with a human-readable message if any match, directing the operator
to configure a persistent backend.
§When to call
Call this early in makod’s startup path (and --check mode) to catch
deployments where a Noop store was accidentally wired — e.g. the
[outbox], [deadline], or [registry] configuration section was
omitted from makod.toml. The check is defence-in-depth: in release
builds without the testing feature, Noop stores cannot implement the
required traits at all and the compiler would have already rejected them.
§Panics
Panics when any of OS, DS, or PR is a Noop implementation.
Sourcepub fn pid_router(&self) -> &PidRouter
pub fn pid_router(&self) -> &PidRouter
The PID-to-workflow routing table.
Populated once during EngineBuilder::build by calling
EngineModule::register_pids on every registered module in
registration order. After build returns the table is sealed —
it is read-only for the lifetime of the EngineContext and may be
freely shared across async tasks without synchronisation.
§Mutability contract
There is intentionally no pid_router_mut() accessor. Adding PIDs
after the engine is built would create a TOCTOU race between the
dispatch path (which calls route(pid)) and any hypothetical
concurrent mutator. Instead, register all PIDs during the build phase
via EngineModule::register_pids.
If a new process family needs to be added without restarting the
binary, rebuild and restart makod — hot-swap of PID routing is not
supported.
§Example — dispatch at the AS4 reception boundary
let workflow_name = ctx.pid_router().route(pid)
.ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
format!("no workflow registered for PID {pid}").into()
)))?;
match workflow_name {
"gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
"wim-device-change" => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
format!("unhandled workflow name: {other}").into()
))),
}Source§impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
Sourcepub fn run_outbox_worker<S: As4Sender>(
&self,
sender: S,
batch_size: usize,
poll_interval: Duration,
max_attempts: u32,
) -> OutboxWorker<OS, S>
pub fn run_outbox_worker<S: As4Sender>( &self, sender: S, batch_size: usize, poll_interval: Duration, max_attempts: u32, ) -> OutboxWorker<OS, S>
Construct an OutboxWorker that drains the outbox via sender.
batch_size — messages fetched per poll cycle.
poll_interval — sleep duration when the batch is empty.
max_attempts — maximum total delivery attempts before dead-lettering.
Pass 48 for a ~4-hour retry budget at the 300 s backoff cap, or
u32::MAX to disable the cap (not recommended for production).
use std::time::Duration;
let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
tokio::spawn(async move { worker.run().await });Source§impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
Sourcepub fn run_deadline_scheduler<F, Fut>(
&self,
dispatch: F,
batch_size: usize,
poll_interval: Duration,
) -> DeadlineScheduler<DS>
pub fn run_deadline_scheduler<F, Fut>( &self, dispatch: F, batch_size: usize, poll_interval: Duration, ) -> DeadlineScheduler<DS>
Construct a DeadlineScheduler that polls the deadline store and
dispatches fired deadlines via dispatch.
The dispatch function is called for every fired deadline. It should
resume the owning process and execute the appropriate timeout command.
batch_size — deadlines fetched per poll cycle.
poll_interval — sleep duration when no deadlines are due.
use std::time::Duration;
let scheduler = ctx.run_deadline_scheduler(
|d| async move {
tracing::info!(label = %d.label(), "firing deadline");
Ok(())
},
100,
Duration::from_secs(30),
);
tokio::spawn(async move { scheduler.run().await });