mlua_swarm/core/engine.rs
1//! `Engine` — the long-running stateful runtime plus the `with_state`
2//! helper (R1-R4 discipline).
3//!
4//! The engine owns the Domain side of the Data / Domain split:
5//! flow control (dispatch / verdict), state (`EngineState`), and the
6//! `submit_output` / `output_tail` surface that feeds it. Data-plane
7//! traffic (Big Response bodies) is delegated to the `output_store` module
8//! plus its paired `SpawnerLayer`s and passes through here without the
9//! engine core needing to grow.
10
11use crate::core::config::EngineCfg;
12use crate::core::ctx::{Ctx, OperatorInfo, OperatorKind, SeniorBridge, SpawnHook};
13use crate::core::errors::EngineError;
14use crate::core::state::{
15 CapTokenRecord, DispatchOutcome, EngineState, Event, EventStream, OperatorSession, ResumeKey,
16 ResumePending, TaskSpec, TaskState, TaskStatus,
17};
18use crate::types::{
19 default_role_verb_table, now_unix, CapToken, Role, RoleVerbGate, SessionId, TaskId,
20 TokenSigner, Verb,
21};
22use crate::worker::adapter::SpawnerAdapter;
23use serde_json::Value;
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, Mutex};
28
/// Process-wide long-running runtime. Cheap to `clone()` — an `Arc`
/// lives inside.
///
/// Every clone shares the same `EngineInner` (state, bridge / hook /
/// operator registries, event channel) through that `Arc`.
#[derive(Clone)]
pub struct Engine {
    /// Shared runtime internals; cloning `Engine` only bumps this refcount.
    inner: Arc<EngineInner>,
}
35
/// Shared internals behind every `Engine` clone.
struct EngineInner {
    /// Domain state. Accessed through `Engine::with_state`, which takes the
    /// mutex with `try_lock` + bounded retry and checks the hold time.
    state: Mutex<EngineState>,
    /// Configuration fixed at construction (lock retry / backoff / max-hold
    /// limits, token secret, max spawn depth, ...).
    cfg: EngineCfg,
    /// Signs and verifies `CapToken`s; built from `cfg.token_secret`.
    signer: TokenSigner,
    /// Role × verb permission table consulted by `Engine::verify_token`.
    gate: RoleVerbGate,
    /// Sender side of the engine-wide `Event` broadcast channel;
    /// `Engine::subscribe` hands out receivers.
    event_tx: broadcast::Sender<Event>,
    /// ID-keyed bridge registry (register-by-ID design). `SeniorBridge`
    /// and `SpawnHook` are registered by ID; sessions bind to those IDs
    /// only. Persistence stores just the ID, and on reattach the caller
    /// re-registers under the same ID to restore presence.
    senior_bridges: tokio::sync::RwLock<HashMap<String, Arc<dyn SeniorBridge>>>,
    spawn_hooks: tokio::sync::RwLock<HashMap<String, Arc<dyn SpawnHook>>>,
    /// ID registry for full-spawn Operator backends (backends that take the
    /// entire spawn via `execute`). Sibling to `senior_bridges` /
    /// `spawn_hooks`. `OperatorDelegateMiddleware` looks these up via
    /// `ctx` and, when `kind = MainAi` / `Composite`, bypasses
    /// `inner.spawn` and calls `operator.execute` instead.
    operators: tokio::sync::RwLock<HashMap<String, Arc<dyn crate::operator::Operator>>>,
    /// Base and hint layer factories for the `SpawnerStack`. At
    /// `service::linker::link` time, `compiled.router` is wrapped with
    /// the base factories plus the hint factories resolved from
    /// `blueprint.spawner_hints.layers`. This is the engine-side
    /// counterpart to the discipline "Flow / Blueprint doesn't spell out
    /// middleware implementations — it declares the capabilities it needs
    /// as hint keys".
    layer_registry: crate::middleware::LayerRegistry,
}
63
64impl Engine {
65 /// Backwards-compatible constructor that starts the engine without a
66 /// layer registry, preserving the signature already used by ~88
67 /// existing call sites. Use this when automatic middleware wrapping
68 /// at bind time is not needed. Callers such as `mlua-swarm-server` go through
69 /// `new_with_layers(cfg, registry)` to enable the hint-resolution path.
70 pub fn new(cfg: EngineCfg) -> Self {
71 Self::new_with_layers(cfg, crate::middleware::LayerRegistry::new())
72 }
73
74 /// Construct an `Engine` with an explicit `LayerRegistry`, enabling
75 /// hint-resolution: `spawner_hints.layers` declared on a `Blueprint`
76 /// are resolved against this registry when the spawner stack is bound
77 /// at `service::linker::link` time.
78 pub fn new_with_layers(
79 cfg: EngineCfg,
80 layer_registry: crate::middleware::LayerRegistry,
81 ) -> Self {
82 let (event_tx, _) = broadcast::channel(256);
83 let signer = TokenSigner::new(&cfg.token_secret);
84 Self {
85 inner: Arc::new(EngineInner {
86 state: Mutex::new(EngineState::new()),
87 cfg,
88 signer,
89 gate: default_role_verb_table(),
90 event_tx,
91 senior_bridges: tokio::sync::RwLock::new(HashMap::new()),
92 spawn_hooks: tokio::sync::RwLock::new(HashMap::new()),
93 operators: tokio::sync::RwLock::new(HashMap::new()),
94 layer_registry,
95 }),
96 }
97 }
98
99 /// Rebuild this `Engine` with a different `RoleVerbGate`. The gate is
100 /// treated as fixed-at-build-time, so this constructs a fresh
101 /// `EngineInner` (fresh empty `EngineState`) rather than mutating in
102 /// place — mainly a testing convenience for swapping gate rules.
103 pub fn with_gate(self, gate: RoleVerbGate) -> Self {
104 // The gate is fixed at build time — the intent is to build a fresh
105 // instance rather than mutating in place. As a testing convenience we
106 // do allow swapping the inner Arc. Simpler form: just rebuild
107 // Arc<EngineInner>.
108 let inner = Arc::new(EngineInner {
109 state: Mutex::new(EngineState::new()),
110 cfg: self.inner.cfg.clone(),
111 signer: self.inner.signer.clone(),
112 gate,
113 event_tx: self.inner.event_tx.clone(),
114 senior_bridges: tokio::sync::RwLock::new(HashMap::new()),
115 spawn_hooks: tokio::sync::RwLock::new(HashMap::new()),
116 operators: tokio::sync::RwLock::new(HashMap::new()),
117 layer_registry: self.inner.layer_registry.clone(),
118 });
119 Self { inner }
120 }
121
122 // ═══════════════════════════════════════════════════════════════════════
123 // Accessors. Production code drives execution through compile +
124 // `service::linker::link` + `dispatch_attempt_with(spawner)` inside
125 // `TaskLaunchService`; `Engine` itself is a pure execution surface — it
126 // does not own a BlueprintStore / EnhanceAdapter / Compiler, nor a
127 // global spawner (the spawner is carried per-request, never stashed on
128 // the engine).
129 // ═══════════════════════════════════════════════════════════════════════
130
131 /// Access the `EngineCfg` this engine was built with.
132 pub fn cfg(&self) -> &EngineCfg {
133 &self.inner.cfg
134 }
135
136 /// Expose the internal `LayerRegistry` — used when deriving a
137 /// sub-engine that needs the same registry re-injected. The
138 /// per-request sub-engine in `mlua-swarm-server` reads the parent engine's
139 /// registry through this accessor and passes it to
140 /// `Engine::new_with_layers(cfg, parent.layer_registry().clone())`.
141 pub fn layer_registry(&self) -> &crate::middleware::LayerRegistry {
142 &self.inner.layer_registry
143 }
144
145 /// Access the `TokenSigner` used to mint/verify `CapToken`s.
146 pub fn signer(&self) -> &TokenSigner {
147 &self.inner.signer
148 }
149
150 /// Clone a handle to the process-wide `Event` broadcast sender. Prefer
151 /// `subscribe` for a ready-to-use receiver.
152 pub fn event_tx(&self) -> broadcast::Sender<Event> {
153 self.inner.event_tx.clone()
154 }
155
156 /// Subscribe to the engine's `Event` broadcast stream.
157 pub fn subscribe(&self) -> EventStream {
158 self.inner.event_tx.subscribe()
159 }
160
161 // ═══════════════════════════════════════════════════════════════════════
162 // §7 with_state — single Mutex + R1-R4 (try_lock + bounded retry + max-hold panic)
163 // ═══════════════════════════════════════════════════════════════════════
164
165 /// The closure is a **sync** `FnOnce` — you cannot pass an async
166 /// closure, which enforces R3 at the type level. Exceeding `max_hold`
167 /// panics so that R4 violations surface immediately.
168 pub async fn with_state<F, R>(&self, op: &'static str, f: F) -> Result<R, EngineError>
169 where
170 F: FnOnce(&mut EngineState) -> R,
171 {
172 let cfg = &self.inner.cfg;
173
174 // R2: try_lock + bounded retry
175 let mut guard_opt = None;
176 for attempt in 0..=cfg.max_retry {
177 match self.inner.state.try_lock() {
178 Ok(g) => {
179 guard_opt = Some(g);
180 break;
181 }
182 Err(_) if cfg.try_only => return Err(EngineError::LockBusy(op)),
183 Err(_) => {
184 let backoff = cfg.backoff_ms_step * (attempt as u64 + 1);
185 tokio::time::sleep(Duration::from_millis(backoff)).await;
186 }
187 }
188 }
189 let mut guard = guard_opt.ok_or(EngineError::LockBusyAfterRetry(op))?;
190
191 // R4: max_hold guard
192 let start = Instant::now();
193 let result = f(&mut guard);
194 let elapsed_ms = start.elapsed().as_millis();
195 drop(guard);
196
197 if elapsed_ms > cfg.max_hold_ms {
198 panic!(
199 "Engine.with_state('{op}') held {elapsed_ms}ms > max {}ms — suspected R3 violation (long op inside lock)",
200 cfg.max_hold_ms
201 );
202 }
203 Ok(result)
204 }
205
206 // ═══════════════════════════════════════════════════════════════════════
207 // Token verify (= sig + expire + gate + uses_left)
208 // ═══════════════════════════════════════════════════════════════════════
209
210 /// Four steps: (1) signature verify, (2) expiry check, (3) role × verb
211 /// gate, (4) `uses_left` consume.
212 pub async fn verify_token(&self, token: &CapToken, verb: Verb) -> Result<(), EngineError> {
213 // (1) sig
214 if !self.inner.signer.verify_sig(token) {
215 return Err(EngineError::BadSignature);
216 }
217 // (2) expire
218 if token.is_expired(now_unix()) {
219 return Err(EngineError::TokenExpired);
220 }
221 // (3) role × verb gate
222 if !self.inner.gate.is_allowed(token.role, verb) {
223 return Err(EngineError::RoleViolation {
224 role: token.role,
225 verb,
226 });
227 }
228 // (4) server-side uses_left consume
229 self.with_state("token.consume", |s| {
230 let rec = s
231 .tokens
232 .get_mut(token.id())
233 .ok_or_else(|| EngineError::TokenNotFound(token.id().to_string()))?;
234 rec.consume()
235 .map_err(|_: crate::core::state::CapTokenConsumeError| {
236 EngineError::TokenUsesExhausted
237 })?;
238 Ok::<(), EngineError>(())
239 })
240 .await??;
241 Ok(())
242 }
243
244 /// `verify_token` plus the **task-ownership gate**.
245 ///
246 /// When a Worker-role token calls a state-touch verb (`fetch_prompt` /
247 /// `post_result` / `read_task_state` / `cancel_task` / `poll_task`),
248 /// the gate checks that `CapTokenRecord.task_id` matches the argument
249 /// `task_id`; a mismatch returns `EngineError::TokenTaskMismatch`.
250 /// Operator / Senior / Observer tokens are outside the ownership gate
251 /// and may touch any task.
252 ///
253 /// **Verbs exempt from the gate.** `start_task` and `dispatch_attempt`
254 /// stay outside so recursive swarming keeps working; depth is capped
255 /// by `max_spawn_depth`.
256 pub async fn verify_token_for_task(
257 &self,
258 token: &CapToken,
259 verb: Verb,
260 task_id: &TaskId,
261 ) -> Result<(), EngineError> {
262 self.verify_token(token, verb).await?;
263 if token.role != Role::Worker {
264 return Ok(());
265 }
266 let nonce = token.nonce.clone();
267 let arg_tid = task_id.clone();
268 self.with_state("token.ownership_gate", move |s| {
269 let bound = s
270 .tokens
271 .get(&nonce)
272 .and_then(|r| r.task_id.as_ref())
273 .cloned();
274 match bound {
275 Some(t) if t == arg_tid => Ok(()),
276 Some(t) => Err(EngineError::TokenTaskMismatch {
277 bound: t.0,
278 arg: arg_tid.0,
279 }),
280 None => Err(EngineError::TokenNotFound(nonce.clone())),
281 }
282 })
283 .await??;
284 Ok(())
285 }
286
287 /// Resolve the bound `task_id` from a Worker-role token. Used on the
288 /// simple `/v1/worker/submit` endpoint, where the worker POSTs with a
289 /// token but no `task_id`. Returns `Err` if the token role is not
290 /// Worker, or if no bound task is set.
291 pub async fn task_id_from_token(&self, token: &CapToken) -> Result<TaskId, EngineError> {
292 if token.role != Role::Worker {
293 return Err(EngineError::RoleViolation {
294 role: token.role,
295 verb: Verb::PostResult,
296 });
297 }
298 let nonce = token.nonce.clone();
299 self.with_state("task_id_from_token", move |s| {
300 s.tokens
301 .get(&nonce)
302 .and_then(|r| r.task_id.as_ref())
303 .cloned()
304 .ok_or_else(|| EngineError::TokenNotFound(nonce.clone()))
305 })
306 .await?
307 }
308
309 /// Resolve a short worker handle (`wh-XXXXXXXX`) to the bound
310 /// `task_id`. Used on `/v1/worker/submit` when the Bearer is a short
311 /// handle string rather than a full `CapToken` JSON. A missing entry
312 /// returns `TokenNotFound`, i.e. "the handle is not in the store".
313 pub async fn task_id_from_handle(&self, handle: &str) -> Result<TaskId, EngineError> {
314 let h = handle.to_string();
315 self.with_state("task_id_from_handle", move |s| {
316 let nonce = s
317 .worker_handles
318 .get(&h)
319 .cloned()
320 .ok_or_else(|| EngineError::TokenNotFound(format!("handle={h}")))?;
321 s.tokens
322 .get(&nonce)
323 .and_then(|r| r.task_id.as_ref())
324 .cloned()
325 .ok_or_else(|| EngineError::TokenNotFound(format!("nonce={nonce}")))
326 })
327 .await?
328 }
329
330 /// Submit a worker result via a short handle. Skips token verification
331 /// and updates `output_tail` `Final` + `task.last_result` directly in
332 /// a thin path. The caller is expected to have already resolved
333 /// `task_id` via `task_id_from_handle` — the handle's presence in
334 /// `worker_handles` means it was minted server-side and is therefore
335 /// trusted.
336 pub async fn submit_worker_result_trusted(
337 &self,
338 task_id: &TaskId,
339 attempt: u32,
340 value: Value,
341 ok: bool,
342 ) -> Result<(), EngineError> {
343 let task_id_for_apply = task_id.clone();
344 let value_for_event = value.clone();
345 self.with_state("submit_worker_result_trusted.output", move |s| {
346 let ev = crate::worker::output::OutputEvent::Final {
347 content: crate::worker::output::ContentRef::Inline {
348 value: value_for_event,
349 },
350 ok,
351 };
352 s.output_store
353 .entry((task_id_for_apply.clone(), attempt))
354 .or_default()
355 .push(ev.clone());
356 s.push_event(crate::core::state::Event::WorkerOutput {
357 task_id: task_id_for_apply,
358 attempt,
359 event: ev,
360 });
361 })
362 .await?;
363 let task_id_for_result = task_id.clone();
364 let value_for_result = value.clone();
365 self.with_state("submit_worker_result_trusted.last_result", move |s| {
366 if let Some(t) = s.tasks.get_mut(&task_id_for_result) {
367 t.last_result = Some(value_for_result);
368 t.updated_at = now_unix();
369 }
370 })
371 .await?;
372 Ok(())
373 }
374
375 /// Mint a short handle and register it in the `worker_handles` map.
376 /// Called immediately after the worker-token mint inside
377 /// `dispatch_attempt_with`, and issues a handle bound to the same
378 /// nonce. Format is `wh-<8 hex chars>` (11 chars total), designed to
379 /// remove the base64 copy-paste failure mode.
380 async fn mint_worker_handle(&self, worker_nonce: String) -> Result<String, EngineError> {
381 // The handle is a sole bearer secret on the `/v1/worker/submit`
382 // short-handle path (`submit_worker_result_trusted` skips token
383 // verification), so it must be unguessable — OS RNG, not the
384 // predictable uid counter. 8 hex chars (~4B entropy) keeps the
385 // documented `wh-<8 hex>` wire shape; collision between live
386 // handles is negligible at in-process handle counts.
387 let short = crate::types::secure_hex(4);
388 let handle = format!("wh-{short}");
389 let h = handle.clone();
390 let n = worker_nonce.clone();
391 self.with_state("mint_worker_handle", move |s| {
392 s.worker_handles.insert(h, n);
393 })
394 .await?;
395 Ok(handle)
396 }
397
398 // ═══════════════════════════════════════════════════════════════════════
399 // Session API
400 // ═══════════════════════════════════════════════════════════════════════
401
402 /// Attach a new session with default `OperatorInfo` (`Automate`, no
403 /// bridges/hooks). Shorthand for `attach_with(.., OperatorInfo::default())`.
404 pub async fn attach(
405 &self,
406 operator_id: impl Into<String>,
407 role: Role,
408 ttl: Duration,
409 ) -> Result<CapToken, EngineError> {
410 self.attach_with(
411 operator_id,
412 role,
413 ttl,
414 crate::core::ctx::OperatorInfo::default(),
415 )
416 .await
417 }
418
419 // ═══════════════════════════════════════════════════════════════════════
420 // BridgeRegistry API.
421 // ═══════════════════════════════════════════════════════════════════════
422
423 /// Register a `SeniorBridge` under a name. An existing entry with the
424 /// same name is overwritten. On the persisted-session reattach path,
425 /// the caller re-registers under the same ID beforehand and the
426 /// bridge becomes effective again.
427 pub async fn register_senior_bridge(
428 &self,
429 id: impl Into<String>,
430 bridge: Arc<dyn SeniorBridge>,
431 ) {
432 self.inner
433 .senior_bridges
434 .write()
435 .await
436 .insert(id.into(), bridge);
437 }
438
439 /// Register a `SpawnHook` under a name. An existing entry with the
440 /// same name is overwritten.
441 pub async fn register_spawn_hook(&self, id: impl Into<String>, hook: Arc<dyn SpawnHook>) {
442 self.inner.spawn_hooks.write().await.insert(id.into(), hook);
443 }
444
445 /// Register an `Operator` (a spawn-body backend) under a name. An
446 /// existing entry with the same name is overwritten.
447 /// `OperatorDelegateMiddleware` looks this up via `ctx` and, when
448 /// `kind = MainAi` / `Composite`, bypasses `inner.spawn` and calls
449 /// `operator.execute` instead.
450 pub async fn register_operator(
451 &self,
452 id: impl Into<String>,
453 operator: Arc<dyn crate::operator::Operator>,
454 ) {
455 self.inner
456 .operators
457 .write()
458 .await
459 .insert(id.into(), operator);
460 }
461
462 /// Unregister a `SeniorBridge` by name (e.g. on WebSocket disconnect
463 /// or explicit teardown). A missing ID is a no-op.
464 pub async fn unregister_senior_bridge(&self, id: &str) {
465 self.inner.senior_bridges.write().await.remove(id);
466 }
467
468 /// Unregister a `SpawnHook` by name. A missing ID is a no-op.
469 pub async fn unregister_spawn_hook(&self, id: &str) {
470 self.inner.spawn_hooks.write().await.remove(id);
471 }
472
473 /// Unregister an `Operator` backend by name. A missing ID is a no-op.
474 pub async fn unregister_operator(&self, id: &str) {
475 self.inner.operators.write().await.remove(id);
476 }
477
478 /// Snapshot the list of registered `SpawnHook` IDs (for test
479 /// observation and debugging).
480 pub async fn list_spawn_hook_ids(&self) -> Vec<String> {
481 self.inner
482 .spawn_hooks
483 .read()
484 .await
485 .keys()
486 .cloned()
487 .collect()
488 }
489
490 /// Snapshot the list of registered `SeniorBridge` IDs.
491 pub async fn list_senior_bridge_ids(&self) -> Vec<String> {
492 self.inner
493 .senior_bridges
494 .read()
495 .await
496 .keys()
497 .cloned()
498 .collect()
499 }
500
501 /// Snapshot the list of registered `Operator` IDs.
502 pub async fn list_operator_ids(&self) -> Vec<String> {
503 self.inner.operators.read().await.keys().cloned().collect()
504 }
505
506 /// Attach specifying IDs directly. The caller is expected to have
507 /// pre-registered them via `register_senior_bridge` /
508 /// `register_spawn_hook` / `register_operator`. This is the canonical
509 /// path when persistence is in play.
510 ///
511 /// `kind` is the "Runtime Global" tier of the `OperatorKind` cascade
512 /// (stored verbatim on `OperatorSession.operator_kind`): `Some(_)` is
513 /// an explicit request (including `Some(OperatorKind::Automate)`) that
514 /// outranks the BP-level tiers; `None` leaves it unspecified so the
515 /// BP-level tiers / final default decide. See
516 /// `crate::core::ctx::collapse_operator_kind`.
517 #[allow(clippy::too_many_arguments)]
518 pub async fn attach_with_ids(
519 &self,
520 operator_id: impl Into<String>,
521 role: Role,
522 ttl: Duration,
523 kind: Option<OperatorKind>,
524 bridge_id: Option<String>,
525 hook_id: Option<String>,
526 operator_backend_id: Option<String>,
527 operator_kind_overrides: HashMap<String, OperatorKind>,
528 bp_agent_kinds: HashMap<String, OperatorKind>,
529 bp_global_kind: Option<OperatorKind>,
530 ) -> Result<CapToken, EngineError> {
531 let operator_id = operator_id.into();
532 let token = self
533 .inner
534 .signer
535 .session(operator_id.clone(), role, vec!["*".into()], ttl);
536 let session_id = SessionId::new();
537 let nonce = token.nonce.clone();
538 let now = now_unix();
539 let token_for_store = token.clone();
540
541 self.with_state("attach_with_ids", |s| {
542 s.tokens
543 .insert(nonce.clone(), CapTokenRecord::from_token(token_for_store));
544 s.sessions.insert(
545 session_id.clone(),
546 OperatorSession {
547 id: session_id.clone(),
548 operator_id: operator_id.clone(),
549 role,
550 attached_at: now,
551 last_seen: now,
552 attached: true,
553 owned_task_ids: Vec::new(),
554 token_nonce: nonce.clone(),
555 operator_kind: kind,
556 runtime_agent_kinds: operator_kind_overrides,
557 bp_agent_kinds,
558 bp_global_kind,
559 bridge_id,
560 hook_id,
561 operator_backend_id,
562 },
563 );
564 s.push_event(Event::SessionAttached {
565 session_id: session_id.clone(),
566 role,
567 });
568 })
569 .await?;
570
571 let _ = self
572 .inner
573 .event_tx
574 .send(Event::SessionAttached { session_id, role });
575 Ok(token)
576 }
577
578 /// Build an `OperatorInfo` by looking up the session's registered IDs
579 /// on the `BridgeRegistry`, plus resolving the 4-tier `OperatorKind`
580 /// cascade for `agent_name` via `crate::core::ctx::collapse_operator_kind`.
581 /// Used when `dispatch_attempt` injects `Ctx`. An unresolved ID
582 /// (nothing registered) is silently `None` — the bridge / hook simply
583 /// does not fire and the default behaviour applies.
584 async fn resolve_operator_info(
585 &self,
586 session: &OperatorSession,
587 agent_name: &str,
588 ) -> OperatorInfo {
589 let senior_bridge = if let Some(id) = &session.bridge_id {
590 self.inner.senior_bridges.read().await.get(id).cloned()
591 } else {
592 None
593 };
594 let spawn_hook = if let Some(id) = &session.hook_id {
595 self.inner.spawn_hooks.read().await.get(id).cloned()
596 } else {
597 None
598 };
599 let operator = if let Some(id) = &session.operator_backend_id {
600 self.inner.operators.read().await.get(id).cloned()
601 } else {
602 None
603 };
604 let runtime_agent = session.runtime_agent_kinds.get(agent_name).copied();
605 // "Runtime Global" tier: `Some(_)` is always an explicit request
606 // (see the field doc on `OperatorSession.operator_kind`).
607 let runtime_global = session.operator_kind;
608 let bp_agent = session.bp_agent_kinds.get(agent_name).copied();
609 let bp_global = session.bp_global_kind;
610 let kind = crate::core::ctx::collapse_operator_kind(
611 runtime_agent,
612 runtime_global,
613 bp_agent,
614 bp_global,
615 );
616 OperatorInfo {
617 kind,
618 id: session.operator_id.clone(),
619 senior_bridge,
620 spawn_hook,
621 operator,
622 }
623 }
624
625 /// Convenience attach that takes an `OperatorInfo` (three
626 /// `Arc<dyn ...>` fields plus `kind`) **inline**.
627 ///
628 /// # Pipeline
629 ///
630 /// Each `Arc<dyn ...>` is auto-registered on the engine's registry
631 /// under a synthetic ID (`br-<hex>` / `hk-<hex>` / `op-<hex>`), and
632 /// the session stores that synthetic ID. Subsequent `dispatch_attempt`
633 /// calls rebuild the `Arc`s from those IDs via
634 /// `resolve_operator_info`, and the three middlewares fire as usual.
635 ///
636 /// # ⚠ Non-persisted sessions only
637 ///
638 /// Because this API takes inline `Arc`s, the reattach path after
639 /// session persistence cannot rebuild them — the synthetic IDs are
640 /// not present in a freshly started process's registry. If you need
641 /// persistence, use [`Self::attach_with_ids`] with `register_*` calls
642 /// beforehand to go through **named IDs** instead.
643 ///
644 /// Handy for tests and short-lived in-process sessions. Production
645 /// WebSocket callbacks and the like should prefer `attach_with_ids`
646 /// as the canonical path.
647 pub async fn attach_with(
648 &self,
649 operator_id: impl Into<String>,
650 role: Role,
651 ttl: Duration,
652 operator_info: crate::core::ctx::OperatorInfo,
653 ) -> Result<CapToken, EngineError> {
654 let operator_id = operator_id.into();
655 // The caller always hands in a fully-formed `OperatorInfo`
656 // (including its `kind`), so it is stored as an explicit "Runtime
657 // Global" tier request (`Some(kind)`) — this path never persists
658 // BP-level tiers (both stay empty below), so `Some(kind)` resolves
659 // to the same `kind` at dispatch either way; see
660 // `OperatorSession.operator_kind` doc.
661 let kind = operator_info.kind;
662 // BridgeRegistry auto-register: when the caller hands in an
663 // `Arc<dyn>` directly, register it under a synthesised ID (the inline
664 // path aware of persistence). Callers who want to pre-register with a
665 // named ID should use `register_senior_bridge` / `register_spawn_hook`
666 // + `attach_with_ids`.
667 let bridge_id = if let Some(bridge) = operator_info.senior_bridge.clone() {
668 let id = format!("br-{}", crate::types::uid_hex(8));
669 self.inner
670 .senior_bridges
671 .write()
672 .await
673 .insert(id.clone(), bridge);
674 Some(id)
675 } else {
676 None
677 };
678 let hook_id = if let Some(hook) = operator_info.spawn_hook.clone() {
679 let id = format!("hk-{}", crate::types::uid_hex(8));
680 self.inner
681 .spawn_hooks
682 .write()
683 .await
684 .insert(id.clone(), hook);
685 Some(id)
686 } else {
687 None
688 };
689 let operator_backend_id = if let Some(operator) = operator_info.operator.clone() {
690 let id = format!("op-{}", crate::types::uid_hex(8));
691 self.inner
692 .operators
693 .write()
694 .await
695 .insert(id.clone(), operator);
696 Some(id)
697 } else {
698 None
699 };
700
701 let token = self
702 .inner
703 .signer
704 .session(operator_id.clone(), role, vec!["*".into()], ttl);
705 let session_id = SessionId::new();
706 let nonce = token.nonce.clone();
707 let now = now_unix();
708 let token_for_store = token.clone();
709
710 self.with_state("attach_with", |s| {
711 s.tokens
712 .insert(nonce.clone(), CapTokenRecord::from_token(token_for_store));
713 s.sessions.insert(
714 session_id.clone(),
715 OperatorSession {
716 id: session_id.clone(),
717 operator_id,
718 role,
719 attached_at: now,
720 last_seen: now,
721 attached: true,
722 owned_task_ids: Vec::new(),
723 token_nonce: nonce.clone(),
724 operator_kind: Some(kind),
725 runtime_agent_kinds: HashMap::new(),
726 bp_agent_kinds: HashMap::new(),
727 bp_global_kind: None,
728 bridge_id,
729 hook_id,
730 operator_backend_id,
731 },
732 );
733 s.push_event(Event::SessionAttached {
734 session_id: session_id.clone(),
735 role,
736 });
737 })
738 .await?;
739
740 let _ = self
741 .inner
742 .event_tx
743 .send(Event::SessionAttached { session_id, role });
744 Ok(token)
745 }
746
747 /// Mark the session bound to `token` as detached (`attached = false`).
748 /// Tasks are left in place — a later `attach`/`attach_with_ids` call
749 /// carrying the same registered bridge/hook IDs can pick them back up.
750 pub async fn detach(&self, token: &CapToken) -> Result<(), EngineError> {
751 self.verify_token(token, Verb::DetachSession).await?;
752 self.with_state("detach", |s| {
753 let sid = s
754 .sessions
755 .iter()
756 .find(|(_, sess)| sess.token_nonce == token.nonce)
757 .map(|(id, _)| id.clone());
758 if let Some(sid) = sid {
759 if let Some(sess) = s.sessions.get_mut(&sid) {
760 sess.attached = false;
761 }
762 s.push_event(Event::SessionDetached {
763 session_id: sid.clone(),
764 });
765 let _ = sid;
766 }
767 })
768 .await?;
769 Ok(())
770 }
771
772 /// Refresh the session's `last_seen` timestamp and mark it `attached`.
773 /// Called periodically by an attached client to avoid being flipped to
774 /// detached by `start_detach_loop`.
775 pub async fn heartbeat(&self, token: &CapToken) -> Result<(), EngineError> {
776 self.verify_token(token, Verb::Heartbeat).await?;
777 let now = now_unix();
778 self.with_state("heartbeat", |s| {
779 if let Some(sess) = s
780 .sessions
781 .values_mut()
782 .find(|sess| sess.token_nonce == token.nonce)
783 {
784 sess.last_seen = now;
785 sess.attached = true;
786 }
787 })
788 .await?;
789 Ok(())
790 }
791
792 // ═══════════════════════════════════════════════════════════════════════
793 // Task lifecycle
794 // ═══════════════════════════════════════════════════════════════════════
795
796 /// Create a new `TaskState` from `spec` and register its initial
797 /// prompt. When the calling token is a Worker (i.e. this is a
798 /// recursive spawn), the new task inherits `parent.spawn_depth + 1`
799 /// and is rejected with `SpawnDepthExceeded` once `max_spawn_depth` is
800 /// hit; an Operator-issued call starts at depth 0.
801 pub async fn start_task(
802 &self,
803 token: &CapToken,
804 spec: TaskSpec,
805 ) -> Result<TaskId, EngineError> {
806 self.verify_token(token, Verb::StartTask).await?;
807 let task_id = TaskId::new();
808 let directive = spec.initial_directive.clone();
809 let task_id_clone = task_id.clone();
810 let nonce = token.nonce.clone();
811 let max_depth = self.inner.cfg.max_spawn_depth;
812 self.with_state("start_task", move |s| {
813 // Recursive swarm depth gate (recursion guard):
814 // Worker tokens carry CapTokenRecord.parent_task_id. Give the
815 // child parent's spawn_depth + 1; if it exceeds `max`, raise an
816 // error. Operator tokens (parent_task_id=None) start at depth 0.
817 let parent_depth_opt = s
818 .tokens
819 .get(&nonce)
820 .and_then(|rec| rec.task_id.as_ref())
821 .and_then(|tid| s.tasks.get(tid))
822 .map(|t| t.spawn_depth);
823 let depth = match parent_depth_opt {
824 Some(d) => {
825 if d + 1 >= max_depth {
826 return Err(EngineError::SpawnDepthExceeded {
827 current: d + 1,
828 max: max_depth,
829 });
830 }
831 d + 1
832 }
833 None => 0,
834 };
835
836 let mut task = TaskState::new(task_id_clone.clone(), spec);
837 task.spawn_depth = depth;
838 s.tasks.insert(task_id_clone.clone(), task);
839 s.prompts.insert((task_id_clone.clone(), 1), directive);
840 // Link to the owner session (only Operator tokens match; Worker tokens have no session).
841 if let Some(sess) = s
842 .sessions
843 .values_mut()
844 .find(|sess| sess.token_nonce == nonce)
845 {
846 sess.owned_task_ids.push(task_id_clone.clone());
847 }
848 s.push_event(Event::TaskCreated {
849 task_id: task_id_clone.clone(),
850 });
851 Ok::<(), EngineError>(())
852 })
853 .await??;
854 let _ = self.inner.event_tx.send(Event::TaskCreated {
855 task_id: task_id.clone(),
856 });
857 Ok(task_id)
858 }
859
860 /// Fetch a snapshot of `TaskState` for `task_id`, subject to the
861 /// task-ownership gate (see `verify_token_for_task`).
862 pub async fn read_task_state(
863 &self,
864 token: &CapToken,
865 task_id: &TaskId,
866 ) -> Result<TaskState, EngineError> {
867 self.verify_token_for_task(token, Verb::ReadTaskState, task_id)
868 .await?;
869 let task_id = task_id.clone();
870 self.with_state("read_task_state", move |s| {
871 s.tasks
872 .get(&task_id)
873 .cloned()
874 .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))
875 })
876 .await?
877 }
878
879 /// Mark `task_id` as `Cancelled` and wake any caller blocked in
880 /// `poll_task` for it.
881 pub async fn cancel_task(&self, token: &CapToken, task_id: &TaskId) -> Result<(), EngineError> {
882 self.verify_token_for_task(token, Verb::CancelTask, task_id)
883 .await?;
884 let tid = task_id.clone();
885 self.with_state("cancel_task", move |s| {
886 let task = s
887 .tasks
888 .get_mut(&tid)
889 .ok_or_else(|| EngineError::TaskNotFound(tid.0.clone()))?;
890 task.status = TaskStatus::Cancelled;
891 task.updated_at = now_unix();
892 s.push_event(Event::TaskCancelled {
893 task_id: tid.clone(),
894 });
895 Ok::<(), EngineError>(())
896 })
897 .await??;
898 self.wake_task(task_id).await?;
899 Ok(())
900 }
901
902 /// Dispatch a single attempt through the given `spawner`.
903 ///
904 /// The lock is only held for snapshot capture; the actual spawn and
905 /// completion await happen outside the lock (R3 discipline).
906 ///
907 /// Sits on the Domain side of the Data / Domain split. The dispatch
908 /// path itself does not touch big response bodies — those flow through
909 /// the Data plane (`output_store` module + sink / input_inject
910 /// `SpawnerLayer`s) around this method.
911 ///
912 /// The caller does the compile plus `service::linker::link` and
913 /// carries the same stack through each dispatch. Because the spawner
914 /// is passed per-request rather than looked up from engine-global
915 /// state, parallel requests against a single `Engine` instance
916 /// (different Blueprints, different spawners) do not race.
917 pub async fn dispatch_attempt_with(
918 &self,
919 token: &CapToken,
920 task_id: &TaskId,
921 spawner: &Arc<dyn SpawnerAdapter>,
922 ) -> Result<DispatchOutcome, EngineError> {
923 self.verify_token(token, Verb::DispatchAttempt).await?;
924 let task_id = task_id.clone();
925
926 // 1) Under the lock: increment the attempt number, mark Running, snapshot the
927 // prompt, and pull `operator_info` from the session so we can inject it into Ctx.
928 let nonce = token.nonce.clone();
929 let tid_for_prep = task_id.clone();
930 let (attempt, agent, session_snapshot) = self
931 .with_state("dispatch.prep", move |s| {
932 let task = s
933 .tasks
934 .get_mut(&tid_for_prep)
935 .ok_or_else(|| EngineError::TaskNotFound(tid_for_prep.0.clone()))?;
936 task.attempt += 1;
937 task.status = TaskStatus::Running;
938 task.updated_at = now_unix();
939 // The spawner pulls the prompt via engine.fetch_prompt. In prep,
940 // if the prompts table has no entry for this attempt yet,
941 // fall back and insert `initial_directive` so the subsequent
942 // fetch_prompt succeeds.
943 let attempt = task.attempt;
944 let initial = task.spec.initial_directive.clone();
945 s.prompts
946 .entry((tid_for_prep.clone(), attempt))
947 .or_insert(initial);
948 let task = s
949 .tasks
950 .get(&tid_for_prep)
951 .ok_or_else(|| EngineError::TaskNotFound(tid_for_prep.0.clone()))?;
952 let agent = task.spec.agent.clone();
953 // Session snapshot (looked up by token nonce). When no session
954 // exists (worker token invoked directly / test injection), fall
955 // back to None → default OperatorInfo.
956 let sess_clone = s
957 .sessions
958 .values()
959 .find(|sess| sess.token_nonce == nonce)
960 .cloned();
961 Ok::<_, EngineError>((attempt, agent, sess_clone))
962 })
963 .await??;
964 // BridgeRegistry lookup + per-agent OperatorKind cascade.
965 let operator_info = match session_snapshot {
966 Some(sess) => self.resolve_operator_info(&sess, &agent).await,
967 None => OperatorInfo::default(),
968 };
969
970 // 2) Outside the lock: worker token mint + spawn.
971 //
972 // Session-style mint (max_uses=None). Within one attempt the worker is
973 // expected to hit `verify_token + fetch_prompt + fetch_data + post_result`
974 // multiple times in order, so `one_time` would exhaust the token on the
975 // very first verb. Capability is guarded by (a) the role × verb gate and
976 // (b) the short TTL (600s).
977 let worker_token = self.inner.signer.session(
978 format!("worker-of-{task_id}"),
979 Role::Worker,
980 vec!["*".into()],
981 Duration::from_secs(1800),
982 );
983 let worker_nonce = worker_token.nonce.clone();
984 let task_id_for_worker = task_id.clone();
985 let worker_token_for_store = worker_token.clone();
986 self.with_state("dispatch.mint_worker", move |s| {
987 s.tokens.insert(
988 worker_nonce.clone(),
989 CapTokenRecord::from_worker_token(worker_token_for_store, task_id_for_worker),
990 );
991 })
992 .await?;
993
994 // Mint a short handle (`wh-XXXXXXXX`) and register it in worker_handles.
995 // Used by the simplified Bearer path for SubAgents (short-handle form
996 // avoids base64 copy-paste incidents).
997 let worker_handle = self.mint_worker_handle(worker_token.nonce.clone()).await?;
998
999 let mut ctx = Ctx::new(task_id.clone(), attempt, agent.clone());
1000 ctx.operator = operator_info; // activates MainAIMiddleware / Senior bridge
1001 ctx.meta
1002 .runtime
1003 .insert("worker_handle".to_string(), Value::String(worker_handle));
1004
1005 let worker = spawner
1006 .spawn(self, &ctx, task_id.clone(), attempt, worker_token)
1007 .await
1008 .map_err(|e| EngineError::DispatchFailed(e.to_string()))?;
1009
1010 // 3) Outside the lock: await worker.join() (signal-only). WorkerError is
1011 // stringified. The value is fetched via output_tail (sink path).
1012 let signal_result: Result<(), String> = worker.join().await.map_err(|e| e.to_string());
1013
1014 // Pull the last Final from output_tail and use it as the value.
1015 let value_ok: Result<(Value, bool), String> = match signal_result {
1016 Ok(()) => {
1017 let tail = self.output_tail(&task_id, attempt).await;
1018 let last_final = tail.iter().rev().find_map(|ev| match ev {
1019 crate::worker::output::OutputEvent::Final { content, ok } => {
1020 Some((content.clone(), *ok))
1021 }
1022 _ => None,
1023 });
1024 match last_final {
1025 Some((crate::worker::output::ContentRef::Inline { value }, ok)) => {
1026 Ok((value, ok))
1027 }
1028 Some((
1029 crate::worker::output::ContentRef::FileRef {
1030 path,
1031 mime,
1032 size_hint,
1033 },
1034 ok,
1035 )) => Ok((
1036 serde_json::json!({
1037 "file_ref": path.to_string_lossy(),
1038 "mime": mime,
1039 "size_hint": size_hint,
1040 }),
1041 ok,
1042 )),
1043 None => Err("no Final in output_tail".to_string()),
1044 }
1045 }
1046 Err(msg) => Err(msg),
1047 };
1048
1049 // 4) Under the lock: apply (split the borrow scope so push_event and task mut can co-exist).
1050 let outcome = self
1051 .with_state("dispatch.apply", |s| {
1052 if !s.tasks.contains_key(&task_id) {
1053 return Err(EngineError::TaskNotFound(task_id.0.clone()));
1054 }
1055 match value_ok {
1056 Ok((value, ok)) => {
1057 let pass = ok;
1058 {
1059 let task = s.tasks.get_mut(&task_id).unwrap();
1060 task.last_result = Some(value.clone());
1061 task.updated_at = now_unix();
1062 task.status = if pass {
1063 TaskStatus::Pass
1064 } else {
1065 TaskStatus::Blocked
1066 };
1067 }
1068 s.push_event(Event::TaskAttemptCompleted {
1069 task_id: task_id.clone(),
1070 attempt,
1071 result: value.clone(),
1072 });
1073 if pass {
1074 s.push_event(Event::TaskPass {
1075 task_id: task_id.clone(),
1076 result: value.clone(),
1077 });
1078 Ok::<_, EngineError>(DispatchOutcome::Pass(value))
1079 } else {
1080 s.push_event(Event::TaskBlocked {
1081 task_id: task_id.clone(),
1082 result: value.clone(),
1083 });
1084 Ok(DispatchOutcome::Blocked(value))
1085 }
1086 }
1087 Err(msg) => {
1088 let task = s.tasks.get_mut(&task_id).unwrap();
1089 task.status = TaskStatus::Blocked;
1090 task.updated_at = now_unix();
1091 Err(EngineError::DispatchFailed(msg))
1092 }
1093 }
1094 })
1095 .await??;
1096
1097 // event broadcast (outside the lock — push_event feeds the in-memory tail; broadcast is a separate path).
1098 let _ = self.inner.event_tx.send(Event::TaskAttemptCompleted {
1099 task_id: task_id.clone(),
1100 attempt,
1101 result: match &outcome {
1102 DispatchOutcome::Pass(v) | DispatchOutcome::Blocked(v) => v.clone(),
1103 _ => Value::Null,
1104 },
1105 });
1106
1107 // Wake any callers waiting in poll_task.
1108 self.wake_task(&task_id).await?;
1109
1110 Ok(outcome)
1111 }
1112
1113 // ═══════════════════════════════════════════════════════════════════════
    // Worker-side API (prompt / data fetch and result posting)
