Skip to main content

mlua_swarm/core/
engine.rs

1//! `Engine` — the long-running stateful runtime plus the `with_state`
2//! helper (R1-R4 discipline).
3//!
4//! The engine owns the Domain side of the Data / Domain split:
5//! flow control (dispatch / verdict), state (`EngineState`), and the
6//! `submit_output` / `output_tail` surface that feeds it. Data-plane
7//! traffic (Big Response bodies) is delegated to the `output_store` module
8//! plus its paired `SpawnerLayer`s and passes through here without the
9//! engine core needing to grow.
10
11use crate::core::config::EngineCfg;
12use crate::core::ctx::{Ctx, OperatorInfo, OperatorKind, SeniorBridge, SpawnHook};
13use crate::core::errors::EngineError;
14use crate::core::state::{
15    CapTokenRecord, DispatchOutcome, EngineState, Event, EventStream, OperatorSession, ResumeKey,
16    ResumePending, TaskSpec, TaskState, TaskStatus,
17};
18use crate::types::{
19    default_role_verb_table, now_unix, CapToken, Role, RoleVerbGate, SessionId, TaskId,
20    TokenSigner, Verb,
21};
22use crate::worker::adapter::SpawnerAdapter;
23use serde_json::Value;
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{broadcast, Mutex};
28
/// Process-wide long-running runtime. Cheap to `clone()` — an `Arc`
/// lives inside.
///
/// Every clone shares the same `EngineInner` (state, configuration,
/// signer, gate, event channel and registries), so a clone is a handle to
/// the same engine rather than a copy of it.
#[derive(Clone)]
pub struct Engine {
    /// Shared, reference-counted interior; all methods go through this.
    inner: Arc<EngineInner>,
}
35
/// Shared interior of an [`Engine`]. Held behind an `Arc`, so every
/// `Engine` clone observes the same fields.
struct EngineInner {
    /// Mutable runtime state behind a single tokio `Mutex`; accessed
    /// through `Engine::with_state`.
    state: Mutex<EngineState>,
    /// Configuration the engine was constructed with.
    cfg: EngineCfg,
    /// Token signer, built from `cfg.token_secret` at construction time.
    signer: TokenSigner,
    /// Role × verb permission table consulted by `verify_token`.
    gate: RoleVerbGate,
    /// Broadcast sender for `Event`s (the channel is created with a
    /// capacity of 256); `Engine::subscribe` hands out receivers.
    event_tx: broadcast::Sender<Event>,
    /// ID-keyed bridge registry (register-by-ID design). `SeniorBridge`
    /// and `SpawnHook` are registered by ID; sessions bind to those IDs
    /// only. Persistence stores just the ID, and on reattach the caller
    /// re-registers under the same ID to restore presence.
    senior_bridges: tokio::sync::RwLock<HashMap<String, Arc<dyn SeniorBridge>>>,
    /// ID-keyed `SpawnHook` registry; sibling of `senior_bridges` and
    /// governed by the same register-by-ID rules.
    spawn_hooks: tokio::sync::RwLock<HashMap<String, Arc<dyn SpawnHook>>>,
    /// ID registry for full-spawn Operator backends (backends that take the
    /// entire spawn via `execute`). Sibling to `senior_bridges` /
    /// `spawn_hooks`. `OperatorDelegateMiddleware` looks these up via
    /// `ctx` and, when `kind = MainAi` / `Composite`, bypasses
    /// `inner.spawn` and calls `operator.execute` instead.
    operators: tokio::sync::RwLock<HashMap<String, Arc<dyn crate::operator::Operator>>>,
    /// Base and hint layer factories for the `SpawnerStack`. At
    /// `service::linker::link` time, `compiled.router` is wrapped with
    /// the base factories plus the hint factories resolved from
    /// `blueprint.spawner_hints.layers`. This is the engine-side
    /// counterpart to the discipline "Flow / Blueprint doesn't spell out
    /// middleware implementations — it declares the capabilities it needs
    /// as hint keys".
    layer_registry: crate::middleware::LayerRegistry,
}
63
64impl Engine {
65    /// Backwards-compatible constructor that starts the engine without a
66    /// layer registry, preserving the signature already used by ~88
67    /// existing call sites. Use this when automatic middleware wrapping
68    /// at bind time is not needed. Callers such as `mlua-swarm-server` go through
69    /// `new_with_layers(cfg, registry)` to enable the hint-resolution path.
70    pub fn new(cfg: EngineCfg) -> Self {
71        Self::new_with_layers(cfg, crate::middleware::LayerRegistry::new())
72    }
73
74    /// Construct an `Engine` with an explicit `LayerRegistry`, enabling
75    /// hint-resolution: `spawner_hints.layers` declared on a `Blueprint`
76    /// are resolved against this registry when the spawner stack is bound
77    /// at `service::linker::link` time.
78    pub fn new_with_layers(
79        cfg: EngineCfg,
80        layer_registry: crate::middleware::LayerRegistry,
81    ) -> Self {
82        let (event_tx, _) = broadcast::channel(256);
83        let signer = TokenSigner::new(&cfg.token_secret);
84        Self {
85            inner: Arc::new(EngineInner {
86                state: Mutex::new(EngineState::new()),
87                cfg,
88                signer,
89                gate: default_role_verb_table(),
90                event_tx,
91                senior_bridges: tokio::sync::RwLock::new(HashMap::new()),
92                spawn_hooks: tokio::sync::RwLock::new(HashMap::new()),
93                operators: tokio::sync::RwLock::new(HashMap::new()),
94                layer_registry,
95            }),
96        }
97    }
98
99    /// Rebuild this `Engine` with a different `RoleVerbGate`. The gate is
100    /// treated as fixed-at-build-time, so this constructs a fresh
101    /// `EngineInner` (fresh empty `EngineState`) rather than mutating in
102    /// place — mainly a testing convenience for swapping gate rules.
103    pub fn with_gate(self, gate: RoleVerbGate) -> Self {
104        // The gate is fixed at build time — the intent is to build a fresh
105        // instance rather than mutating in place. As a testing convenience we
106        // do allow swapping the inner Arc. Simpler form: just rebuild
107        // Arc<EngineInner>.
108        let inner = Arc::new(EngineInner {
109            state: Mutex::new(EngineState::new()),
110            cfg: self.inner.cfg.clone(),
111            signer: self.inner.signer.clone(),
112            gate,
113            event_tx: self.inner.event_tx.clone(),
114            senior_bridges: tokio::sync::RwLock::new(HashMap::new()),
115            spawn_hooks: tokio::sync::RwLock::new(HashMap::new()),
116            operators: tokio::sync::RwLock::new(HashMap::new()),
117            layer_registry: self.inner.layer_registry.clone(),
118        });
119        Self { inner }
120    }
121
122    // ═══════════════════════════════════════════════════════════════════════
123    // Accessors. Production code drives execution through compile +
124    // `service::linker::link` + `dispatch_attempt_with(spawner)` inside
125    // `TaskLaunchService`; `Engine` itself is a pure execution surface — it
126    // does not own a BlueprintStore / EnhanceAdapter / Compiler, nor a
127    // global spawner (the spawner is carried per-request, never stashed on
128    // the engine).
129    // ═══════════════════════════════════════════════════════════════════════
130
131    /// Access the `EngineCfg` this engine was built with.
132    pub fn cfg(&self) -> &EngineCfg {
133        &self.inner.cfg
134    }
135
136    /// Expose the internal `LayerRegistry` — used when deriving a
137    /// sub-engine that needs the same registry re-injected. The
138    /// per-request sub-engine in `mlua-swarm-server` reads the parent engine's
139    /// registry through this accessor and passes it to
140    /// `Engine::new_with_layers(cfg, parent.layer_registry().clone())`.
141    pub fn layer_registry(&self) -> &crate::middleware::LayerRegistry {
142        &self.inner.layer_registry
143    }
144
145    /// Access the `TokenSigner` used to mint/verify `CapToken`s.
146    pub fn signer(&self) -> &TokenSigner {
147        &self.inner.signer
148    }
149
150    /// Clone a handle to the process-wide `Event` broadcast sender. Prefer
151    /// `subscribe` for a ready-to-use receiver.
152    pub fn event_tx(&self) -> broadcast::Sender<Event> {
153        self.inner.event_tx.clone()
154    }
155
156    /// Subscribe to the engine's `Event` broadcast stream.
157    pub fn subscribe(&self) -> EventStream {
158        self.inner.event_tx.subscribe()
159    }
160
161    // ═══════════════════════════════════════════════════════════════════════
162    // §7 with_state — single Mutex + R1-R4 (try_lock + bounded retry + max-hold panic)
163    // ═══════════════════════════════════════════════════════════════════════
164
165    /// The closure is a **sync** `FnOnce` — you cannot pass an async
166    /// closure, which enforces R3 at the type level. Exceeding `max_hold`
167    /// panics so that R4 violations surface immediately.
168    pub async fn with_state<F, R>(&self, op: &'static str, f: F) -> Result<R, EngineError>
169    where
170        F: FnOnce(&mut EngineState) -> R,
171    {
172        let cfg = &self.inner.cfg;
173
174        // R2: try_lock + bounded retry
175        let mut guard_opt = None;
176        for attempt in 0..=cfg.max_retry {
177            match self.inner.state.try_lock() {
178                Ok(g) => {
179                    guard_opt = Some(g);
180                    break;
181                }
182                Err(_) if cfg.try_only => return Err(EngineError::LockBusy(op)),
183                Err(_) => {
184                    let backoff = cfg.backoff_ms_step * (attempt as u64 + 1);
185                    tokio::time::sleep(Duration::from_millis(backoff)).await;
186                }
187            }
188        }
189        let mut guard = guard_opt.ok_or(EngineError::LockBusyAfterRetry(op))?;
190
191        // R4: max_hold guard
192        let start = Instant::now();
193        let result = f(&mut guard);
194        let elapsed_ms = start.elapsed().as_millis();
195        drop(guard);
196
197        if elapsed_ms > cfg.max_hold_ms {
198            panic!(
199                "Engine.with_state('{op}') held {elapsed_ms}ms > max {}ms — suspected R3 violation (long op inside lock)",
200                cfg.max_hold_ms
201            );
202        }
203        Ok(result)
204    }
205
206    // ═══════════════════════════════════════════════════════════════════════
207    // Token verify (= sig + expire + gate + uses_left)
208    // ═══════════════════════════════════════════════════════════════════════
209
210    /// Four steps: (1) signature verify, (2) expiry check, (3) role × verb
211    /// gate, (4) `uses_left` consume.
212    pub async fn verify_token(&self, token: &CapToken, verb: Verb) -> Result<(), EngineError> {
213        // (1) sig
214        if !self.inner.signer.verify_sig(token) {
215            return Err(EngineError::BadSignature);
216        }
217        // (2) expire
218        if token.is_expired(now_unix()) {
219            return Err(EngineError::TokenExpired);
220        }
221        // (3) role × verb gate
222        if !self.inner.gate.is_allowed(token.role, verb) {
223            return Err(EngineError::RoleViolation {
224                role: token.role,
225                verb,
226            });
227        }
228        // (4) server-side uses_left consume
229        self.with_state("token.consume", |s| {
230            let rec = s
231                .tokens
232                .get_mut(token.id())
233                .ok_or_else(|| EngineError::TokenNotFound(token.id().to_string()))?;
234            rec.consume()
235                .map_err(|_: crate::core::state::CapTokenConsumeError| {
236                    EngineError::TokenUsesExhausted
237                })?;
238            Ok::<(), EngineError>(())
239        })
240        .await??;
241        Ok(())
242    }
243
244    /// `verify_token` plus the **task-ownership gate**.
245    ///
246    /// When a Worker-role token calls a state-touch verb (`fetch_prompt` /
247    /// `post_result` / `read_task_state` / `cancel_task` / `poll_task`),
248    /// the gate checks that `CapTokenRecord.task_id` matches the argument
249    /// `task_id`; a mismatch returns `EngineError::TokenTaskMismatch`.
250    /// Operator / Senior / Observer tokens are outside the ownership gate
251    /// and may touch any task.
252    ///
253    /// **Verbs exempt from the gate.** `start_task` and `dispatch_attempt`
254    /// stay outside so recursive swarming keeps working; depth is capped
255    /// by `max_spawn_depth`.
256    pub async fn verify_token_for_task(
257        &self,
258        token: &CapToken,
259        verb: Verb,
260        task_id: &TaskId,
261    ) -> Result<(), EngineError> {
262        self.verify_token(token, verb).await?;
263        if token.role != Role::Worker {
264            return Ok(());
265        }
266        let nonce = token.nonce.clone();
267        let arg_tid = task_id.clone();
268        self.with_state("token.ownership_gate", move |s| {
269            let bound = s
270                .tokens
271                .get(&nonce)
272                .and_then(|r| r.task_id.as_ref())
273                .cloned();
274            match bound {
275                Some(t) if t == arg_tid => Ok(()),
276                Some(t) => Err(EngineError::TokenTaskMismatch {
277                    bound: t.0,
278                    arg: arg_tid.0,
279                }),
280                None => Err(EngineError::TokenNotFound(nonce.clone())),
281            }
282        })
283        .await??;
284        Ok(())
285    }
286
287    /// Resolve the bound `task_id` from a Worker-role token. Used on the
288    /// simple `/v1/worker/submit` endpoint, where the worker POSTs with a
289    /// token but no `task_id`. Returns `Err` if the token role is not
290    /// Worker, or if no bound task is set.
291    pub async fn task_id_from_token(&self, token: &CapToken) -> Result<TaskId, EngineError> {
292        if token.role != Role::Worker {
293            return Err(EngineError::RoleViolation {
294                role: token.role,
295                verb: Verb::PostResult,
296            });
297        }
298        let nonce = token.nonce.clone();
299        self.with_state("task_id_from_token", move |s| {
300            s.tokens
301                .get(&nonce)
302                .and_then(|r| r.task_id.as_ref())
303                .cloned()
304                .ok_or_else(|| EngineError::TokenNotFound(nonce.clone()))
305        })
306        .await?
307    }
308
309    /// Resolve a short worker handle (`wh-XXXXXXXX`) to the bound
310    /// `task_id`. Used on `/v1/worker/submit` when the Bearer is a short
311    /// handle string rather than a full `CapToken` JSON. A missing entry
312    /// returns `TokenNotFound`, i.e. "the handle is not in the store".
313    pub async fn task_id_from_handle(&self, handle: &str) -> Result<TaskId, EngineError> {
314        let h = handle.to_string();
315        self.with_state("task_id_from_handle", move |s| {
316            let nonce = s
317                .worker_handles
318                .get(&h)
319                .cloned()
320                .ok_or_else(|| EngineError::TokenNotFound(format!("handle={h}")))?;
321            s.tokens
322                .get(&nonce)
323                .and_then(|r| r.task_id.as_ref())
324                .cloned()
325                .ok_or_else(|| EngineError::TokenNotFound(format!("nonce={nonce}")))
326        })
327        .await?
328    }
329
330    /// Submit a worker result via a short handle. Skips token verification
331    /// and updates `output_tail` `Final` + `task.last_result` directly in
332    /// a thin path. The caller is expected to have already resolved
333    /// `task_id` via `task_id_from_handle` — the handle's presence in
334    /// `worker_handles` means it was minted server-side and is therefore
335    /// trusted.
336    pub async fn submit_worker_result_trusted(
337        &self,
338        task_id: &TaskId,
339        attempt: u32,
340        value: Value,
341        ok: bool,
342    ) -> Result<(), EngineError> {
343        let task_id_for_apply = task_id.clone();
344        let value_for_event = value.clone();
345        self.with_state("submit_worker_result_trusted.output", move |s| {
346            let ev = crate::worker::output::OutputEvent::Final {
347                content: crate::worker::output::ContentRef::Inline {
348                    value: value_for_event,
349                },
350                ok,
351            };
352            s.output_store
353                .entry((task_id_for_apply.clone(), attempt))
354                .or_default()
355                .push(ev.clone());
356            s.push_event(crate::core::state::Event::WorkerOutput {
357                task_id: task_id_for_apply,
358                attempt,
359                event: ev,
360            });
361        })
362        .await?;
363        let task_id_for_result = task_id.clone();
364        let value_for_result = value.clone();
365        self.with_state("submit_worker_result_trusted.last_result", move |s| {
366            if let Some(t) = s.tasks.get_mut(&task_id_for_result) {
367                t.last_result = Some(value_for_result);
368                t.updated_at = now_unix();
369            }
370        })
371        .await?;
372        Ok(())
373    }
374
375    /// Mint a short handle and register it in the `worker_handles` map.
376    /// Called immediately after the worker-token mint inside
377    /// `dispatch_attempt_with`, and issues a handle bound to the same
378    /// nonce. Format is `wh-<8 hex chars>` (11 chars total), designed to
379    /// remove the base64 copy-paste failure mode.
380    async fn mint_worker_handle(&self, worker_nonce: String) -> Result<String, EngineError> {
381        // The handle is a sole bearer secret on the `/v1/worker/submit`
382        // short-handle path (`submit_worker_result_trusted` skips token
383        // verification), so it must be unguessable — OS RNG, not the
384        // predictable uid counter. 8 hex chars (~4B entropy) keeps the
385        // documented `wh-<8 hex>` wire shape; collision between live
386        // handles is negligible at in-process handle counts.
387        let short = crate::types::secure_hex(4);
388        let handle = format!("wh-{short}");
389        let h = handle.clone();
390        let n = worker_nonce.clone();
391        self.with_state("mint_worker_handle", move |s| {
392            s.worker_handles.insert(h, n);
393        })
394        .await?;
395        Ok(handle)
396    }
397
398    // ═══════════════════════════════════════════════════════════════════════
399    // Session API
400    // ═══════════════════════════════════════════════════════════════════════
401
402    /// Attach a new session with default `OperatorInfo` (`Automate`, no
403    /// bridges/hooks). Shorthand for `attach_with(.., OperatorInfo::default())`.
404    pub async fn attach(
405        &self,
406        operator_id: impl Into<String>,
407        role: Role,
408        ttl: Duration,
409    ) -> Result<CapToken, EngineError> {
410        self.attach_with(
411            operator_id,
412            role,
413            ttl,
414            crate::core::ctx::OperatorInfo::default(),
415        )
416        .await
417    }
418
419    // ═══════════════════════════════════════════════════════════════════════
420    // BridgeRegistry API.
421    // ═══════════════════════════════════════════════════════════════════════
422
423    /// Register a `SeniorBridge` under a name. An existing entry with the
424    /// same name is overwritten. On the persisted-session reattach path,
425    /// the caller re-registers under the same ID beforehand and the
426    /// bridge becomes effective again.
427    pub async fn register_senior_bridge(
428        &self,
429        id: impl Into<String>,
430        bridge: Arc<dyn SeniorBridge>,
431    ) {
432        self.inner
433            .senior_bridges
434            .write()
435            .await
436            .insert(id.into(), bridge);
437    }
438
439    /// Register a `SpawnHook` under a name. An existing entry with the
440    /// same name is overwritten.
441    pub async fn register_spawn_hook(&self, id: impl Into<String>, hook: Arc<dyn SpawnHook>) {
442        self.inner.spawn_hooks.write().await.insert(id.into(), hook);
443    }
444
445    /// Register an `Operator` (a spawn-body backend) under a name. An
446    /// existing entry with the same name is overwritten.
447    /// `OperatorDelegateMiddleware` looks this up via `ctx` and, when
448    /// `kind = MainAi` / `Composite`, bypasses `inner.spawn` and calls
449    /// `operator.execute` instead.
450    pub async fn register_operator(
451        &self,
452        id: impl Into<String>,
453        operator: Arc<dyn crate::operator::Operator>,
454    ) {
455        self.inner
456            .operators
457            .write()
458            .await
459            .insert(id.into(), operator);
460    }
461
462    /// Unregister a `SeniorBridge` by name (e.g. on WebSocket disconnect
463    /// or explicit teardown). A missing ID is a no-op.
464    pub async fn unregister_senior_bridge(&self, id: &str) {
465        self.inner.senior_bridges.write().await.remove(id);
466    }
467
468    /// Unregister a `SpawnHook` by name. A missing ID is a no-op.
469    pub async fn unregister_spawn_hook(&self, id: &str) {
470        self.inner.spawn_hooks.write().await.remove(id);
471    }
472
473    /// Unregister an `Operator` backend by name. A missing ID is a no-op.
474    pub async fn unregister_operator(&self, id: &str) {
475        self.inner.operators.write().await.remove(id);
476    }
477
478    /// Snapshot the list of registered `SpawnHook` IDs (for test
479    /// observation and debugging).
480    pub async fn list_spawn_hook_ids(&self) -> Vec<String> {
481        self.inner
482            .spawn_hooks
483            .read()
484            .await
485            .keys()
486            .cloned()
487            .collect()
488    }
489
490    /// Snapshot the list of registered `SeniorBridge` IDs.
491    pub async fn list_senior_bridge_ids(&self) -> Vec<String> {
492        self.inner
493            .senior_bridges
494            .read()
495            .await
496            .keys()
497            .cloned()
498            .collect()
499    }
500
501    /// Snapshot the list of registered `Operator` IDs.
502    pub async fn list_operator_ids(&self) -> Vec<String> {
503        self.inner.operators.read().await.keys().cloned().collect()
504    }
505
506    /// Attach specifying IDs directly. The caller is expected to have
507    /// pre-registered them via `register_senior_bridge` /
508    /// `register_spawn_hook` / `register_operator`. This is the canonical
509    /// path when persistence is in play.
510    ///
511    /// `kind` is the "Runtime Global" tier of the `OperatorKind` cascade
512    /// (stored verbatim on `OperatorSession.operator_kind`): `Some(_)` is
513    /// an explicit request (including `Some(OperatorKind::Automate)`) that
514    /// outranks the BP-level tiers; `None` leaves it unspecified so the
515    /// BP-level tiers / final default decide. See
516    /// `crate::core::ctx::collapse_operator_kind`.
517    #[allow(clippy::too_many_arguments)]
518    pub async fn attach_with_ids(
519        &self,
520        operator_id: impl Into<String>,
521        role: Role,
522        ttl: Duration,
523        kind: Option<OperatorKind>,
524        bridge_id: Option<String>,
525        hook_id: Option<String>,
526        operator_backend_id: Option<String>,
527        operator_kind_overrides: HashMap<String, OperatorKind>,
528        bp_agent_kinds: HashMap<String, OperatorKind>,
529        bp_global_kind: Option<OperatorKind>,
530    ) -> Result<CapToken, EngineError> {
531        let operator_id = operator_id.into();
532        let token = self
533            .inner
534            .signer
535            .session(operator_id.clone(), role, vec!["*".into()], ttl);
536        let session_id = SessionId::new();
537        let nonce = token.nonce.clone();
538        let now = now_unix();
539        let token_for_store = token.clone();
540
541        self.with_state("attach_with_ids", |s| {
542            s.tokens
543                .insert(nonce.clone(), CapTokenRecord::from_token(token_for_store));
544            s.sessions.insert(
545                session_id.clone(),
546                OperatorSession {
547                    id: session_id.clone(),
548                    operator_id: operator_id.clone(),
549                    role,
550                    attached_at: now,
551                    last_seen: now,
552                    attached: true,
553                    owned_task_ids: Vec::new(),
554                    token_nonce: nonce.clone(),
555                    operator_kind: kind,
556                    runtime_agent_kinds: operator_kind_overrides,
557                    bp_agent_kinds,
558                    bp_global_kind,
559                    bridge_id,
560                    hook_id,
561                    operator_backend_id,
562                },
563            );
564            s.push_event(Event::SessionAttached {
565                session_id: session_id.clone(),
566                role,
567            });
568        })
569        .await?;
570
571        let _ = self
572            .inner
573            .event_tx
574            .send(Event::SessionAttached { session_id, role });
575        Ok(token)
576    }
577
578    /// Build an `OperatorInfo` by looking up the session's registered IDs
579    /// on the `BridgeRegistry`, plus resolving the 4-tier `OperatorKind`
580    /// cascade for `agent_name` via `crate::core::ctx::collapse_operator_kind`.
581    /// Used when `dispatch_attempt` injects `Ctx`. An unresolved ID
582    /// (nothing registered) is silently `None` — the bridge / hook simply
583    /// does not fire and the default behaviour applies.
584    async fn resolve_operator_info(
585        &self,
586        session: &OperatorSession,
587        agent_name: &str,
588    ) -> OperatorInfo {
589        let senior_bridge = if let Some(id) = &session.bridge_id {
590            self.inner.senior_bridges.read().await.get(id).cloned()
591        } else {
592            None
593        };
594        let spawn_hook = if let Some(id) = &session.hook_id {
595            self.inner.spawn_hooks.read().await.get(id).cloned()
596        } else {
597            None
598        };
599        let operator = if let Some(id) = &session.operator_backend_id {
600            self.inner.operators.read().await.get(id).cloned()
601        } else {
602            None
603        };
604        let runtime_agent = session.runtime_agent_kinds.get(agent_name).copied();
605        // "Runtime Global" tier: `Some(_)` is always an explicit request
606        // (see the field doc on `OperatorSession.operator_kind`).
607        let runtime_global = session.operator_kind;
608        let bp_agent = session.bp_agent_kinds.get(agent_name).copied();
609        let bp_global = session.bp_global_kind;
610        let kind = crate::core::ctx::collapse_operator_kind(
611            runtime_agent,
612            runtime_global,
613            bp_agent,
614            bp_global,
615        );
616        OperatorInfo {
617            kind,
618            id: session.operator_id.clone(),
619            senior_bridge,
620            spawn_hook,
621            operator,
622        }
623    }
624
625    /// Convenience attach that takes an `OperatorInfo` (three
626    /// `Arc<dyn ...>` fields plus `kind`) **inline**.
627    ///
628    /// # Pipeline
629    ///
630    /// Each `Arc<dyn ...>` is auto-registered on the engine's registry
631    /// under a synthetic ID (`br-<hex>` / `hk-<hex>` / `op-<hex>`), and
632    /// the session stores that synthetic ID. Subsequent `dispatch_attempt`
633    /// calls rebuild the `Arc`s from those IDs via
634    /// `resolve_operator_info`, and the three middlewares fire as usual.
635    ///
636    /// # ⚠ Non-persisted sessions only
637    ///
638    /// Because this API takes inline `Arc`s, the reattach path after
639    /// session persistence cannot rebuild them — the synthetic IDs are
640    /// not present in a freshly started process's registry. If you need
641    /// persistence, use [`Self::attach_with_ids`] with `register_*` calls
642    /// beforehand to go through **named IDs** instead.
643    ///
644    /// Handy for tests and short-lived in-process sessions. Production
645    /// WebSocket callbacks and the like should prefer `attach_with_ids`
646    /// as the canonical path.
647    pub async fn attach_with(
648        &self,
649        operator_id: impl Into<String>,
650        role: Role,
651        ttl: Duration,
652        operator_info: crate::core::ctx::OperatorInfo,
653    ) -> Result<CapToken, EngineError> {
654        let operator_id = operator_id.into();
655        // The caller always hands in a fully-formed `OperatorInfo`
656        // (including its `kind`), so it is stored as an explicit "Runtime
657        // Global" tier request (`Some(kind)`) — this path never persists
658        // BP-level tiers (both stay empty below), so `Some(kind)` resolves
659        // to the same `kind` at dispatch either way; see
660        // `OperatorSession.operator_kind` doc.
661        let kind = operator_info.kind;
662        // BridgeRegistry auto-register: when the caller hands in an
663        // `Arc<dyn>` directly, register it under a synthesised ID (the inline
664        // path aware of persistence). Callers who want to pre-register with a
665        // named ID should use `register_senior_bridge` / `register_spawn_hook`
666        // + `attach_with_ids`.
667        let bridge_id = if let Some(bridge) = operator_info.senior_bridge.clone() {
668            let id = format!("br-{}", crate::types::uid_hex(8));
669            self.inner
670                .senior_bridges
671                .write()
672                .await
673                .insert(id.clone(), bridge);
674            Some(id)
675        } else {
676            None
677        };
678        let hook_id = if let Some(hook) = operator_info.spawn_hook.clone() {
679            let id = format!("hk-{}", crate::types::uid_hex(8));
680            self.inner
681                .spawn_hooks
682                .write()
683                .await
684                .insert(id.clone(), hook);
685            Some(id)
686        } else {
687            None
688        };
689        let operator_backend_id = if let Some(operator) = operator_info.operator.clone() {
690            let id = format!("op-{}", crate::types::uid_hex(8));
691            self.inner
692                .operators
693                .write()
694                .await
695                .insert(id.clone(), operator);
696            Some(id)
697        } else {
698            None
699        };
700
701        let token = self
702            .inner
703            .signer
704            .session(operator_id.clone(), role, vec!["*".into()], ttl);
705        let session_id = SessionId::new();
706        let nonce = token.nonce.clone();
707        let now = now_unix();
708        let token_for_store = token.clone();
709
710        self.with_state("attach_with", |s| {
711            s.tokens
712                .insert(nonce.clone(), CapTokenRecord::from_token(token_for_store));
713            s.sessions.insert(
714                session_id.clone(),
715                OperatorSession {
716                    id: session_id.clone(),
717                    operator_id,
718                    role,
719                    attached_at: now,
720                    last_seen: now,
721                    attached: true,
722                    owned_task_ids: Vec::new(),
723                    token_nonce: nonce.clone(),
724                    operator_kind: Some(kind),
725                    runtime_agent_kinds: HashMap::new(),
726                    bp_agent_kinds: HashMap::new(),
727                    bp_global_kind: None,
728                    bridge_id,
729                    hook_id,
730                    operator_backend_id,
731                },
732            );
733            s.push_event(Event::SessionAttached {
734                session_id: session_id.clone(),
735                role,
736            });
737        })
738        .await?;
739
740        let _ = self
741            .inner
742            .event_tx
743            .send(Event::SessionAttached { session_id, role });
744        Ok(token)
745    }
746
747    /// Mark the session bound to `token` as detached (`attached = false`).
748    /// Tasks are left in place — a later `attach`/`attach_with_ids` call
749    /// carrying the same registered bridge/hook IDs can pick them back up.
750    pub async fn detach(&self, token: &CapToken) -> Result<(), EngineError> {
751        self.verify_token(token, Verb::DetachSession).await?;
752        self.with_state("detach", |s| {
753            let sid = s
754                .sessions
755                .iter()
756                .find(|(_, sess)| sess.token_nonce == token.nonce)
757                .map(|(id, _)| id.clone());
758            if let Some(sid) = sid {
759                if let Some(sess) = s.sessions.get_mut(&sid) {
760                    sess.attached = false;
761                }
762                s.push_event(Event::SessionDetached {
763                    session_id: sid.clone(),
764                });
765                let _ = sid;
766            }
767        })
768        .await?;
769        Ok(())
770    }
771
772    /// Refresh the session's `last_seen` timestamp and mark it `attached`.
773    /// Called periodically by an attached client to avoid being flipped to
774    /// detached by `start_detach_loop`.
775    pub async fn heartbeat(&self, token: &CapToken) -> Result<(), EngineError> {
776        self.verify_token(token, Verb::Heartbeat).await?;
777        let now = now_unix();
778        self.with_state("heartbeat", |s| {
779            if let Some(sess) = s
780                .sessions
781                .values_mut()
782                .find(|sess| sess.token_nonce == token.nonce)
783            {
784                sess.last_seen = now;
785                sess.attached = true;
786            }
787        })
788        .await?;
789        Ok(())
790    }
791
792    // ═══════════════════════════════════════════════════════════════════════
793    // Task lifecycle
794    // ═══════════════════════════════════════════════════════════════════════
795
796    /// Create a new `TaskState` from `spec` and register its initial
797    /// prompt. When the calling token is a Worker (i.e. this is a
798    /// recursive spawn), the new task inherits `parent.spawn_depth + 1`
799    /// and is rejected with `SpawnDepthExceeded` once `max_spawn_depth` is
800    /// hit; an Operator-issued call starts at depth 0.
801    pub async fn start_task(
802        &self,
803        token: &CapToken,
804        spec: TaskSpec,
805    ) -> Result<TaskId, EngineError> {
806        self.verify_token(token, Verb::StartTask).await?;
807        let task_id = TaskId::new();
808        let directive = spec.initial_directive.clone();
809        let task_id_clone = task_id.clone();
810        let nonce = token.nonce.clone();
811        let max_depth = self.inner.cfg.max_spawn_depth;
812        self.with_state("start_task", move |s| {
813            // Recursive swarm depth gate (recursion guard):
814            // Worker tokens carry CapTokenRecord.parent_task_id. Give the
815            // child parent's spawn_depth + 1; if it exceeds `max`, raise an
816            // error. Operator tokens (parent_task_id=None) start at depth 0.
817            let parent_depth_opt = s
818                .tokens
819                .get(&nonce)
820                .and_then(|rec| rec.task_id.as_ref())
821                .and_then(|tid| s.tasks.get(tid))
822                .map(|t| t.spawn_depth);
823            let depth = match parent_depth_opt {
824                Some(d) => {
825                    if d + 1 >= max_depth {
826                        return Err(EngineError::SpawnDepthExceeded {
827                            current: d + 1,
828                            max: max_depth,
829                        });
830                    }
831                    d + 1
832                }
833                None => 0,
834            };
835
836            let mut task = TaskState::new(task_id_clone.clone(), spec);
837            task.spawn_depth = depth;
838            s.tasks.insert(task_id_clone.clone(), task);
839            s.prompts.insert((task_id_clone.clone(), 1), directive);
840            // Link to the owner session (only Operator tokens match; Worker tokens have no session).
841            if let Some(sess) = s
842                .sessions
843                .values_mut()
844                .find(|sess| sess.token_nonce == nonce)
845            {
846                sess.owned_task_ids.push(task_id_clone.clone());
847            }
848            s.push_event(Event::TaskCreated {
849                task_id: task_id_clone.clone(),
850            });
851            Ok::<(), EngineError>(())
852        })
853        .await??;
854        let _ = self.inner.event_tx.send(Event::TaskCreated {
855            task_id: task_id.clone(),
856        });
857        Ok(task_id)
858    }
859
860    /// Fetch a snapshot of `TaskState` for `task_id`, subject to the
861    /// task-ownership gate (see `verify_token_for_task`).
862    pub async fn read_task_state(
863        &self,
864        token: &CapToken,
865        task_id: &TaskId,
866    ) -> Result<TaskState, EngineError> {
867        self.verify_token_for_task(token, Verb::ReadTaskState, task_id)
868            .await?;
869        let task_id = task_id.clone();
870        self.with_state("read_task_state", move |s| {
871            s.tasks
872                .get(&task_id)
873                .cloned()
874                .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))
875        })
876        .await?
877    }
878
879    /// Mark `task_id` as `Cancelled` and wake any caller blocked in
880    /// `poll_task` for it.
881    pub async fn cancel_task(&self, token: &CapToken, task_id: &TaskId) -> Result<(), EngineError> {
882        self.verify_token_for_task(token, Verb::CancelTask, task_id)
883            .await?;
884        let tid = task_id.clone();
885        self.with_state("cancel_task", move |s| {
886            let task = s
887                .tasks
888                .get_mut(&tid)
889                .ok_or_else(|| EngineError::TaskNotFound(tid.0.clone()))?;
890            task.status = TaskStatus::Cancelled;
891            task.updated_at = now_unix();
892            s.push_event(Event::TaskCancelled {
893                task_id: tid.clone(),
894            });
895            Ok::<(), EngineError>(())
896        })
897        .await??;
898        self.wake_task(task_id).await?;
899        Ok(())
900    }
901
902    /// Dispatch a single attempt through the given `spawner`.
903    ///
904    /// The lock is only held for snapshot capture; the actual spawn and
905    /// completion await happen outside the lock (R3 discipline).
906    ///
907    /// Sits on the Domain side of the Data / Domain split. The dispatch
908    /// path itself does not touch big response bodies — those flow through
909    /// the Data plane (`output_store` module + sink / input_inject
910    /// `SpawnerLayer`s) around this method.
911    ///
912    /// The caller does the compile plus `service::linker::link` and
913    /// carries the same stack through each dispatch. Because the spawner
914    /// is passed per-request rather than looked up from engine-global
915    /// state, parallel requests against a single `Engine` instance
916    /// (different Blueprints, different spawners) do not race.
917    pub async fn dispatch_attempt_with(
918        &self,
919        token: &CapToken,
920        task_id: &TaskId,
921        spawner: &Arc<dyn SpawnerAdapter>,
922    ) -> Result<DispatchOutcome, EngineError> {
923        self.verify_token(token, Verb::DispatchAttempt).await?;
924        let task_id = task_id.clone();
925
926        // 1) Under the lock: increment the attempt number, mark Running, snapshot the
927        //    prompt, and pull `operator_info` from the session so we can inject it into Ctx.
928        let nonce = token.nonce.clone();
929        let tid_for_prep = task_id.clone();
930        let (attempt, agent, session_snapshot) = self
931            .with_state("dispatch.prep", move |s| {
932                let task = s
933                    .tasks
934                    .get_mut(&tid_for_prep)
935                    .ok_or_else(|| EngineError::TaskNotFound(tid_for_prep.0.clone()))?;
936                task.attempt += 1;
937                task.status = TaskStatus::Running;
938                task.updated_at = now_unix();
939                // The spawner pulls the prompt via engine.fetch_prompt. In prep,
940                // if the prompts table has no entry for this attempt yet,
941                // fall back and insert `initial_directive` so the subsequent
942                // fetch_prompt succeeds.
943                let attempt = task.attempt;
944                let initial = task.spec.initial_directive.clone();
945                s.prompts
946                    .entry((tid_for_prep.clone(), attempt))
947                    .or_insert(initial);
948                let task = s
949                    .tasks
950                    .get(&tid_for_prep)
951                    .ok_or_else(|| EngineError::TaskNotFound(tid_for_prep.0.clone()))?;
952                let agent = task.spec.agent.clone();
953                // Session snapshot (looked up by token nonce). When no session
954                // exists (worker token invoked directly / test injection), fall
955                // back to None → default OperatorInfo.
956                let sess_clone = s
957                    .sessions
958                    .values()
959                    .find(|sess| sess.token_nonce == nonce)
960                    .cloned();
961                Ok::<_, EngineError>((attempt, agent, sess_clone))
962            })
963            .await??;
964        // BridgeRegistry lookup + per-agent OperatorKind cascade.
965        let operator_info = match session_snapshot {
966            Some(sess) => self.resolve_operator_info(&sess, &agent).await,
967            None => OperatorInfo::default(),
968        };
969
970        // 2) Outside the lock: worker token mint + spawn.
971        //
972        // Session-style mint (max_uses=None). Within one attempt the worker is
973        // expected to hit `verify_token + fetch_prompt + fetch_data + post_result`
974        // multiple times in order, so `one_time` would exhaust the token on the
975        // very first verb. Capability is guarded by (a) the role × verb gate and
976        // (b) the short TTL (600s).
977        let worker_token = self.inner.signer.session(
978            format!("worker-of-{task_id}"),
979            Role::Worker,
980            vec!["*".into()],
981            Duration::from_secs(1800),
982        );
983        let worker_nonce = worker_token.nonce.clone();
984        let task_id_for_worker = task_id.clone();
985        let worker_token_for_store = worker_token.clone();
986        self.with_state("dispatch.mint_worker", move |s| {
987            s.tokens.insert(
988                worker_nonce.clone(),
989                CapTokenRecord::from_worker_token(worker_token_for_store, task_id_for_worker),
990            );
991        })
992        .await?;
993
994        // Mint a short handle (`wh-XXXXXXXX`) and register it in worker_handles.
995        // Used by the simplified Bearer path for SubAgents (short-handle form
996        // avoids base64 copy-paste incidents).
997        let worker_handle = self.mint_worker_handle(worker_token.nonce.clone()).await?;
998
999        let mut ctx = Ctx::new(task_id.clone(), attempt, agent.clone());
1000        ctx.operator = operator_info; // activates MainAIMiddleware / Senior bridge
1001        ctx.meta
1002            .runtime
1003            .insert("worker_handle".to_string(), Value::String(worker_handle));
1004
1005        let worker = spawner
1006            .spawn(self, &ctx, task_id.clone(), attempt, worker_token)
1007            .await
1008            .map_err(|e| EngineError::DispatchFailed(e.to_string()))?;
1009
1010        // 3) Outside the lock: await worker.join() (signal-only). WorkerError is
1011        //    stringified. The value is fetched via output_tail (sink path).
1012        let signal_result: Result<(), String> = worker.join().await.map_err(|e| e.to_string());
1013
1014        // Pull the last Final from output_tail and use it as the value.
1015        let value_ok: Result<(Value, bool), String> = match signal_result {
1016            Ok(()) => {
1017                let tail = self.output_tail(&task_id, attempt).await;
1018                let last_final = tail.iter().rev().find_map(|ev| match ev {
1019                    crate::worker::output::OutputEvent::Final { content, ok } => {
1020                        Some((content.clone(), *ok))
1021                    }
1022                    _ => None,
1023                });
1024                match last_final {
1025                    Some((crate::worker::output::ContentRef::Inline { value }, ok)) => {
1026                        Ok((value, ok))
1027                    }
1028                    Some((
1029                        crate::worker::output::ContentRef::FileRef {
1030                            path,
1031                            mime,
1032                            size_hint,
1033                        },
1034                        ok,
1035                    )) => Ok((
1036                        serde_json::json!({
1037                            "file_ref": path.to_string_lossy(),
1038                            "mime": mime,
1039                            "size_hint": size_hint,
1040                        }),
1041                        ok,
1042                    )),
1043                    None => Err("no Final in output_tail".to_string()),
1044                }
1045            }
1046            Err(msg) => Err(msg),
1047        };
1048
1049        // 4) Under the lock: apply (split the borrow scope so push_event and task mut can co-exist).
1050        let outcome = self
1051            .with_state("dispatch.apply", |s| {
1052                if !s.tasks.contains_key(&task_id) {
1053                    return Err(EngineError::TaskNotFound(task_id.0.clone()));
1054                }
1055                match value_ok {
1056                    Ok((value, ok)) => {
1057                        let pass = ok;
1058                        {
1059                            let task = s.tasks.get_mut(&task_id).unwrap();
1060                            task.last_result = Some(value.clone());
1061                            task.updated_at = now_unix();
1062                            task.status = if pass {
1063                                TaskStatus::Pass
1064                            } else {
1065                                TaskStatus::Blocked
1066                            };
1067                        }
1068                        s.push_event(Event::TaskAttemptCompleted {
1069                            task_id: task_id.clone(),
1070                            attempt,
1071                            result: value.clone(),
1072                        });
1073                        if pass {
1074                            s.push_event(Event::TaskPass {
1075                                task_id: task_id.clone(),
1076                                result: value.clone(),
1077                            });
1078                            Ok::<_, EngineError>(DispatchOutcome::Pass(value))
1079                        } else {
1080                            s.push_event(Event::TaskBlocked {
1081                                task_id: task_id.clone(),
1082                                result: value.clone(),
1083                            });
1084                            Ok(DispatchOutcome::Blocked(value))
1085                        }
1086                    }
1087                    Err(msg) => {
1088                        let task = s.tasks.get_mut(&task_id).unwrap();
1089                        task.status = TaskStatus::Blocked;
1090                        task.updated_at = now_unix();
1091                        Err(EngineError::DispatchFailed(msg))
1092                    }
1093                }
1094            })
1095            .await??;
1096
1097        // event broadcast (outside the lock — push_event feeds the in-memory tail; broadcast is a separate path).
1098        let _ = self.inner.event_tx.send(Event::TaskAttemptCompleted {
1099            task_id: task_id.clone(),
1100            attempt,
1101            result: match &outcome {
1102                DispatchOutcome::Pass(v) | DispatchOutcome::Blocked(v) => v.clone(),
1103                _ => Value::Null,
1104            },
1105        });
1106
1107        // Wake any callers waiting in poll_task.
1108        self.wake_task(&task_id).await?;
1109
1110        Ok(outcome)
1111    }
1112
1113    // ═══════════════════════════════════════════════════════════════════════
1114    // Worker-side API (= prompt / data fetch + result post)
1115    // ═══════════════════════════════════════════════════════════════════════
1116
1117    /// Fetch the directive/prompt string for `task_id`'s current attempt.
1118    /// Falls back to `initial_directive` when no prompt has been recorded
1119    /// yet for that attempt.
1120    pub async fn fetch_prompt(
1121        &self,
1122        token: &CapToken,
1123        task_id: &TaskId,
1124    ) -> Result<String, EngineError> {
1125        self.verify_token_for_task(token, Verb::FetchPrompt, task_id)
1126            .await?;
1127        let task_id = task_id.clone();
1128        self.with_state("fetch_prompt", move |s| {
1129            let task = s
1130                .tasks
1131                .get(&task_id)
1132                .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))?;
1133            s.prompts
1134                .get(&(task_id.clone(), task.attempt.max(1)))
1135                .cloned()
1136                .ok_or_else(|| {
1137                    EngineError::ResourceNotFound(format!(
1138                        "prompt({}, attempt={})",
1139                        task_id.0, task.attempt
1140                    ))
1141                })
1142        })
1143        .await?
1144    }
1145
1146    /// Combined fetch for `HTTP /v1/worker/prompt`: returns `prompt` +
1147    /// (optional) `system` + `agent` + `attempt` in a single round trip.
1148    /// The verb gate reuses `FetchPrompt` — same semantics as "the worker
1149    /// pulls its task input".
1150    ///
1151    /// `system` is the value written by `OperatorSpawner::spawn` through
1152    /// `bake_worker_system_prompt` when it ran; otherwise `None` (no
1153    /// profile present, or the bake never happened).
1154    pub async fn fetch_worker_payload(
1155        &self,
1156        token: &CapToken,
1157        task_id: &TaskId,
1158    ) -> Result<crate::types::WorkerPayload, EngineError> {
1159        self.verify_token_for_task(token, Verb::FetchPrompt, task_id)
1160            .await?;
1161        let task_id_clone = task_id.clone();
1162        self.with_state("fetch_worker_payload", move |s| {
1163            let task = s
1164                .tasks
1165                .get(&task_id_clone)
1166                .ok_or_else(|| EngineError::TaskNotFound(task_id_clone.0.clone()))?;
1167            let attempt = task.attempt.max(1);
1168            let prompt = s
1169                .prompts
1170                .get(&(task_id_clone.clone(), attempt))
1171                .cloned()
1172                .ok_or_else(|| {
1173                    EngineError::ResourceNotFound(format!(
1174                        "prompt({}, attempt={})",
1175                        task_id_clone.0, attempt
1176                    ))
1177                })?;
1178            let system = s
1179                .systems
1180                .get(&(task_id_clone.clone(), attempt))
1181                .cloned()
1182                .unwrap_or(None);
1183            let agent = task.spec.agent.clone();
1184            Ok::<_, EngineError>(crate::types::WorkerPayload {
1185                task_id: task_id_clone.0.clone(),
1186                attempt,
1187                agent,
1188                prompt,
1189                system,
1190            })
1191        })
1192        .await?
1193    }
1194
1195    /// Fetch a worker payload via a short handle. Skips token verification
1196    /// and returns `prompt` + `system` + `agent` + `attempt` in a thin
1197    /// path. The caller is expected to have already resolved `task_id`
1198    /// via `task_id_from_handle` — the handle's presence in
1199    /// `worker_handles` means it was minted server-side and is therefore
1200    /// trusted.
1201    pub async fn fetch_worker_payload_trusted(
1202        &self,
1203        task_id: &TaskId,
1204    ) -> Result<crate::types::WorkerPayload, EngineError> {
1205        let task_id_clone = task_id.clone();
1206        self.with_state("fetch_worker_payload_trusted", move |s| {
1207            let task = s
1208                .tasks
1209                .get(&task_id_clone)
1210                .ok_or_else(|| EngineError::TaskNotFound(task_id_clone.0.clone()))?;
1211            let attempt = task.attempt.max(1);
1212            let prompt = s
1213                .prompts
1214                .get(&(task_id_clone.clone(), attempt))
1215                .cloned()
1216                .ok_or_else(|| {
1217                    EngineError::ResourceNotFound(format!(
1218                        "prompt({}, attempt={})",
1219                        task_id_clone.0, attempt
1220                    ))
1221                })?;
1222            let system = s
1223                .systems
1224                .get(&(task_id_clone.clone(), attempt))
1225                .cloned()
1226                .unwrap_or(None);
1227            let agent = task.spec.agent.clone();
1228            Ok::<_, EngineError>(crate::types::WorkerPayload {
1229                task_id: task_id_clone.0.clone(),
1230                attempt,
1231                agent,
1232                prompt,
1233                system,
1234            })
1235        })
1236        .await?
1237    }
1238
1239    /// Read the current attempt number for a task (server-side lookup, no
1240    /// token verification). Used on `HTTP /v1/worker/result` when the
1241    /// worker omits `attempt` and the server has to fill it in.
1242    pub async fn task_attempt(&self, task_id: &TaskId) -> Result<u32, EngineError> {
1243        let task_id = task_id.clone();
1244        self.with_state("task_attempt", move |s| {
1245            s.tasks
1246                .get(&task_id)
1247                .map(|t| t.attempt)
1248                .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))
1249        })
1250        .await?
1251    }
1252
1253    /// Server-side admin API that lets `OperatorSpawner::spawn` bake the
1254    /// rendered `system_prompt` into engine state. There is no verb gate
1255    /// — the only expected caller is inside the spawner. SubAgents fetch
1256    /// this alongside the prompt on the `/v1/worker/prompt` path.
1257    pub async fn bake_worker_system_prompt(
1258        &self,
1259        task_id: &TaskId,
1260        attempt: u32,
1261        system: Option<String>,
1262    ) -> Result<(), EngineError> {
1263        let task_id = task_id.clone();
1264        self.with_state("bake_worker_system_prompt", move |s| {
1265            s.systems.insert((task_id, attempt), system);
1266        })
1267        .await?;
1268        Ok(())
1269    }
1270
1271    /// Fetch an arbitrary named resource previously stored via
1272    /// `set_resource`. Not task-scoped — any valid token with the
1273    /// `FetchData` verb may read any key.
1274    pub async fn fetch_data(&self, token: &CapToken, key: &str) -> Result<Value, EngineError> {
1275        self.verify_token(token, Verb::FetchData).await?;
1276        let key = key.to_string();
1277        self.with_state("fetch_data", move |s| {
1278            s.resources
1279                .get(&key)
1280                .cloned()
1281                .ok_or(EngineError::ResourceNotFound(key))
1282        })
1283        .await?
1284    }
1285
1286    // ───────────────────────────────────────────────────────────────────────
1287    // Output path.
1288    // ───────────────────────────────────────────────────────────────────────
1289
1290    /// Send one output event from inside a `SpawnerAdapter` or worker.
1291    /// Structuring is assumed to be complete by the time we cross the
1292    /// `SpawnerAdapter` boundary; this API just appends to the
1293    /// `OutputStore`, pushes to the `EventLog`, and (for `Final`) emits
1294    /// the `TaskAttemptCompleted` event.
1295    ///
1296    /// This is Domain-side plumbing: it feeds the engine's verdict flow,
1297    /// not the Data-plane store in the `output_store` module. It also
1298    /// does not wake the dispatch path — that is done through the
1299    /// spawner's completion oneshot when the worker terminates.
1300    pub async fn submit_output(
1301        &self,
1302        token: &crate::types::CapToken,
1303        task_id: &TaskId,
1304        attempt: u32,
1305        event: crate::worker::output::OutputEvent,
1306    ) -> Result<(), EngineError> {
1307        self.verify_token_for_task(token, crate::types::Verb::EmitOutput, task_id)
1308            .await?;
1309        let task_id_for_apply = task_id.clone();
1310        let event_clone = event.clone();
1311        self.with_state("submit_output", move |s| {
1312            s.output_store
1313                .entry((task_id_for_apply.clone(), attempt))
1314                .or_default()
1315                .push(event_clone.clone());
1316            s.push_event(crate::core::state::Event::WorkerOutput {
1317                task_id: task_id_for_apply,
1318                attempt,
1319                event: event_clone,
1320            });
1321        })
1322        .await?;
1323        Ok(())
1324    }
1325
1326    /// Snapshot the entire output tail for a given `(task_id, attempt)`.
1327    /// Used by the dispatch path when pulling `Final`, and by observers
1328    /// reading the trace.
1329    pub async fn output_tail(
1330        &self,
1331        task_id: &TaskId,
1332        attempt: u32,
1333    ) -> Vec<crate::worker::output::OutputEvent> {
1334        let key = (task_id.clone(), attempt);
1335        self.with_state("output_tail", move |s| {
1336            s.output_store.get(&key).cloned().unwrap_or_default()
1337        })
1338        .await
1339        .unwrap_or_default()
1340    }
1341
1342    /// Record an interim `last_result` for `task_id` without changing its
1343    /// `status`. Distinct from the terminal `Final` output event handled
1344    /// through `submit_output` / `dispatch_attempt_with`.
1345    pub async fn post_result(
1346        &self,
1347        token: &CapToken,
1348        task_id: &TaskId,
1349        result: Value,
1350    ) -> Result<(), EngineError> {
1351        self.verify_token_for_task(token, Verb::PostResult, task_id)
1352            .await?;
1353        let task_id = task_id.clone();
1354        let result_clone = result.clone();
1355        self.with_state("post_result", move |s| {
1356            let task = s
1357                .tasks
1358                .get_mut(&task_id)
1359                .ok_or_else(|| EngineError::TaskNotFound(task_id.0.clone()))?;
1360            task.last_result = Some(result_clone);
1361            task.updated_at = now_unix();
1362            Ok::<(), EngineError>(())
1363        })
1364        .await??;
1365        Ok(())
1366    }
1367
1368    /// Store a named resource value, retrievable later via `fetch_data`.
1369    /// No token is required — this is a server-side/admin-style setter
1370    /// (mirrors `bake_worker_system_prompt`).
1371    pub async fn set_resource(
1372        &self,
1373        key: impl Into<String>,
1374        value: Value,
1375    ) -> Result<(), EngineError> {
1376        let key = key.into();
1377        self.with_state("set_resource", move |s| {
1378            s.resources.insert(key, value);
1379        })
1380        .await?;
1381        Ok(())
1382    }
1383
1384    // ═══════════════════════════════════════════════════════════════════════
1385    // Senior suspend / resume
1386    // ═══════════════════════════════════════════════════════════════════════
1387
1388    /// Ask a question of the Senior, mark the task `Suspended`, and
1389    /// return a `ResumeKey`. The suspended state persists until another
1390    /// task calls `resume(key, answer)`.
1391    ///
1392    /// Resume-side waiting is `Notify`-based, so a caller (typically
1393    /// MainAI) can detach, reattach from a different process, and still
1394    /// pull the answer out via `await_resume(key, timeout)` — the answer
1395    /// is stored inside `EngineState`.
1396    pub async fn query_senior(
1397        &self,
1398        token: &CapToken,
1399        task_id: &TaskId,
1400        question: Value,
1401    ) -> Result<ResumeKey, EngineError> {
1402        self.verify_token(token, Verb::QuerySenior).await?;
1403        let task_id = task_id.clone();
1404        let key = ResumeKey::for_senior(&task_id);
1405        let task_notify = self
1406            .with_state("query_senior.notify_ensure", |s| {
1407                s.ensure_task_notify(&task_id)
1408            })
1409            .await?;
1410
1411        let key_clone = key.clone();
1412        let task_id_inner = task_id.clone();
1413        let question_clone = question.clone();
1414        self.with_state("query_senior.suspend", move |s| {
1415            let task = s
1416                .tasks
1417                .get_mut(&task_id_inner)
1418                .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))?;
1419            task.status = TaskStatus::Suspended;
1420            task.suspended_on = Some(key_clone.clone());
1421            task.updated_at = now_unix();
1422            s.pending_resumes
1423                .insert(key_clone.clone(), ResumePending::new());
1424            s.push_event(Event::SeniorQueried {
1425                task_id: task_id_inner.clone(),
1426                question: question_clone.clone(),
1427            });
1428            s.push_event(Event::TaskSuspended {
1429                task_id: task_id_inner.clone(),
1430                key: key_clone.clone(),
1431            });
1432            Ok::<(), EngineError>(())
1433        })
1434        .await??;
1435
1436        // Notify callers waiting for a task status change (Running → Suspended).
1437        task_notify.notify_waiters();
1438
1439        let _ = self
1440            .inner
1441            .event_tx
1442            .send(Event::SeniorQueried { task_id, question });
1443        Ok(key)
1444    }
1445
1446    /// Store the answer for a `ResumeKey` in `EngineState` and wake the
1447    /// waiting caller via `Notify`. Also flips the suspended task's
1448    /// status back to `Running` and fires the per-task notifier.
1449    pub async fn resume(&self, key: ResumeKey, answer: Value) -> Result<(), EngineError> {
1450        let answer_for_state = answer.clone();
1451        let answer_for_event = answer.clone();
1452        let key_clone = key.clone();
1453        let (notify, task_notify, task_id_opt) = self
1454            .with_state("resume.set", move |s| {
1455                let pending = s
1456                    .pending_resumes
1457                    .get_mut(&key_clone)
1458                    .ok_or(EngineError::ResumeKeyNotFound)?;
1459                pending.answer = Some(answer_for_state);
1460                let notify = pending.notify.clone();
1461
1462                let task_id = s
1463                    .tasks
1464                    .iter()
1465                    .find(|(_, t)| t.suspended_on.as_ref() == Some(&key_clone))
1466                    .map(|(id, _)| id.clone());
1467
1468                let task_notify = task_id.as_ref().map(|tid| s.ensure_task_notify(tid));
1469
1470                if let Some(tid) = &task_id {
1471                    if let Some(task) = s.tasks.get_mut(tid) {
1472                        task.suspended_on = None;
1473                        task.status = TaskStatus::Running;
1474                        task.updated_at = now_unix();
1475                    }
1476                    s.push_event(Event::TaskResumed {
1477                        task_id: tid.clone(),
1478                        key: key_clone.clone(),
1479                    });
1480                    s.push_event(Event::SeniorAnswered {
1481                        task_id: tid.clone(),
1482                        answer: answer_for_event.clone(),
1483                    });
1484                }
1485                Ok::<_, EngineError>((notify, task_notify, task_id))
1486            })
1487            .await??;
1488
1489        // Outside the lock: notify_waiters for both the ResumePending and task-status waits.
1490        notify.notify_waiters();
1491        if let Some(n) = task_notify {
1492            n.notify_waiters();
1493        }
1494
1495        if let Some(tid) = task_id_opt {
1496            let _ = self
1497                .inner
1498                .event_tx
1499                .send(Event::TaskResumed { task_id: tid, key });
1500        }
1501        Ok(())
1502    }
1503
1504    /// Wait for the resume answer. Even if the caller (an Operator)
1505    /// detached and reattached, the answer is available immediately here
1506    /// — if it was already stored, this returns without waiting on the
1507    /// notifier.
1508    ///
1509    /// `timeout = Duration::ZERO` performs an instant check without
1510    /// waiting.
1511    pub async fn await_resume(
1512        &self,
1513        key: ResumeKey,
1514        timeout: Duration,
1515    ) -> Result<Value, EngineError> {
1516        // (1) Under the lock: clone the notify handle and check for an existing answer.
1517        let key_clone = key.clone();
1518        let (notify, existing) = self
1519            .with_state("await_resume.snapshot", move |s| {
1520                let pending = s
1521                    .pending_resumes
1522                    .get(&key_clone)
1523                    .ok_or(EngineError::ResumeKeyNotFound)?;
1524                Ok::<_, EngineError>((pending.notify.clone(), pending.answer.clone()))
1525            })
1526            .await??;
1527
1528        // (2) If an answer has already been stored, return immediately (detach / reattach pattern).
1529        if let Some(v) = existing {
1530            return Ok(v);
1531        }
1532
1533        // (3) Outside the lock: wait on the notify with a timeout.
1534        if timeout.is_zero() {
1535            return Err(EngineError::PollTimeout);
1536        }
1537        let waited = tokio::time::timeout(timeout, notify.notified()).await;
1538        if waited.is_err() {
1539            return Err(EngineError::PollTimeout);
1540        }
1541
1542        // (4) Under the lock: re-read the answer (should be present now that we were notified).
1543        let key_clone = key.clone();
1544        self.with_state("await_resume.read", move |s| {
1545            let pending = s
1546                .pending_resumes
1547                .get(&key_clone)
1548                .ok_or(EngineError::ResumeKeyNotFound)?;
1549            pending
1550                .answer
1551                .clone()
1552                .ok_or_else(|| EngineError::Internal("notified but answer missing".into()))
1553        })
1554        .await?
1555    }
1556
1557    // ═══════════════════════════════════════════════════════════════════════
1558    // poll_task — the "wait" path that waits for task-status changes (works for long-poll and regular wait).
1559    // ═══════════════════════════════════════════════════════════════════════
1560
1561    /// Wait until the task's status **transitions to terminal or
1562    /// `Suspended`**, then return the latest `TaskState`. Returns
1563    /// immediately if the task is already in a terminal state.
1564    /// Exceeding the timeout returns `EngineError::PollTimeout`.
1565    ///
1566    /// A `hold` of `Duration::from_secs(0)` returns a snapshot immediately
1567    /// (no wait). Larger holds — tens of minutes up to days — are fine;
1568    /// the wait state is kept in memory inside the engine and does not
1569    /// degrade.
1570    pub async fn poll_task(
1571        &self,
1572        token: &CapToken,
1573        task_id: &TaskId,
1574        hold: Duration,
1575    ) -> Result<TaskState, EngineError> {
1576        self.verify_token_for_task(token, Verb::PollTask, task_id)
1577            .await?;
1578        let task_id_inner = task_id.clone();
1579
1580        // (1) Under the lock: take a snapshot and clone task_notify.
1581        let (state, notify) = self
1582            .with_state("poll_task.snapshot", move |s| {
1583                let task = s
1584                    .tasks
1585                    .get(&task_id_inner)
1586                    .cloned()
1587                    .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))?;
1588                let notify = s.ensure_task_notify(&task_id_inner);
1589                Ok::<_, EngineError>((task, notify))
1590            })
1591            .await??;
1592
1593        // (2) Immediate-return condition: already terminal / Suspended (nothing left to wait on).
1594        if matches!(
1595            state.status,
1596            TaskStatus::Pass | TaskStatus::Blocked | TaskStatus::Cancelled | TaskStatus::Suspended
1597        ) {
1598            return Ok(state);
1599        }
1600        if hold.is_zero() {
1601            return Ok(state);
1602        }
1603
1604        // (3) Outside the lock: wait on Notify with a timeout.
1605        let waited = tokio::time::timeout(hold, notify.notified()).await;
1606        if waited.is_err() {
1607            return Err(EngineError::PollTimeout);
1608        }
1609
1610        // (4) Under the lock: take a fresh snapshot.
1611        let task_id_inner = task_id.clone();
1612        self.with_state("poll_task.reread", move |s| {
1613            s.tasks
1614                .get(&task_id_inner)
1615                .cloned()
1616                .ok_or_else(|| EngineError::TaskNotFound(task_id_inner.0.clone()))
1617        })
1618        .await?
1619    }
1620
1621    // ═══════════════════════════════════════════════════════════════════════
1622    // Background: heartbeat miss → detach loop
1623    // ═══════════════════════════════════════════════════════════════════════
1624
1625    /// Background loop that scans sessions every `heartbeat_interval` and
1626    /// flips `attached = false` on any session whose `last_seen` exceeds
1627    /// `heartbeat_miss_threshold * interval`.
1628    ///
1629    /// The tasks themselves are kept (assuming
1630    /// `keepalive_on_idle = true`), so another client can reattach with
1631    /// the same token and resume immediately. Dropping the returned
1632    /// `JoinHandle` does not stop the loop — the handle exists so callers
1633    /// who want to abort can hold onto it.
1634    pub fn start_detach_loop(&self) -> tokio::task::JoinHandle<()> {
1635        let engine = self.clone();
1636        let cfg = self.inner.cfg.long_hold.clone();
1637        let interval = cfg.heartbeat_interval;
1638        let miss_secs = cfg.heartbeat_interval.as_secs() * cfg.heartbeat_miss_threshold as u64;
1639
1640        tokio::spawn(async move {
1641            let mut ticker = tokio::time::interval(interval);
1642            ticker.tick().await; // first tick is immediate
1643            loop {
1644                ticker.tick().await;
1645                let now = now_unix();
1646                let detached = engine
1647                    .with_state("detach_loop.scan", |s| {
1648                        let mut detached = Vec::new();
1649                        for (sid, sess) in s.sessions.iter_mut() {
1650                            if !sess.attached {
1651                                continue;
1652                            }
1653                            if now.saturating_sub(sess.last_seen) >= miss_secs {
1654                                sess.attached = false;
1655                                detached.push(sid.clone());
1656                            }
1657                        }
1658                        for sid in &detached {
1659                            s.push_event(Event::SessionDetached {
1660                                session_id: sid.clone(),
1661                            });
1662                        }
1663                        detached
1664                    })
1665                    .await
1666                    .unwrap_or_default();
1667                for sid in detached {
1668                    let _ = engine
1669                        .inner
1670                        .event_tx
1671                        .send(Event::SessionDetached { session_id: sid });
1672                }
1673            }
1674        })
1675    }
1676
1677    /// Helper: wake a task whose status has changed. Called from the
1678    /// method body outside the lock.
1679    async fn wake_task(&self, task_id: &TaskId) -> Result<(), EngineError> {
1680        let task_id = task_id.clone();
1681        let notify_opt = self
1682            .with_state("wake_task.get_notify", move |s| {
1683                s.task_notifies.get(&task_id).cloned()
1684            })
1685            .await?;
1686        if let Some(n) = notify_opt {
1687            n.notify_waiters();
1688        }
1689        Ok(())
1690    }
1691}
1692
1693// ─── UT: `OperatorKind` "Runtime Global" tier — `Option` semantics ─────────
1694//
1695// Regression coverage for the "explicit Automate is indistinguishable from
1696// unspecified" defect: `OperatorSession.operator_kind` (and the
1697// `attach_with_ids` `kind` parameter it stores) is `Option<OperatorKind>`,
1698// so `Some(Automate)` is an explicit Runtime Global request that must
1699// outrank `bp_global`, while `None` must let `bp_global` decide. Exercises
1700// the real `resolve_operator_info` cascade path (not just
1701// `collapse_operator_kind` in isolation), attaching via `attach_with_ids`
1702// exactly as `TaskLaunchService::launch` does.
#[cfg(test)]
mod resolve_operator_info_runtime_global_tests {
    use super::*;

