Skip to main content

EngineContext

Struct EngineContext 

Source
pub struct EngineContext<ES, SS = NoopSnapshotStore, OS = NoopOutboxStore, DS = NoopDeadlineStore, PR = NoopProcessRegistry> {
    pub dead_letter_sink: Arc<dyn DeadLetterSink>,
    /* private fields */
}
Expand description

Assembled engine infrastructure returned by EngineBuilder::build.

EngineContext bundles all stores and the process registry into a single value. It is the root dependency for:

  • Spawning new processes (spawn)
  • Resuming existing processes (resume)
  • Running outbox delivery workers (outbox_store.pending_now(…))
  • Driving the deadline scheduler (deadline_store.due_now(…))

§Generic parameters

In most codebases all type parameters are inferred from the builder calls.

Fields§

§dead_letter_sink: Arc<dyn DeadLetterSink>

Dead-letter sink for unroutable or unprocessable inbound messages.

Stored as Arc<dyn DeadLetterSink> so callers can share it across tasks without an extra type parameter on EngineContext.

Implementations§

Source§

impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where ES: EventStore,

Source

pub fn spawn<W: Workflow>( &self, tenant_id: TenantId, workflow_id: WorkflowId, ) -> Process<W, Arc<ES>>

Spawn a new process and return a typed Process<W, Arc<ES>> handle.

No ES: Clone bound is required — the engine stores the event store behind an Arc so spawning is always a cheap pointer clone.

let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
p.execute(ReceiveUtilmd { .. }).await?;
Source

pub fn resume<W: Workflow>( &self, identity: ProcessIdentity, ) -> Process<W, Arc<ES>>

Resume an existing process from a ProcessIdentity.

let identity = ctx.registry()
    .lookup(tenant_id, &conv_id.to_string())
    .await?
    .ok_or(EngineError::Registry("unknown conversation".into()))?;
let p = ctx.resume::<SupplierChangeWorkflow>(identity);
p.execute(HandleAperak { .. }).await?;
Source

pub fn registered_modules(&self) -> &[&'static str]

Names of all domain modules registered with the builder, in registration order.

Source

pub fn registered_workflows(&self) -> &[&'static str]

Workflow names declared by all registered modules, in registration order.

Use this in the deadline scheduler dispatch function to detect unknown workflow names at startup. If a deadline fires for a workflow name that is not in this list, the scheduler’s dispatch function should emit an error rather than silently dropping the deadline:

let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
let scheduler = ctx.run_deadline_scheduler(
    move |deadline| {
        let wf = deadline.workflow_id().name.as_ref();
        if !known.contains(wf) {
            tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
            return Box::pin(async { Ok(()) });
        }
        // dispatch by workflow name …
        Box::pin(async { Ok(()) })
    },
    100,
    Duration::from_secs(30),
);
Source

pub fn event_store(&self) -> &Arc<ES>

The event store backend (behind an Arc).

Source

pub fn snapshot_store(&self) -> &SS

The snapshot store backend.

Source

pub fn outbox_store(&self) -> &OS

The outbox store backend.

Poll outbox_store().pending_now(limit) in a background task to drain the delivery queue.

Source

pub fn deadline_store(&self) -> &DS

The deadline store backend.

Poll deadline_store().due_now(limit) in a background scheduler to fire overdue process timers.

Source

pub fn registry(&self) -> &PR

The process routing registry.

Register a ProcessIdentity under a (tenant_id, key) pair at process creation, then lookup it when routing inbound messages.

Source

pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink>

The dead-letter sink for unroutable or unprocessable messages.

Call DeadLetterSink::reject when an inbound message cannot be dispatched to any workflow. The default sink emits tracing::warn! so rejections are always visible in the log output.

Source

pub fn assert_production_stores(&self)

Assert that no Noop store is active — call this during production startup.

Checks the type names of OS, DS, and PR against the string "Noop". Panics with a human-readable message if any match, directing the operator to configure a persistent backend.

§When to call

Call this early in makod’s startup path (and --check mode) to catch deployments where a Noop store was accidentally wired — e.g. the [outbox], [deadline], or [registry] configuration section was omitted from makod.toml. The check is defence-in-depth: in release builds without the testing feature, Noop stores cannot implement the required traits at all and the compiler would have already rejected them.

§Panics

Panics when any of OS, DS, or PR is a Noop implementation.

Source

pub fn pid_router(&self) -> &PidRouter

The PID-to-workflow routing table.

Populated once during EngineBuilder::build by calling EngineModule::register_pids on every registered module in registration order. After build returns the table is sealed — it is read-only for the lifetime of the EngineContext and may be freely shared across async tasks without synchronisation.

§Mutability contract

There is intentionally no pid_router_mut() accessor. Adding PIDs after the engine is built would create a TOCTOU race between the dispatch path (which calls route(pid)) and any hypothetical concurrent mutator. Instead, register all PIDs during the build phase via EngineModule::register_pids.

If a new process family needs to be added without restarting the binary, rebuild and restart makod — hot-swap of PID routing is not supported.

§Example — dispatch at the AS4 reception boundary
let workflow_name = ctx.pid_router().route(pid)
    .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
        format!("no workflow registered for PID {pid}").into()
    )))?;

