Skip to main content

Engine

Struct Engine 

Source
pub struct Engine { /* private fields */ }
Expand description

Process-wide long-running runtime. Cheap to clone() — an Arc lives inside.

Implementations§

Source§

impl Engine

Source

pub fn new(cfg: EngineCfg) -> Self

Backwards-compatible constructor that starts the engine without a layer registry, preserving the signature already used by ~88 existing call sites. Use this when automatic middleware wrapping at bind time is not needed. Callers such as mlua-swarm-server go through new_with_layers(cfg, registry) to enable the hint-resolution path.

Source

pub fn new_with_layers(cfg: EngineCfg, layer_registry: LayerRegistry) -> Self

Construct an Engine with an explicit LayerRegistry, enabling hint-resolution: spawner_hints.layers declared on a Blueprint are resolved against this registry when the spawner stack is bound at service::linker::link time.

Source

pub fn with_gate(self, gate: RoleVerbGate) -> Self

Rebuild this Engine with a different RoleVerbGate. The gate is treated as fixed-at-build-time, so this constructs a fresh EngineInner (fresh empty EngineState) rather than mutating in place — mainly a testing convenience for swapping gate rules.

Source

pub fn cfg(&self) -> &EngineCfg

Access the EngineCfg this engine was built with.

Source

pub fn layer_registry(&self) -> &LayerRegistry

Expose the internal LayerRegistry — used when deriving a sub-engine that needs the same registry re-injected. The per-request sub-engine in mlua-swarm-server reads the parent engine’s registry through this accessor and passes it to Engine::new_with_layers(cfg, parent.layer_registry().clone()).

Source

pub fn signer(&self) -> &TokenSigner

Access the TokenSigner used to mint/verify CapTokens.

Source

pub fn event_tx(&self) -> Sender<Event>

Clone a handle to the process-wide Event broadcast sender. Prefer subscribe for a ready-to-use receiver.

Source

pub fn subscribe(&self) -> EventStream

Subscribe to the engine’s Event broadcast stream.

Source