    /// Build a default engine, attach one `Role::Operator` session through
    /// `attach_with_ids` with the given `runtime_global` / `bp_global`
    /// kinds, look that session up by its token nonce, and return what
    /// `resolve_operator_info` yields for agent `"agent-x"`.
    async fn attach_and_resolve(
        runtime_global: Option<OperatorKind>,
        bp_global: Option<OperatorKind>,
    ) -> OperatorInfo {
        let engine = Engine::new(EngineCfg::default());

        let attach = engine.attach_with_ids(
            "ut-op",
            Role::Operator,
            Duration::from_secs(30),
            runtime_global,
            None,
            None,
            None,
            HashMap::new(),
            HashMap::new(),
            bp_global,
        );
        let token = attach.await.expect("attach_with_ids ok");

        // Locate the session that was just created by matching the nonce.
        let lookup = engine
            .with_state("test.find_session", |s| {
                s.sessions
                    .iter()
                    .find_map(|(_, sess)| (sess.token_nonce == token.nonce).then(|| sess.clone()))
            })
            .await;
        let session = lookup
            .expect("with_state ok")
            .expect("session present after attach_with_ids");

        engine.resolve_operator_info(&session, "agent-x").await
    }

    /// Runtime Global explicitly asks for `Automate` while `bp_global` is
    /// `MainAi`: the explicit `Some(Automate)` has to win. This is the case
    /// the old `== OperatorKind::default()` convention got wrong, because it
    /// could not tell "explicitly Automate" from "unspecified" and would
    /// have let `bp_global` (MainAi) take over.
    #[tokio::test]
    async fn explicit_some_automate_outranks_bp_global_main_ai() {
        let runtime_global = Some(OperatorKind::Automate);
        let bp_global = Some(OperatorKind::MainAi);

        let resolved = attach_and_resolve(runtime_global, bp_global).await;

        assert_eq!(
            resolved.kind,
            OperatorKind::Automate,
            "explicit Some(Automate) runtime_global must outrank bp_global MainAi"
        );
    }

    /// Runtime Global left unspecified (`None`) while `bp_global` is
    /// `MainAi`: with nothing more specific set, `bp_global` decides.
    #[tokio::test]
    async fn none_lets_bp_global_main_ai_win() {
        let bp_global = Some(OperatorKind::MainAi);

        let resolved = attach_and_resolve(None, bp_global).await;

        assert_eq!(
            resolved.kind,
            OperatorKind::MainAi,
            "None runtime_global must let bp_global MainAi win"
        );
    }
}