pub struct Engine { /* private fields */ }
Process-wide long-running runtime. Cheap to clone() — an Arc
lives inside.
Implementations§
impl Engine
pub fn new(cfg: EngineCfg) -> Self
Backwards-compatible constructor that starts the engine without a
layer registry, preserving the signature already used by ~88
existing call sites. Use this when automatic middleware wrapping
at bind time is not needed. Callers such as mlua-swarm-server go through
new_with_layers(cfg, registry) to enable the hint-resolution path.
pub fn new_with_layers(cfg: EngineCfg, layer_registry: LayerRegistry) -> Self
Construct an Engine with an explicit LayerRegistry, enabling
hint-resolution: spawner_hints.layers declared on a Blueprint
are resolved against this registry when the spawner stack is bound
at service::linker::link time.
pub fn with_gate(self, gate: RoleVerbGate) -> Self
Rebuild this Engine with a different RoleVerbGate. The gate is
treated as fixed-at-build-time, so this constructs a fresh
EngineInner (fresh empty EngineState) rather than mutating in
place — mainly a testing convenience for swapping gate rules.
pub fn layer_registry(&self) -> &LayerRegistry
Expose the internal LayerRegistry — used when deriving a
sub-engine that needs the same registry re-injected. The
per-request sub-engine in mlua-swarm-server reads the parent engine’s
registry through this accessor and passes it to
Engine::new_with_layers(cfg, parent.layer_registry().clone()).
pub fn signer(&self) -> &TokenSigner
Access the TokenSigner used to mint/verify CapTokens.
pub fn event_tx(&self) -> Sender<Event>
Clone a handle to the process-wide Event broadcast sender. Prefer
subscribe for a ready-to-use receiver.
pub fn subscribe(&self) -> EventStream
Subscribe to the engine’s Event broadcast stream.
pub async fn with_state<F, R>(
&self,
op: &'static str,
f: F,
) -> Result<R, EngineError>
where
F: FnOnce(&mut EngineState) -> R,
The closure is a synchronous FnOnce, so it cannot await while it
holds &mut EngineState; this enforces R3 at the type level. If the
closure holds the state longer than max_hold, the call panics so that
R4 violations surface immediately.
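The hold-time rule can be illustrated with a small standalone sketch. It is not the crate's implementation: `State`, `with_state_sketch`, and the use of a `std::sync::Mutex` are assumptions made for illustration, and the real method is async and returns a `Result`. The sketch only shows why a synchronous closure cannot suspend while the state is borrowed, and how an overlong hold could be turned into a panic.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the engine state; not the crate's real type.
#[derive(Default)]
pub struct State {
    pub counter: u64,
}

/// Runs a synchronous closure while holding the lock, then panics if the
/// closure kept the lock for longer than `max_hold`.
pub fn with_state_sketch<F, R>(
    state: &Mutex<State>,
    max_hold: Duration,
    op: &'static str,
    f: F,
) -> R
where
    F: FnOnce(&mut State) -> R,
{
    let mut guard = state.lock().expect("state lock poisoned");
    let started = Instant::now();
    // `f` is a plain FnOnce, so it has no way to `.await` while the guard lives.
    let out = f(&mut *guard);
    let held = started.elapsed();
    // Release the lock before the check so a panic does not poison the mutex.
    drop(guard);
    assert!(
        held <= max_hold,
        "{}: state held for {:?}, limit is {:?}",
        op,
        held,
        max_hold
    );
    out
}
```

Calling `with_state_sketch(&state, Duration::from_secs(1), "bump", |s| { s.counter += 1; s.counter })` returns the updated counter; `op` only labels the panic message.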
pub async fn verify_token( &self, token: &CapToken, verb: Verb, ) -> Result<(), EngineError>
Four steps: (1) signature verify, (2) expiry check, (3) role × verb
gate, (4) uses_left consume.
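The four steps can be sketched as a standalone model. Everything here is hypothetical: the `Token` layout, the toy `sign` checksum (a placeholder, not a real signature scheme), the `allowed` table, and the error names are assumptions chosen to show the order of the checks, not the crate's types.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Role { Operator, Worker, Observer }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Verb { StartTask, FetchPrompt, CancelTask }

#[derive(Debug, PartialEq, Eq)]
pub enum VerifyError { BadSignature, Expired, Forbidden, Exhausted }

/// Hypothetical token layout; `expires_at` is in seconds on a caller-supplied clock.
pub struct Token {
    pub id: u64,
    pub role: Role,
    pub expires_at: u64,
    pub signature: u64,
}

/// Toy keyed checksum standing in for a real signature. Not secure.
pub fn sign(secret: u64, id: u64, role: Role, expires_at: u64) -> u64 {
    secret ^ id.rotate_left(17) ^ (role as u64).rotate_left(33) ^ expires_at.rotate_left(5)
}

pub fn verify(
    secret: u64,
    now: u64,
    allowed: &[(Role, Verb)],
    uses_left: &mut HashMap<u64, u32>,
    token: &Token,
    verb: Verb,
) -> Result<(), VerifyError> {
    // (1) Signature verify.
    if sign(secret, token.id, token.role, token.expires_at) != token.signature {
        return Err(VerifyError::BadSignature);
    }
    // (2) Expiry check.
    if now >= token.expires_at {
        return Err(VerifyError::Expired);
    }
    // (3) Role x verb gate.
    if !allowed.contains(&(token.role, verb)) {
        return Err(VerifyError::Forbidden);
    }
    // (4) Consume one use; only reached when the first three checks pass.
    let n = uses_left.get_mut(&token.id).ok_or(VerifyError::Exhausted)?;
    if *n == 0 {
        return Err(VerifyError::Exhausted);
    }
    *n -= 1;
    Ok(())
}
```

Placing the consume step last means a rejected call does not burn a use in this sketch; whether the engine behaves the same way on failure is not stated in the text above.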
pub async fn verify_token_for_task( &self, token: &CapToken, verb: Verb, task_id: &TaskId, ) -> Result<(), EngineError>
verify_token plus the task-ownership gate.
When a Worker-role token calls a state-touch verb (fetch_prompt /
post_result / read_task_state / cancel_task / poll_task),
the gate checks that CapTokenRecord.task_id matches the argument
task_id; a mismatch returns EngineError::TokenTaskMismatch.
Operator / Senior / Observer tokens are outside the ownership gate
and may touch any task.
Verbs exempt from the gate: start_task and dispatch_attempt
stay outside so recursive swarming keeps working; depth is capped
by max_spawn_depth.
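The ownership rule described above can be modelled as a pure function. This is a sketch under assumptions: the enum variant names, the `ownership_gate` helper, the use of string task IDs, and the choice to treat a Worker token with no bound task as a mismatch are all hypothetical.

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Role { Operator, Senior, Observer, Worker }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Verb {
    FetchPrompt,
    PostResult,
    ReadTaskState,
    CancelTask,
    PollTask,
    StartTask,
    DispatchAttempt,
}

#[derive(Debug, PartialEq, Eq)]
pub struct TokenTaskMismatch;

/// The state-touch verbs listed in the text; everything else is exempt.
fn is_state_touch(verb: Verb) -> bool {
    matches!(
        verb,
        Verb::FetchPrompt
            | Verb::PostResult
            | Verb::ReadTaskState
            | Verb::CancelTask
            | Verb::PollTask
    )
}

/// Applies the ownership gate only to Worker tokens on state-touch verbs.
pub fn ownership_gate(
    role: Role,
    verb: Verb,
    bound_task: Option<&str>,
    requested_task: &str,
) -> Result<(), TokenTaskMismatch> {
    if role != Role::Worker || !is_state_touch(verb) {
        return Ok(());
    }
    // Assumption: a Worker token without a bound task never matches.
    if bound_task == Some(requested_task) {
        Ok(())
    } else {
        Err(TokenTaskMismatch)
    }
}
```

In this model a Worker bound to one task can still issue `StartTask` against the engine, which is what keeps recursive spawning possible.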
pub async fn task_id_from_token( &self, token: &CapToken, ) -> Result<TaskId, EngineError>
Resolve the bound task_id from a Worker-role token. Used on the
simple /v1/worker/submit endpoint, where the worker POSTs with a
token but no task_id. Returns Err if the token role is not
Worker, or if no bound task is set.
pub async fn task_id_from_handle( &self, handle: &str, ) -> Result<TaskId, EngineError>
Resolve a short worker handle (wh-XXXXXXXX) to the bound
task_id. Used on /v1/worker/submit when the Bearer is a short
handle string rather than a full CapToken JSON. A missing entry
returns TokenNotFound, i.e. “the handle is not in the store”.
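A minimal model of the handle lookup, assuming the store is a map from handle string to task ID. The `LookupError` type, the `String` task IDs, and the `is_short_handle` check (prefix `wh-` followed by eight characters, read off the `wh-XXXXXXXX` pattern) are hypothetical; the real store and parsing rules may differ.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
pub enum LookupError {
    TokenNotFound,
}

/// Assumed shape check for a short handle: "wh-" plus eight characters.
pub fn is_short_handle(bearer: &str) -> bool {
    bearer.starts_with("wh-") && bearer.len() == 11
}

/// Resolves a handle to its bound task ID, or reports that it is not in the store.
pub fn task_id_from_handle_sketch(
    worker_handles: &HashMap<String, String>,
    handle: &str,
) -> Result<String, LookupError> {
    worker_handles
        .get(handle)
        .cloned()
        .ok_or(LookupError::TokenNotFound)
}
```

A server could use a check like `is_short_handle` to decide between this lookup and full token parsing, though how the real endpoint distinguishes the two is not described here.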
pub async fn submit_worker_result_trusted( &self, task_id: &TaskId, attempt: u32, value: Value, ok: bool, ) -> Result<(), EngineError>
Submit a worker result for a task resolved from a short handle. This
thin path skips token verification and directly records the Final
event in output_tail and updates task.last_result. The caller is
expected to have already resolved task_id via task_id_from_handle:
a handle present in worker_handles was minted server-side and is
therefore trusted.
pub async fn attach( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, ) -> Result<CapToken, EngineError>
Attach a new session with default OperatorInfo (Automate, no
bridges/hooks). Shorthand for attach_with(.., OperatorInfo::default()).
pub async fn register_senior_bridge( &self, id: impl Into<String>, bridge: Arc<dyn SeniorBridge>, )
Register a SeniorBridge under a name. An existing entry with the
same name is overwritten. On the persisted-session reattach path,
the caller re-registers under the same ID beforehand and the
bridge becomes effective again.
pub async fn register_spawn_hook( &self, id: impl Into<String>, hook: Arc<dyn SpawnHook>, )
Register a SpawnHook under a name. An existing entry with the
same name is overwritten.
pub async fn register_operator( &self, id: impl Into<String>, operator: Arc<dyn Operator>, )
Register an Operator (a spawn-body backend) under a name. An
existing entry with the same name is overwritten.
OperatorDelegateMiddleware looks this up via ctx and, when
kind = MainAi / Composite, bypasses inner.spawn and calls
operator.execute instead.
pub async fn unregister_senior_bridge(&self, id: &str)
Unregister a SeniorBridge by name (e.g. on WebSocket disconnect
or explicit teardown). A missing ID is a no-op.
pub async fn unregister_spawn_hook(&self, id: &str)
Unregister a SpawnHook by name. A missing ID is a no-op.
pub async fn unregister_operator(&self, id: &str)
Unregister an Operator backend by name. A missing ID is a no-op.
pub async fn list_spawn_hook_ids(&self) -> Vec<String>
Snapshot the list of registered SpawnHook IDs (for test
observation and debugging).
pub async fn list_senior_bridge_ids(&self) -> Vec<String>
Snapshot the list of registered SeniorBridge IDs.
pub async fn list_operator_ids(&self) -> Vec<String>
Snapshot the list of registered Operator IDs.
pub async fn attach_with_ids( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, kind: Option<OperatorKind>, bridge_id: Option<String>, hook_id: Option<String>, operator_backend_id: Option<String>, operator_kind_overrides: HashMap<String, OperatorKind>, bp_agent_kinds: HashMap<String, OperatorKind>, bp_global_kind: Option<OperatorKind>, ) -> Result<CapToken, EngineError>
Attach specifying IDs directly. The caller is expected to have
pre-registered them via register_senior_bridge /
register_spawn_hook / register_operator. This is the canonical
path when persistence is in play.
kind is the “Runtime Global” tier of the OperatorKind cascade
(stored verbatim on OperatorSession.operator_kind): Some(_) is
an explicit request (including Some(OperatorKind::Automate)) that
outranks the BP-level tiers; None leaves it unspecified so the
BP-level tiers / final default decide. See
crate::core::ctx::collapse_operator_kind.
pub async fn attach_with( &self, operator_id: impl Into<String>, role: Role, ttl: Duration, operator_info: OperatorInfo, ) -> Result<CapToken, EngineError>
Convenience attach that takes an OperatorInfo (three
Arc<dyn ...> fields plus kind) inline.
§Pipeline
Each Arc<dyn ...> is auto-registered on the engine’s registry
under a synthetic ID (br-<hex> / hk-<hex> / op-<hex>), and
the session stores that synthetic ID. Subsequent dispatch_attempt
calls rebuild the Arcs from those IDs via
resolve_operator_info, and the three middlewares fire as usual.
§⚠ Non-persisted sessions only
Because this API takes inline Arcs, the reattach path after
session persistence cannot rebuild them — the synthetic IDs are
not present in a freshly started process’s registry. If you need
persistence, use Self::attach_with_ids with register_* calls
beforehand to go through named IDs instead.
Handy for tests and short-lived in-process sessions. Production
WebSocket callbacks and the like should prefer attach_with_ids
as the canonical path.
pub async fn detach(&self, token: &CapToken) -> Result<(), EngineError>
Mark the session bound to token as detached (attached = false).
Tasks are left in place — a later attach/attach_with_ids call
carrying the same registered bridge/hook IDs can pick them back up.
pub async fn heartbeat(&self, token: &CapToken) -> Result<(), EngineError>
Refresh the session’s last_seen timestamp and mark it attached.
Called periodically by an attached client to avoid being flipped to
detached by start_detach_loop.
pub async fn start_task( &self, token: &CapToken, spec: TaskSpec, ) -> Result<TaskId, EngineError>
Create a new TaskState from spec and register its initial
prompt. When the calling token is a Worker (i.e. this is a
recursive spawn), the new task inherits parent.spawn_depth + 1
and is rejected with SpawnDepthExceeded once max_spawn_depth is
hit; an Operator-issued call starts at depth 0.
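The depth rule can be sketched as a small pure function. The `Caller` enum, `child_depth`, and the exact boundary (a child is rejected when its depth would exceed `max_spawn_depth`) are assumptions; the text only says the spawn is rejected once the cap is hit, so the real comparison may be off by one from this model.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum SpawnError {
    SpawnDepthExceeded,
}

/// Hypothetical view of who is calling start_task.
pub enum Caller {
    Operator,
    Worker { parent_depth: u32 },
}

/// Computes the spawn depth of the new task, or rejects the spawn.
pub fn child_depth(caller: &Caller, max_spawn_depth: u32) -> Result<u32, SpawnError> {
    match caller {
        // An Operator-issued task is a root and starts at depth 0.
        Caller::Operator => Ok(0),
        // A Worker-issued task is a recursive spawn: parent depth + 1.
        Caller::Worker { parent_depth } => {
            let depth = parent_depth.saturating_add(1);
            if depth > max_spawn_depth {
                Err(SpawnError::SpawnDepthExceeded)
            } else {
                Ok(depth)
            }
        }
    }
}
```

With `max_spawn_depth = 2` in this model, a worker at depth 1 may spawn a child at depth 2, and a worker at depth 2 is refused.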
pub async fn read_task_state( &self, token: &CapToken, task_id: &TaskId, ) -> Result<TaskState, EngineError>
Fetch a snapshot of TaskState for task_id, subject to the
task-ownership gate (see verify_token_for_task).
pub async fn cancel_task( &self, token: &CapToken, task_id: &TaskId, ) -> Result<(), EngineError>
Mark task_id as Cancelled and wake any caller blocked in
poll_task for it.
pub async fn dispatch_attempt_with( &self, token: &CapToken, task_id: &TaskId, spawner: &Arc<dyn SpawnerAdapter>, ) -> Result<DispatchOutcome, EngineError>
Dispatch a single attempt through the given spawner.
The lock is only held for snapshot capture; the actual spawn and completion await happen outside the lock (R3 discipline).
Sits on the Domain side of the Data / Domain split. The dispatch
path itself does not touch big response bodies — those flow through
the Data plane (output_store module + sink / input_inject
SpawnerLayers) around this method.
The caller does the compile plus service::linker::link and
carries the same stack through each dispatch. Because the spawner
is passed per-request rather than looked up from engine-global
state, parallel requests against a single Engine instance
(different Blueprints, different spawners) do not race.
pub async fn fetch_prompt( &self, token: &CapToken, task_id: &TaskId, ) -> Result<String, EngineError>
Fetch the directive/prompt string for task_id’s current attempt.
Falls back to initial_directive when no prompt has been recorded
yet for that attempt.
pub async fn fetch_worker_payload( &self, token: &CapToken, task_id: &TaskId, ) -> Result<WorkerPayload, EngineError>
Combined fetch for HTTP /v1/worker/prompt: returns prompt +
(optional) system + agent + attempt in a single round trip.
The verb gate reuses FetchPrompt — same semantics as “the worker
pulls its task input”.
system is the value written by OperatorSpawner::spawn through
bake_worker_system_prompt when it ran; otherwise None (no
profile present, or the bake never happened).
pub async fn fetch_worker_payload_trusted( &self, task_id: &TaskId, ) -> Result<WorkerPayload, EngineError>
Fetch a worker payload via a short handle. Skips token verification
and returns prompt + system + agent + attempt in a thin
path. The caller is expected to have already resolved task_id
via task_id_from_handle — the handle’s presence in
worker_handles means it was minted server-side and is therefore
trusted.
pub async fn task_attempt(&self, task_id: &TaskId) -> Result<u32, EngineError>
Read the current attempt number for a task (server-side lookup, no
token verification). Used on HTTP /v1/worker/result when the
worker omits attempt and the server has to fill it in.
pub async fn bake_worker_system_prompt( &self, task_id: &TaskId, attempt: u32, system: Option<String>, ) -> Result<(), EngineError>
Server-side admin API that lets OperatorSpawner::spawn bake the
rendered system_prompt into engine state. There is no verb gate
— the only expected caller is inside the spawner. SubAgents fetch
this alongside the prompt on the /v1/worker/prompt path.
pub async fn fetch_data( &self, token: &CapToken, key: &str, ) -> Result<Value, EngineError>
Fetch an arbitrary named resource previously stored via
set_resource. Not task-scoped — any valid token with the
FetchData verb may read any key.
pub async fn submit_output( &self, token: &CapToken, task_id: &TaskId, attempt: u32, event: OutputEvent, ) -> Result<(), EngineError>
Send one output event from inside a SpawnerAdapter or worker.
Structuring is assumed to be complete by the time we cross the
SpawnerAdapter boundary; this API just appends to the
OutputStore, pushes to the EventLog, and (for Final) emits
the TaskAttemptCompleted event.
This is Domain-side plumbing: it feeds the engine’s verdict flow,
not the Data-plane store in the output_store module. It also
does not wake the dispatch path — that is done through the
spawner’s completion oneshot when the worker terminates.
pub async fn output_tail( &self, task_id: &TaskId, attempt: u32, ) -> Vec<OutputEvent>
Snapshot the entire output tail for a given (task_id, attempt).
Used by the dispatch path when pulling Final, and by observers
reading the trace.
pub async fn post_result( &self, token: &CapToken, task_id: &TaskId, result: Value, ) -> Result<(), EngineError>
Record an interim last_result for task_id without changing its
status. Distinct from the terminal Final output event handled
through submit_output / dispatch_attempt_with.
pub async fn set_resource( &self, key: impl Into<String>, value: Value, ) -> Result<(), EngineError>
Store a named resource value, retrievable later via fetch_data.
No token is required — this is a server-side/admin-style setter
(mirrors bake_worker_system_prompt).
pub async fn query_senior( &self, token: &CapToken, task_id: &TaskId, question: Value, ) -> Result<ResumeKey, EngineError>
Ask a question of the Senior, mark the task Suspended, and
return a ResumeKey. The suspended state persists until another
task calls resume(key, answer).
Resume-side waiting is Notify-based, so a caller (typically
MainAI) can detach, reattach from a different process, and still
pull the answer out via await_resume(key, timeout) — the answer
is stored inside EngineState.
pub async fn resume( &self, key: ResumeKey, answer: Value, ) -> Result<(), EngineError>
Store the answer for a ResumeKey in EngineState and wake the
waiting caller via Notify. Also flips the suspended task’s
status back to Running and fires the per-task notifier.
pub async fn await_resume( &self, key: ResumeKey, timeout: Duration, ) -> Result<Value, EngineError>
Wait for the resume answer. Even if the caller (an Operator) detached and reattached, the answer is available immediately here — if it was already stored, this returns without waiting on the notifier.
timeout = Duration::ZERO performs an instant check without
waiting.
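The store-then-notify pattern behind resume and await_resume can be sketched with the standard library. This is a synchronous stand-in: it uses `std::sync::Condvar` in place of the async Notify mentioned above, `u64` keys in place of ResumeKey, `String` in place of Value, and returns `Option` rather than `Result`. `ResumeStore` and its methods are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical answer store: answers live in shared state, not in the waiter.
#[derive(Default)]
pub struct ResumeStore {
    answers: Mutex<HashMap<u64, String>>,
    wake: Condvar,
}

impl ResumeStore {
    /// Stores the answer first, then wakes anyone currently waiting.
    pub fn resume(&self, key: u64, answer: String) {
        self.answers.lock().unwrap().insert(key, answer);
        self.wake.notify_all();
    }

    /// Returns the stored answer, waiting up to `timeout` for it to arrive.
    /// The condition is checked before any waiting, so an answer that was
    /// stored earlier is returned at once, and a zero timeout acts as a
    /// non-blocking check.
    pub fn await_resume(&self, key: u64, timeout: Duration) -> Option<String> {
        let guard = self.answers.lock().unwrap();
        let (guard, _timed_out) = self
            .wake
            .wait_timeout_while(guard, timeout, |answers| !answers.contains_key(&key))
            .unwrap();
        guard.get(&key).cloned()
    }
}
```

Because the answer is kept in the store rather than handed to a specific waiter, a caller that arrives after `resume` has run still sees it, which mirrors the detach and reattach behaviour described above.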
pub async fn poll_task( &self, token: &CapToken, task_id: &TaskId, hold: Duration, ) -> Result<TaskState, EngineError>
Wait until the task’s status transitions to terminal or
Suspended, then return the latest TaskState. Returns
immediately if the task is already in a terminal state.
If hold elapses first, returns EngineError::PollTimeout.
A hold of Duration::from_secs(0) returns a snapshot immediately
(no wait). Larger holds — tens of minutes up to days — are fine;
the wait state is kept in memory inside the engine and does not
degrade.
pub fn start_detach_loop(&self) -> JoinHandle<()>
Background loop that scans sessions every heartbeat_interval and
sets attached = false on any session whose last_seen is older than
heartbeat_miss_threshold * heartbeat_interval.
The tasks themselves are kept (assuming
keepalive_on_idle = true), so another client can reattach with
the same token and resume immediately. Dropping the returned
JoinHandle does not stop the loop — the handle exists so callers
who want to abort can hold onto it.
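One pass of the scan can be modelled as a pure function over a slice of sessions. The `Session` struct, `detach_stale`, and the `u32` type for `heartbeat_miss_threshold` are assumptions; the real loop runs in the background and works on the engine's own session records.

```rust
use std::time::{Duration, Instant};

/// Hypothetical session record with only the two fields the scan needs.
pub struct Session {
    pub attached: bool,
    pub last_seen: Instant,
}

/// Flips `attached` to false on every session that has been silent for longer
/// than `heartbeat_miss_threshold * heartbeat_interval`. Returns how many
/// sessions were flipped. Sessions are kept, only their flag changes.
pub fn detach_stale(
    sessions: &mut [Session],
    now: Instant,
    heartbeat_interval: Duration,
    heartbeat_miss_threshold: u32,
) -> usize {
    let limit = heartbeat_interval * heartbeat_miss_threshold;
    let mut flipped = 0;
    for session in sessions.iter_mut() {
        let silent_for = now.saturating_duration_since(session.last_seen);
        if session.attached && silent_for > limit {
            session.attached = false;
            flipped += 1;
        }
    }
    flipped
}
```

Passing `now` in as a parameter keeps the function deterministic, which makes the threshold arithmetic easy to test without sleeping.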
Trait Implementations§
Auto Trait Implementations§
impl !RefUnwindSafe for Engine
impl !UnwindSafe for Engine
impl Freeze for Engine
impl Send for Engine
impl Sync for Engine
impl Unpin for Engine
impl UnsafeUnpin for Engine
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> CloneToUninit for T
where
T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.