1115 // ═══════════════════════════════════════════════════════════════════════
1116
1117 /// Fetch the directive/prompt string for `task_id`'s current attempt.
1118 /// Falls back to `initial_directive` when no prompt has been recorded
1119 /// yet for that attempt.
1120 pub async fn fetch_prompt(
1121 &self,
1122 token: &CapToken,
1123 task_id: &TaskId,
1124 ) -> Result<String, EngineError> {
1125 self.verify_token_for_task(token, Verb::FetchPrompt, task_id)
1126 .await?;
1127 let task_id = task_id.clone();
1128 self.with_state("fetch_prompt", move |s| {
1129 let task = s
1130 .tasks
1131 .get(&task_id)
1132 .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))?;
1133 s.prompts
1134 .get(&(task_id.clone(), task.attempt.max(1)))
1135 .cloned()
1136 .ok_or_else(|| {
1137 EngineError::ResourceNotFound(format!(
1138 "prompt({}, attempt={})",
1139 task_id.0, task.attempt
1140 ))
1141 })
1142 })
1143 .await?
1144 }
1145
1146 /// Combined fetch for `HTTP /v1/worker/prompt`: returns `prompt` +
1147 /// (optional) `system` + `agent` + `attempt` in a single round trip.
1148 /// The verb gate reuses `FetchPrompt` — same semantics as "the worker
1149 /// pulls its task input".
1150 ///
1151 /// `system` is the value written by `OperatorSpawner::spawn` through
1152 /// `bake_worker_system_prompt` when it ran; otherwise `None` (no
1153 /// profile present, or the bake never happened).
1154 pub async fn fetch_worker_payload(
1155 &self,
1156 token: &CapToken,
1157 task_id: &TaskId,
1158 ) -> Result<crate::types::WorkerPayload, EngineError> {
1159 self.verify_token_for_task(token, Verb::FetchPrompt, task_id)
1160 .await?;
1161 let task_id_clone = task_id.clone();
1162 self.with_state("fetch_worker_payload", move |s| {
1163 let task = s
1164 .tasks
1165 .get(&task_id_clone)
1166 .ok_or_else(|| EngineError::TaskNotFound(task_id_clone.0.clone()))?;
1167 let attempt = task.attempt.max(1);
1168 let prompt = s
1169 .prompts
1170 .get(&(task_id_clone.clone(), attempt))
1171 .cloned()
1172 .ok_or_else(|| {
1173 EngineError::ResourceNotFound(format!(
1174 "prompt({}, attempt={})",
1175 task_id_clone.0, attempt
1176 ))
1177 })?;
1178 let system = s
1179 .systems
1180 .get(&(task_id_clone.clone(), attempt))
1181 .cloned()
1182 .unwrap_or(None);
1183 let agent = task.spec.agent.clone();
1184 Ok::<_, EngineError>(crate::types::WorkerPayload {
1185 task_id: task_id_clone.0.clone(),
1186 attempt,
1187 agent,
1188 prompt,
1189 system,
1190 })
1191 })
1192 .await?
1193 }
1194
1195 /// Fetch a worker payload via a short handle. Skips token verification
1196 /// and returns `prompt` + `system` + `agent` + `attempt` in a thin
1197 /// path. The caller is expected to have already resolved `task_id`
1198 /// via `task_id_from_handle` — the handle's presence in
1199 /// `worker_handles` means it was minted server-side and is therefore
1200 /// trusted.
1201 pub async fn fetch_worker_payload_trusted(
1202 &self,
1203 task_id: &TaskId,
1204 ) -> Result<crate::types::WorkerPayload, EngineError> {
1205 let task_id_clone = task_id.clone();
1206 self.with_state("fetch_worker_payload_trusted", move |s| {
1207 let task = s
1208 .tasks
1209 .get(&task_id_clone)
1210 .ok_or_else(|| EngineError::TaskNotFound(task_id_clone.0.clone()))?;
1211 let attempt = task.attempt.max(1);
1212 let prompt = s
1213 .prompts
1214 .get(&(task_id_clone.clone(), attempt))
1215 .cloned()
1216 .ok_or_else(|| {
1217 EngineError::ResourceNotFound(format!(
1218 "prompt({}, attempt={})",
1219 task_id_clone.0, attempt
1220 ))
1221 })?;
1222 let system = s
1223 .systems
1224 .get(&(task_id_clone.clone(), attempt))
1225 .cloned()
1226 .unwrap_or(None);
1227 let agent = task.spec.agent.clone();
1228 Ok::<_, EngineError>(crate::types::WorkerPayload {
1229 task_id: task_id_clone.0.clone(),
1230 attempt,
1231 agent,
1232 prompt,
1233 system,
1234 })
1235 })
1236 .await?
1237 }
1238
1239 /// Read the current attempt number for a task (server-side lookup, no
1240 /// token verification). Used on `HTTP /v1/worker/result` when the
1241 /// worker omits `attempt` and the server has to fill it in.
1242 pub async fn task_attempt(&self, task_id: &TaskId) -> Result<u32, EngineError> {
1243 let task_id = task_id.clone();
1244 self.with_state("task_attempt", move |s| {
1245 s.tasks
1246 .get(&task_id)
1247 .map(|t| t.attempt)
1248 .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))
1249 })
1250 .await?
1251 }
1252
1253 /// Server-side admin API that lets `OperatorSpawner::spawn` bake the
1254 /// rendered `system_prompt` into engine state. There is no verb gate
1255 /// — the only expected caller is inside the spawner. SubAgents fetch
1256 /// this alongside the prompt on the `/v1/worker/prompt` path.
1257 pub async fn bake_worker_system_prompt(
1258 &self,
1259 task_id: &TaskId,
1260 attempt: u32,
1261 system: Option<String>,
1262 ) -> Result<(), EngineError> {
1263 let task_id = task_id.clone();
1264 self.with_state("bake_worker_system_prompt", move |s| {
1265 s.systems.insert((task_id, attempt), system);
1266 })
1267 .await?;
1268 Ok(())
1269 }
1270
1271 /// Fetch an arbitrary named resource previously stored via
1272 /// `set_resource`. Not task-scoped — any valid token with the
1273 /// `FetchData` verb may read any key.
1274 pub async fn fetch_data(&self, token: &CapToken, key: &str) -> Result<Value, EngineError> {
1275 self.verify_token(token, Verb::FetchData).await?;
1276 let key = key.to_string();
1277 self.with_state("fetch_data", move |s| {
1278 s.resources
1279 .get(&key)
1280 .cloned()
1281 .ok_or(EngineError::ResourceNotFound(key))
1282 })
1283 .await?
1284 }
1285
1286 // ───────────────────────────────────────────────────────────────────────
1287 // Output path.
1288 // ───────────────────────────────────────────────────────────────────────
1289
1290 /// Send one output event from inside a `SpawnerAdapter` or worker.
1291 /// Structuring is assumed to be complete by the time we cross the
1292 /// `SpawnerAdapter` boundary; this API just appends to the
1293 /// `OutputStore`, pushes to the `EventLog`, and (for `Final`) emits
1294 /// the `TaskAttemptCompleted` event.
1295 ///
1296 /// This is Domain-side plumbing: it feeds the engine's verdict flow,
1297 /// not the Data-plane store in the `output_store` module. It also
1298 /// does not wake the dispatch path — that is done through the
1299 /// spawner's completion oneshot when the worker terminates.
1300 pub async fn submit_output(
1301 &self,
1302 token: &crate::types::CapToken,
1303 task_id: &TaskId,
1304 attempt: u32,
1305 event: crate::worker::output::OutputEvent,
1306 ) -> Result<(), EngineError> {
1307 self.verify_token_for_task(token, crate::types::Verb::EmitOutput, task_id)
1308 .await?;
1309 let task_id_for_apply = task_id.clone();
1310 let event_clone = event.clone();
1311 self.with_state("submit_output", move |s| {
1312 s.output_store
1313 .entry((task_id_for_apply.clone(), attempt))
1314 .or_default()
1315 .push(event_clone.clone());
1316 s.push_event(crate::core::state::Event::WorkerOutput {
1317 task_id: task_id_for_apply,
1318 attempt,
1319 event: event_clone,
1320 });
1321 })
1322 .await?;
1323 Ok(())
1324 }
1325
1326 /// Snapshot the entire output tail for a given `(task_id, attempt)`.
1327 /// Used by the dispatch path when pulling `Final`, and by observers
1328 /// reading the trace.
1329 pub async fn output_tail(
1330 &self,
1331 task_id: &TaskId,
1332 attempt: u32,
1333 ) -> Vec<crate::worker::output::OutputEvent> {
1334 let key = (task_id.clone(), attempt);
1335 self.with_state("output_tail", move |s| {
1336 s.output_store.get(&key).cloned().unwrap_or_default()
1337 })
1338 .await
1339 .unwrap_or_default()
1340 }
1341
1342 /// Record an interim `last_result` for `task_id` without changing its
1343 /// `status`. Distinct from the terminal `Final` output event handled
1344 /// through `submit_output` / `dispatch_attempt_with`.
1345 pub async fn post_result(
1346 &self,
1347 token: &CapToken,
1348 task_id: &TaskId,
1349 result: Value,
1350 ) -> Result<(), EngineError> {
1351 self.verify_token_for_task(token, Verb::PostResult, task_id)
1352 .await?;
1353 let task_id = task_id.clone();
1354 let result_clone = result.clone();
1355 self.with_state("post_result", move |s| {
1356 let task = s
1357 .tasks
1358 .get_mut(&task_id)
1359 .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))?;
1360 task.last_result = Some(result_clone);
1361 task.updated_at = now_unix();
1362 Ok::<(), EngineError>(())
1363 })
1364 .await??;
1365 Ok(())
1366 }
1367
1368 /// Store a named resource value, retrievable later via `fetch_data`.
1369 /// No token is required — this is a server-side/admin-style setter
1370 /// (mirrors `bake_worker_system_prompt`).
1371 pub async fn set_resource(
1372 &self,
1373 key: impl Into<String>,
1374 value: Value,
1375 ) -> Result<(), EngineError> {
1376 let key = key.into();
1377 self.with_state("set_resource", move |s| {
1378 s.resources.insert(key, value);
1379 })
1380 .await?;
1381 Ok(())
1382 }
1383
1384 // ═══════════════════════════════════════════════════════════════════════
1385 // Senior suspend / resume
1386 // ═══════════════════════════════════════════════════════════════════════
1387
1388 /// Ask a question of the Senior, mark the task `Suspended`, and
1389 /// return a `ResumeKey`. The suspended state persists until another
1390 /// task calls `resume(key, answer)`.
1391 ///
1392 /// Resume-side waiting is `Notify`-based, so a caller (typically
1393 /// MainAI) can detach, reattach from a different process, and still
1394 /// pull the answer out via `await_resume(key, timeout)` — the answer
1395 /// is stored inside `EngineState`.
1396 pub async fn query_senior(
1397 &self,
1398 token: &CapToken,
1399 task_id: &TaskId,
1400 question: Value,
1401 ) -> Result<ResumeKey, EngineError> {
1402 self.verify_token(token, Verb::QuerySenior).await?;
1403 let task_id = task_id.clone();
1404 let key = ResumeKey::for_senior(&task_id);
1405 let task_notify = self
1406 .with_state("query_senior.notify_ensure", |s| {
1407 s.ensure_task_notify(&task_id)
1408 })
1409 .await?;
1410
1411 let key_clone = key.clone();
1412 let task_id_inner = task_id.clone();
1413 let question_clone = question.clone();
1414 self.with_state("query_senior.suspend", move |s| {
1415 let task = s
1416 .tasks
1417 .get_mut(&task_id_inner)
1418 .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))?;
1419 task.status = TaskStatus::Suspended;
1420 task.suspended_on = Some(key_clone.clone());
1421 task.updated_at = now_unix();
1422 s.pending_resumes
1423 .insert(key_clone.clone(), ResumePending::new());
1424 s.push_event(Event::SeniorQueried {
1425 task_id: task_id_inner.clone(),
1426 question: question_clone.clone(),
1427 });
1428 s.push_event(Event::TaskSuspended {
1429 task_id: task_id_inner.clone(),
1430 key: key_clone.clone(),
1431 });
1432 Ok::<(), EngineError>(())
1433 })
1434 .await??;
1435
1436 // Notify callers waiting for a task status change (Running → Suspended).
1437 task_notify.notify_waiters();
1438
1439 let _ = self
1440 .inner
1441 .event_tx
1442 .send(Event::SeniorQueried { task_id, question });
1443 Ok(key)
1444 }
1445
1446 /// Store the answer for a `ResumeKey` in `EngineState` and wake the
1447 /// waiting caller via `Notify`. Also flips the suspended task's
1448 /// status back to `Running` and fires the per-task notifier.
1449 pub async fn resume(&self, key: ResumeKey, answer: Value) -> Result<(), EngineError> {
1450 let answer_for_state = answer.clone();
1451 let answer_for_event = answer.clone();
1452 let key_clone = key.clone();
1453 let (notify, task_notify, task_id_opt) = self
1454 .with_state("resume.set", move |s| {
1455 let pending = s
1456 .pending_resumes
1457 .get_mut(&key_clone)
1458 .ok_or(EngineError::ResumeKeyNotFound)?;
1459 pending.answer = Some(answer_for_state);
1460 let notify = pending.notify.clone();
1461
1462 let task_id = s
1463 .tasks
1464 .iter()
1465 .find(|(_, t)| t.suspended_on.as_ref() == Some(&key_clone))
1466 .map(|(id, _)| id.clone());
1467
1468 let task_notify = task_id.as_ref().map(|tid| s.ensure_task_notify(tid));
1469
1470 if let Some(tid) = &task_id {
1471 if let Some(task) = s.tasks.get_mut(tid) {
1472 task.suspended_on = None;
1473 task.status = TaskStatus::Running;
1474 task.updated_at = now_unix();
1475 }
1476 s.push_event(Event::TaskResumed {
1477 task_id: tid.clone(),
1478 key: key_clone.clone(),
1479 });
1480 s.push_event(Event::SeniorAnswered {
1481 task_id: tid.clone(),
1482 answer: answer_for_event.clone(),
1483 });
1484 }
1485 Ok::<_, EngineError>((notify, task_notify, task_id))
1486 })
1487 .await??;
1488
1489 // Outside the lock: notify_waiters for both the ResumePending and task-status waits.
1490 notify.notify_waiters();
1491 if let Some(n) = task_notify {
1492 n.notify_waiters();
1493 }
1494
1495 if let Some(tid) = task_id_opt {
1496 let _ = self
1497 .inner
1498 .event_tx
1499 .send(Event::TaskResumed { task_id: tid, key });
1500 }
1501 Ok(())
1502 }
1503
1504 /// Wait for the resume answer. Even if the caller (an Operator)
1505 /// detached and reattached, the answer is available immediately here
1506 /// — if it was already stored, this returns without waiting on the
1507 /// notifier.
1508 ///
1509 /// `timeout = Duration::ZERO` performs an instant check without
1510 /// waiting.
1511 pub async fn await_resume(
1512 &self,
1513 key: ResumeKey,
1514 timeout: Duration,
1515 ) -> Result<Value, EngineError> {
1516 // (1) Under the lock: clone the notify handle and check for an existing answer.
1517 let key_clone = key.clone();
1518 let (notify, existing) = self
1519 .with_state("await_resume.snapshot", move |s| {
1520 let pending = s
1521 .pending_resumes
1522 .get(&key_clone)
1523 .ok_or(EngineError::ResumeKeyNotFound)?;
1524 Ok::<_, EngineError>((pending.notify.clone(), pending.answer.clone()))
1525 })
1526 .await??;
1527
1528 // (2) If an answer has already been stored, return immediately (detach / reattach pattern).
1529 if let Some(v) = existing {
1530 return Ok(v);
1531 }
1532
1533 // (3) Outside the lock: wait on the notify with a timeout.
1534 if timeout.is_zero() {
1535 return Err(EngineError::PollTimeout);
1536 }
1537 let waited = tokio::time::timeout(timeout, notify.notified()).await;
1538 if waited.is_err() {
1539 return Err(EngineError::PollTimeout);
1540 }
1541
1542 // (4) Under the lock: re-read the answer (should be present now that we were notified).
1543 let key_clone = key.clone();
1544 self.with_state("await_resume.read", move |s| {
1545 let pending = s
1546 .pending_resumes
1547 .get(&key_clone)
1548 .ok_or(EngineError::ResumeKeyNotFound)?;
1549 pending
1550 .answer
1551 .clone()
1552 .ok_or_else(|| EngineError::Internal("notified but answer missing".into()))
1553 })
1554 .await?
1555 }
1556
1557 // ═══════════════════════════════════════════════════════════════════════
    // poll_task — the wait path for task-status changes (serves both long-poll and ordinary waits).