pub async fn with_state<F, R>( &self, op: &'static str, f: F, ) -> Result<R, EngineError>
where F: FnOnce(&mut EngineState) -> R,

The closure is a sync FnOnce — you cannot pass an async closure, which enforces R3 at the type level. Exceeding max_hold panics so that R4 violations surface immediately.

Source

pub async fn verify_token( &self, token: &CapToken, verb: Verb, ) -> Result<(), EngineError>

Four steps: (1) signature verify, (2) expiry check, (3) role × verb gate, (4) uses_left consume.

Source

pub async fn verify_token_for_task( &self, token: &CapToken, verb: Verb, task_id: &TaskId, ) -> Result<(), EngineError>

verify_token plus the task-ownership gate.

When a Worker-role token calls a state-touch verb (fetch_prompt / post_result / read_task_state / cancel_task / poll_task), the gate checks that CapTokenRecord.task_id matches the argument task_id; a mismatch returns EngineError::TokenTaskMismatch. Operator / Senior / Observer tokens are outside the ownership gate and may touch any task.

Verbs exempt from the gate. start_task and dispatch_attempt stay outside so recursive swarming keeps working; depth is capped by max_spawn_depth.

Source

pub async fn task_id_from_token( &self, token: &CapToken, ) -> Result<TaskId, EngineError>

Resolve the bound task_id from a Worker-role token. Used on the simple /v1/worker/submit endpoint, where the worker POSTs with a token but no task_id. Returns Err if the token role is not Worker, or if no bound task is set.

Source

pub async fn task_id_from_handle( &self, handle: &str, ) -> Result<TaskId, EngineError>

Resolve a short worker handle (wh-XXXXXXXX) to the bound task_id. Used on /v1/worker/submit when the Bearer is a short handle string rather than a full CapToken JSON. A missing entry returns TokenNotFound, i.e. “the handle is not in the store”.

Source

pub async fn submit_worker_result_trusted( &self, task_id: &TaskId, attempt: u32, value: Value, ok: bool, ) -> Result<(), EngineError>

Submit a worker result via a short handle. Skips token verification and updates output_tail Final + task.last_result directly in a thin path. The caller is expected to have already resolved task_id via task_id_from_handle — the handle’s presence in worker_handles means it was minted server-side and is therefore trusted.

Source

pub async fn attach( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, ) -> Result<CapToken, EngineError>

Attach a new session with default OperatorInfo (Automate, no bridges/hooks). Shorthand for attach_with(.., OperatorInfo::default()).

Source

pub async fn register_senior_bridge( &self, id: impl Into<String>, bridge: Arc<dyn SeniorBridge>, )

Register a SeniorBridge under a name. An existing entry with the same name is overwritten. On the persisted-session reattach path, the caller re-registers under the same ID beforehand and the bridge becomes effective again.

Source

pub async fn register_spawn_hook( &self, id: impl Into<String>, hook: Arc<dyn SpawnHook>, )

Register a SpawnHook under a name. An existing entry with the same name is overwritten.

Source

pub async fn register_operator( &self, id: impl Into<String>, operator: Arc<dyn Operator>, )

Register an Operator (a spawn-body backend) under a name. An existing entry with the same name is overwritten. OperatorDelegateMiddleware looks this up via ctx and, when kind = MainAi / Composite, bypasses inner.spawn and calls operator.execute instead.

Source

pub async fn unregister_senior_bridge(&self, id: &str)

Unregister a SeniorBridge by name (e.g. on WebSocket disconnect or explicit teardown). A missing ID is a no-op.

Source

pub async fn unregister_spawn_hook(&self, id: &str)

Unregister a SpawnHook by name. A missing ID is a no-op.

Source

pub async fn unregister_operator(&self, id: &str)

Unregister an Operator backend by name. A missing ID is a no-op.

Source

pub async fn list_spawn_hook_ids(&self) -> Vec<String>

Snapshot the list of registered SpawnHook IDs (for test observation and debugging).

Source

pub async fn list_senior_bridge_ids(&self) -> Vec<String>

Snapshot the list of registered SeniorBridge IDs.

Source

pub async fn list_operator_ids(&self) -> Vec<String>

Snapshot the list of registered Operator IDs.

Source

pub async fn attach_with_ids( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, kind: Option<OperatorKind>, bridge_id: Option<String>, hook_id: Option<String>, operator_backend_id: Option<String>, operator_kind_overrides: HashMap<String, OperatorKind>, bp_agent_kinds: HashMap<String, OperatorKind>, bp_global_kind: Option<OperatorKind>, ) -> Result<CapToken, EngineError>

Attach specifying IDs directly. The caller is expected to have pre-registered them via register_senior_bridge / register_spawn_hook / register_operator. This is the canonical path when persistence is in play.

kind is the “Runtime Global” tier of the OperatorKind cascade (stored verbatim on OperatorSession.operator_kind): Some(_) is an explicit request (including Some(OperatorKind::Automate)) that outranks the BP-level tiers; None leaves it unspecified so the BP-level tiers / final default decide. See crate::core::ctx::collapse_operator_kind.

Source

pub async fn attach_with( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, operator_info: OperatorInfo, ) -> Result<CapToken, EngineError>

Convenience attach that takes an OperatorInfo (three Arc<dyn ...> fields plus kind) inline.

§Pipeline

Each Arc<dyn ...> is auto-registered on the engine’s registry under a synthetic ID (br-<hex> / hk-<hex> / op-<hex>), and the session stores that synthetic ID. Subsequent dispatch_attempt calls rebuild the Arcs from those IDs via resolve_operator_info, and the three middlewares fire as usual.

§⚠ Non-persisted sessions only

Because this API takes inline Arcs, the reattach path after session persistence cannot rebuild them — the synthetic IDs are not present in a freshly started process’s registry. If you need persistence, use Self::attach_with_ids with register_* calls beforehand to go through named IDs instead.

Handy for tests and short-lived in-process sessions. Production WebSocket callbacks and the like should prefer attach_with_ids as the canonical path.

Source

pub async fn detach(&self, token: &CapToken) -> Result<(), EngineError>

Mark the session bound to token as detached (attached = false). Tasks are left in place — a later attach/attach_with_ids call carrying the same registered bridge/hook IDs can pick them back up.

Source

pub async fn heartbeat(&self, token: &CapToken) -> Result<(), EngineError>

Refresh the session’s last_seen timestamp and mark it attached. Called periodically by an attached client to avoid being flipped to detached by start_detach_loop.

Source

pub async fn start_task( &self, token: &CapToken, spec: TaskSpec, ) -> Result<TaskId, EngineError>

Create a new TaskState from spec and register its initial prompt. When the calling token is a Worker (i.e. this is a recursive spawn), the new task inherits parent.spawn_depth + 1 and is rejected with SpawnDepthExceeded once max_spawn_depth is hit; an Operator-issued call starts at depth 0.

Source

pub async fn read_task_state( &self, token: &CapToken, task_id: &TaskId, ) -> Result<TaskState, EngineError>

Fetch a snapshot of TaskState for task_id, subject to the task-ownership gate (see verify_token_for_task).

Source

pub async fn cancel_task( &self, token: &CapToken, task_id: &TaskId, ) -> Result<(), EngineError>

Mark task_id as Cancelled and wake any caller blocked in poll_task for it.

Source

pub async fn dispatch_attempt_with( &self, token: &CapToken, task_id: &TaskId, spawner: &Arc<dyn SpawnerAdapter>, ) -> Result<DispatchOutcome, EngineError>

Dispatch a single attempt through the given spawner.

The lock is only held for snapshot capture; the actual spawn and completion await happen outside the lock (R3 discipline).

Sits on the Domain side of the Data / Domain split. The dispatch path itself does not touch big response bodies — those flow through the Data plane (output_store module + sink / input_inject SpawnerLayers) around this method.

The caller does the compile plus service::linker::link and carries the same stack through each dispatch. Because the spawner is passed per-request rather than looked up from engine-global state, parallel requests against a single Engine instance (different Blueprints, different spawners) do not race.

Source

pub async fn fetch_prompt( &self, token: &CapToken, task_id: &TaskId, ) -> Result<String, EngineError>

Fetch the directive/prompt string for task_id’s current attempt. Falls back to initial_directive when no prompt has been recorded yet for that attempt.

Source

pub async fn fetch_worker_payload( &self, token: &CapToken, task_id: &TaskId, ) -> Result<WorkerPayload, EngineError>

Combined fetch for HTTP /v1/worker/prompt: returns prompt + (optional) system + agent + attempt in a single round trip. The verb gate reuses FetchPrompt — same semantics as “the worker pulls its task input”.

system is the value written by OperatorSpawner::spawn through bake_worker_system_prompt when it ran; otherwise None (no profile present, or the bake never happened).

Source

pub async fn fetch_worker_payload_trusted( &self, task_id: &TaskId, ) -> Result<WorkerPayload, EngineError>

Fetch a worker payload via a short handle. Skips token verification and returns prompt + system + agent + attempt in a thin path. The caller is expected to have already resolved task_id via task_id_from_handle — the handle’s presence in worker_handles means it was minted server-side and is therefore trusted.

Source

pub async fn task_attempt(&self, task_id: &TaskId) -> Result<u32, EngineError>

Read the current attempt number for a task (server-side lookup, no token verification). Used on HTTP /v1/worker/result when the worker omits attempt and the server has to fill it in.

Source

pub async fn bake_worker_system_prompt( &self, task_id: &TaskId, attempt: u32, system: Option<String>, ) -> Result<(), EngineError>

Server-side admin API that lets OperatorSpawner::spawn bake the rendered system_prompt into engine state. There is no verb gate — the only expected caller is inside the spawner. SubAgents fetch this alongside the prompt on the /v1/worker/prompt path.

Source

pub async fn fetch_data( &self, token: &CapToken, key: &str, ) -> Result<Value, EngineError>

Fetch an arbitrary named resource previously stored via set_resource. Not task-scoped — any valid token with the FetchData verb may read any key.

Source

pub async fn submit_output( &self, token: &CapToken, task_id: &TaskId, attempt: u32, event: OutputEvent, ) -> Result<(), EngineError>

Send one output event from inside a SpawnerAdapter or worker. Structuring is assumed to be complete by the time we cross the SpawnerAdapter boundary; this API just appends to the OutputStore, pushes to the EventLog, and (for Final) emits the TaskAttemptCompleted event.

This is Domain-side plumbing: it feeds the engine’s verdict flow, not the Data-plane store in the output_store module. It also does not wake the dispatch path — that is done through the spawner’s completion oneshot when the worker terminates.

Source

pub async fn output_tail( &self, task_id: &TaskId, attempt: u32, ) -> Vec<OutputEvent>

Snapshot the entire output tail for a given (task_id, attempt). Used by the dispatch path when pulling Final, and by observers reading the trace.

Source

pub async fn post_result( &self, token: &CapToken, task_id: &TaskId, result: Value, ) -> Result<(), EngineError>

Record an interim last_result for task_id without changing its status. Distinct from the terminal Final output event handled through submit_output / dispatch_attempt_with.

Source

pub async fn set_resource( &self, key: impl Into<String>, value: Value, ) -> Result<(), EngineError>

Store a named resource value, retrievable later via fetch_data. No token is required — this is a server-side/admin-style setter (mirrors bake_worker_system_prompt).

Source

pub async fn query_senior( &self, token: &CapToken, task_id: &TaskId, question: Value, ) -> Result<ResumeKey, EngineError>

Ask a question of the Senior, mark the task Suspended, and return a ResumeKey. The suspended state persists until another task calls resume(key, answer).

Resume-side waiting is Notify-based, so a caller (typically MainAI) can detach, reattach from a different process, and still pull the answer out via await_resume(key, timeout) — the answer is stored inside EngineState.

Source

pub async fn resume( &self, key: ResumeKey, answer: Value, ) -> Result<(), EngineError>

Store the answer for a ResumeKey in EngineState and wake the waiting caller via Notify. Also flips the suspended task’s status back to Running and fires the per-task notifier.

Source

pub async fn await_resume( &self, key: ResumeKey, timeout: Duration, ) -> Result<Value, EngineError>

Wait for the resume answer. Even if the caller (an Operator) detached and reattached, the answer is available immediately here — if it was already stored, this returns without waiting on the notifier.

timeout = Duration::ZERO performs an instant check without waiting.

Source

pub async fn poll_task( &self, token: &CapToken, task_id: &TaskId, hold: Duration, ) -> Result<TaskState, EngineError>

Wait until the task’s status transitions to terminal or Suspended, then return the latest TaskState. Returns immediately if the task is already in a terminal state. Exceeding the timeout returns EngineError::PollTimeout.

A hold of Duration::from_secs(0) returns a snapshot immediately (no wait). Larger holds — tens of minutes up to days — are fine; the wait state is kept in memory inside the engine and does not degrade.

Source

pub fn start_detach_loop(&self) -> JoinHandle<()>

Background loop that scans sessions every heartbeat_interval and flips attached = false on any session whose last_seen exceeds heartbeat_miss_threshold * interval.

The tasks themselves are kept (assuming keepalive_on_idle = true), so another client can reattach with the same token and resume immediately. Dropping the returned JoinHandle does not stop the loop — the handle exists so callers who want to abort can hold onto it.

Trait Implementations§

Source§

impl Clone for Engine

Source§

fn clone(&self) -> Engine

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> DynClone for T
where T: Clone,

Source§

fn __clone_box(&self, _: Private) -> *mut ()

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> MaybeSend for T

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more