match workflow_name {
    "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
    "wim-device-change"    => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
    other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
        format!("unhandled workflow name: {other}").into()
    ))),
}
Source§

impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where ES: EventStore, OS: OutboxStore + Clone,

Source

pub fn run_outbox_worker<S: As4Sender>( &self, sender: S, batch_size: usize, poll_interval: Duration, max_attempts: u32, ) -> OutboxWorker<OS, S>

Construct an OutboxWorker that drains the outbox via sender.

batch_size — messages fetched per poll cycle. poll_interval — sleep duration when the batch is empty.

max_attempts — maximum total delivery attempts before dead-lettering. Pass 48 for a ~4-hour retry budget at the 300 s backoff cap, or u32::MAX to disable the cap (not recommended for production).

use std::time::Duration;

let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
tokio::spawn(async move { worker.run().await });
Source§

impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where ES: EventStore, DS: DeadlineStore + Clone,

Source

pub fn run_deadline_scheduler<F, Fut>( &self, dispatch: F, batch_size: usize, poll_interval: Duration, ) -> DeadlineScheduler<DS>
where F: Fn(Deadline) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<(), EngineError>> + Send + 'static,

Construct a DeadlineScheduler that polls the deadline store and dispatches fired deadlines via dispatch.

The dispatch function is called for every fired deadline. It should resume the owning process and execute the appropriate timeout command.

batch_size — deadlines fetched per poll cycle. poll_interval — sleep duration when no deadlines are due.

use std::time::Duration;

let scheduler = ctx.run_deadline_scheduler(
    |d| async move {
        tracing::info!(label = %d.label(), "firing deadline");
        Ok(())
    },
    100,
    Duration::from_secs(30),
);
tokio::spawn(async move { scheduler.run().await });

Trait Implementations§

Source§

impl<ES, SS, OS, DS, PR> Debug for EngineContext<ES, SS, OS, DS, PR>
where ES: Debug, SS: Debug, OS: Debug, DS: Debug, PR: Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<ES, SS = NoopSnapshotStore, OS = NoopOutboxStore, DS = NoopDeadlineStore, PR = NoopProcessRegistry> !RefUnwindSafe for EngineContext<ES, SS, OS, DS, PR>

§

impl<ES, SS = NoopSnapshotStore, OS = NoopOutboxStore, DS = NoopDeadlineStore, PR = NoopProcessRegistry> !UnwindSafe for EngineContext<ES, SS, OS, DS, PR>

§

impl<ES, SS, OS, DS, PR> Freeze for EngineContext<ES, SS, OS, DS, PR>
where SS: Freeze, OS: Freeze, DS: Freeze, PR: Freeze,

§

impl<ES, SS, OS, DS, PR> Send for EngineContext<ES, SS, OS, DS, PR>
where SS: Send, OS: Send, DS: Send, PR: Send, ES: Sync + Send,

§

impl<ES, SS, OS, DS, PR> Sync for EngineContext<ES, SS, OS, DS, PR>
where SS: Sync, OS: Sync, DS: Sync, PR: Sync, ES: Sync + Send,

§

impl<ES, SS, OS, DS, PR> Unpin for EngineContext<ES, SS, OS, DS, PR>
where SS: Unpin, OS: Unpin, DS: Unpin, PR: Unpin,

§

impl<ES, SS, OS, DS, PR> UnsafeUnpin for EngineContext<ES, SS, OS, DS, PR>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more