1559 // ═══════════════════════════════════════════════════════════════════════
1560
1561 /// Wait until the task's status **transitions to terminal or
1562 /// `Suspended`**, then return the latest `TaskState`. Returns
1563 /// immediately if the task is already in a terminal state.
1564 /// Exceeding the timeout returns `EngineError::PollTimeout`.
1565 ///
1566 /// A `hold` of `Duration::from_secs(0)` returns a snapshot immediately
1567 /// (no wait). Larger holds — tens of minutes up to days — are fine;
1568 /// the wait state is kept in memory inside the engine and does not
1569 /// degrade.
1570 pub async fn poll_task(
1571 &self,
1572 token: &CapToken,
1573 task_id: &TaskId,
1574 hold: Duration,
1575 ) -> Result<TaskState, EngineError> {
1576 self.verify_token_for_task(token, Verb::PollTask, task_id)
1577 .await?;
1578 let task_id_inner = task_id.clone();
1579
1580 // (1) Under the lock: take a snapshot and clone task_notify.
1581 let (state, notify) = self
1582 .with_state("poll_task.snapshot", move |s| {
1583 let task = s
1584 .tasks
1585 .get(&task_id_inner)
1586 .cloned()
1587 .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))?;
1588 let notify = s.ensure_task_notify(&task_id_inner);
1589 Ok::<_, EngineError>((task, notify))
1590 })
1591 .await??;
1592
1593 // (2) Immediate-return condition: already terminal / Suspended (nothing left to wait on).
1594 if matches!(
1595 state.status,
1596 TaskStatus::Pass | TaskStatus::Blocked | TaskStatus::Cancelled | TaskStatus::Suspended
1597 ) {
1598 return Ok(state);
1599 }
1600 if hold.is_zero() {
1601 return Ok(state);
1602 }
1603
1604 // (3) Outside the lock: wait on Notify with a timeout.
1605 let waited = tokio::time::timeout(hold, notify.notified()).await;
1606 if waited.is_err() {
1607 return Err(EngineError::PollTimeout);
1608 }
1609
1610 // (4) Under the lock: take a fresh snapshot.
1611 let task_id_inner = task_id.clone();
1612 self.with_state("poll_task.reread", move |s| {
1613 s.tasks
1614 .get(&task_id_inner)
1615 .cloned()
1616 .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))
1617 })
1618 .await?
1619 }
1620
1621 // ═══════════════════════════════════════════════════════════════════════
1622 // Background: heartbeat miss → detach loop
1623 // ═══════════════════════════════════════════════════════════════════════
1624
1625 /// Background loop that scans sessions every `heartbeat_interval` and
1626 /// flips `attached = false` on any session whose `last_seen` exceeds
1627 /// `heartbeat_miss_threshold * interval`.
1628 ///
1629 /// The tasks themselves are kept (assuming
1630 /// `keepalive_on_idle = true`), so another client can reattach with
1631 /// the same token and resume immediately. Dropping the returned
1632 /// `JoinHandle` does not stop the loop — the handle exists so callers
1633 /// who want to abort can hold onto it.
1634 pub fn start_detach_loop(&self) -> tokio::task::JoinHandle<()> {
1635 let engine = self.clone();
1636 let cfg = self.inner.cfg.long_hold.clone();
1637 let interval = cfg.heartbeat_interval;
1638 let miss_secs = cfg.heartbeat_interval.as_secs() * cfg.heartbeat_miss_threshold as u64;
1639
1640 tokio::spawn(async move {
1641 let mut ticker = tokio::time::interval(interval);
1642 ticker.tick().await; // first tick is immediate
1643 loop {
1644 ticker.tick().await;
1645 let now = now_unix();
1646 let detached = engine
1647 .with_state("detach_loop.scan", |s| {
1648 let mut detached = Vec::new();
1649 for (sid, sess) in s.sessions.iter_mut() {
1650 if !sess.attached {
1651 continue;
1652 }
1653 if now.saturating_sub(sess.last_seen) >= miss_secs {
1654 sess.attached = false;
1655 detached.push(sid.clone());
1656 }
1657 }
1658 for sid in &detached {
1659 s.push_event(Event::SessionDetached {
1660 session_id: sid.clone(),
1661 });
1662 }
1663 detached
1664 })
1665 .await
1666 .unwrap_or_default();
1667 for sid in detached {
1668 let _ = engine
1669 .inner
1670 .event_tx
1671 .send(Event::SessionDetached { session_id: sid });
1672 }
1673 }
1674 })
1675 }
1676
1677 /// Helper: wake a task whose status has changed. Called from the
1678 /// method body outside the lock.
1679 async fn wake_task(&self, task_id: &TaskId) -> Result<(), EngineError> {
1680 let task_id = task_id.clone();
1681 let notify_opt = self
1682 .with_state("wake_task.get_notify", move |s| {
1683 s.task_notifies.get(&task_id).cloned()
1684 })
1685 .await?;
1686 if let Some(n) = notify_opt {
1687 n.notify_waiters();
1688 }
1689 Ok(())
1690 }
1691}
1692
// ─── UT: `OperatorKind` "Runtime Global" tier — `Option` semantics ─────────
//
// Regression guard for the defect where an explicit `Automate` could not be
// told apart from "unspecified". `OperatorSession.operator_kind` — and the
// `kind` argument of `attach_with_ids` that populates it — is an
// `Option<OperatorKind>`. `Some(Automate)` is an explicit Runtime Global
// request and must outrank `bp_global`, whereas `None` defers to
// `bp_global`. The tests drive the full `resolve_operator_info` cascade
// (not `collapse_operator_kind` in isolation) and attach through
// `attach_with_ids` exactly as `TaskLaunchService::launch` does.
#[cfg(test)]
mod resolve_operator_info_runtime_global_tests {
    use super::*;

    /// Attach an operator session through `attach_with_ids` with the given
    /// Runtime Global and Blueprint Global kinds, look the session back up
    /// by token nonce, and run `resolve_operator_info` for a placeholder
    /// agent.
    async fn attach_and_resolve(
        runtime_global: Option<OperatorKind>,
        bp_global: Option<OperatorKind>,
    ) -> OperatorInfo {
        let engine = Engine::new(EngineCfg::default());
        let session_ttl = Duration::from_secs(30);
        let token = engine
            .attach_with_ids(
                "ut-op",
                Role::Operator,
                session_ttl,
                runtime_global,
                None,
                None,
                None,
                HashMap::new(),
                HashMap::new(),
                bp_global,
            )
            .await
            .expect("attach_with_ids ok");
        let wanted_nonce = token.nonce.clone();
        let located = engine
            .with_state("test.find_session", move |s| {
                s.sessions
                    .values()
                    .find(|sess| sess.token_nonce == wanted_nonce)
                    .cloned()
            })
            .await
            .expect("with_state ok");
        let session = located.expect("session present after attach_with_ids");
        engine.resolve_operator_info(&session, "agent-x").await
    }

    #[tokio::test]
    async fn explicit_some_automate_outranks_bp_global_main_ai() {
        // Runtime Global explicitly asks for Automate while bp_global says
        // MainAi. The explicit request has to win. The old
        // `== OperatorKind::default()` convention got this wrong: it read
        // "explicitly Automate" as "unspecified" and let bp_global (MainAi)
        // take over.
        let resolved =
            attach_and_resolve(Some(OperatorKind::Automate), Some(OperatorKind::MainAi)).await;
        assert_eq!(
            resolved.kind,
            OperatorKind::Automate,
            "explicit Some(Automate) runtime_global must outrank bp_global MainAi"
        );
    }

    #[tokio::test]
    async fn none_lets_bp_global_main_ai_win() {
        // Runtime Global is unspecified (`None`) and bp_global is MainAi:
        // with nothing more specific set, bp_global decides.
        let resolved = attach_and_resolve(None, Some(OperatorKind::MainAi)).await;
        assert_eq!(
            resolved.kind,
            OperatorKind::MainAi,
            "None runtime_global must let bp_global MainAi win"
        );
    }
}