Skip to main content

mlua_swarm/
middleware.rs

//! Middleware overlay — cross-cutting concerns (Audit / MainAI / Senior /
//! OperatorDelegate / LongHold).
//!
//! Ships five `SpawnerLayer` implementations plus the `SpawnerStack` builder.
//! `MainAI` keys off `Ctx.operator.kind` and only fires for
//! `MainAi` / `Composite` sessions; `Senior` / `OperatorDelegate` gate on
//! what is attached to the session (`senior_bridge` / `operator`) rather
//! than on kind; `Audit` / `LongHold` apply uniformly across every kind.
8//!
9//! # Extension discipline — this layer is THE extension point (canonical)
10//!
11//! Background: an earlier iteration grew a verdict-specialised machinery
12//! (`judgment.rs` canonical type + 3-form parser + `state.agent_verdicts`
13//! map + dedicated accessor) that re-interpreted agent output *inside the
14//! engine core* and banned string-literal conds in favour of a Blueprint
15//! compile-layer translation. That whole complex was dismantled: the value
16//! it added over plain data was zero, while it created an IN-side dialect
17//! that every consumer had to learn. The design conclusion is a
18//! three-principle layering:
19//!
20//! 1. **IN is immutable, canonical form is JSON.** `Blueprint` /
21//!    `mlua_flow_ir::Node` are plain serde data. No compile pass, no schema
22//!    field that the engine expands, no Rust helper that builds `Expr`s.
23//!    Flow control is written literally in Flow.ir:
24//!    `Eq(Path("$.<step>.verdict"), Lit("blocked"))` — domain verdicts are
25//!    plain strings inside step output, consumed by plain conds.
26//! 2. **Generation (authoring sugar) lives OUT**, on the consumer side
27//!    (e.g. a vendored pure-Lua builder that prints Blueprint JSON). It
28//!    never leaks into engine / schema crates, whatever language it is
29//!    written in — the ban is on the *placement*, not the language.
30//! 3. **Runtime extension lives HERE, as a `SpawnerLayer`.** A middleware
31//!    (or any future extension mechanism) may interpret the *results* of a
32//!    Flow.ir run — `Ctx`, the `output_tail`, `Final { ok }` — in its own
33//!    way and transform them. What it must NOT do:
34//!    - introduce a new dialect on the IN side (schema fields / node
35//!      rewriting / cond translation) — extensions read and transform, the
36//!      wire format stays plain Flow.ir + JSON;
37//!    - hide its effect: overrides are *appended* to the output tail
38//!      (e.g. `SeniorEscalationMiddleware` pushes an override `Final`
39//!      rather than mutating the recorded one), so the trace stays
40//!      replayable and the flow stays observable;
41//!    - accumulate private engine state keyed by its own semantics (the
42//!      `agent_verdicts` anti-pattern) — state lives in ctx / output store
43//!      as plain data.
44//!
45//! `AgentResolver`, `ProjectNameAliasMiddleware`, `SinkMiddleware`,
46//! `InputInjectMiddleware`, `LuaMiddleware`, `SeniorEscalationMiddleware`
47//! all follow this shape: edit `ctx` / wrap the worker, call the inner
//! spawner, append observable output. (Note `LuaMiddleware`'s scripts are
//! host-constructed — embedding Lua source in a Blueprint is the IN-side
//! dialect this discipline forbids, and would require its own guard
//! design if ever revisited.)
52
53pub mod input_inject;
54pub mod lua_layer;
55pub mod project_name_alias;
56pub mod resolver;
57pub mod sink;
58pub mod worker_binding;
59
60use crate::core::ctx::{Ctx, OperatorKind};
61use crate::core::engine::Engine;
62use crate::core::state::Event;
63use crate::types::{CapToken, TaskId};
64use crate::worker::adapter::{SpawnError, SpawnerAdapter};
65use crate::worker::output::{ContentRef, OutputEvent};
66use crate::worker::{wrap_join, MiddlewareWorker, Worker, WorkerJoinHandler};
67use async_trait::async_trait;
68use serde_json::Value;
69use std::sync::Arc;
70use std::time::{Duration, Instant};
71use tokio::sync::broadcast;
72
73/// Pull the terminal `Final` event's `(value, ok)` out of the tail (works
74/// for both `Inline` and `FileRef` content).
75async fn pull_final_value_ok(
76    engine: &Engine,
77    task_id: &TaskId,
78    attempt: u32,
79) -> Option<(Value, bool)> {
80    let tail = engine.output_tail(task_id, attempt).await;
81    tail.iter().rev().find_map(|ev| match ev {
82        OutputEvent::Final {
83            content: ContentRef::Inline { value },
84            ok,
85        } => Some((value.clone(), *ok)),
86        OutputEvent::Final {
87            content: ContentRef::FileRef { path, .. },
88            ok,
89        } => Some((serde_json::json!({"file_ref": path.to_string_lossy()}), *ok)),
90        _ => None,
91    })
92}
93
94/// Layer trait — one middleware stage wrapping a `SpawnerAdapter`.
95pub trait SpawnerLayer: Send + Sync + 'static {
96    /// Wraps `inner` in this layer's behaviour, returning a new
97    /// `SpawnerAdapter` that delegates to `inner` (directly or via
98    /// `wrap_join`) while adding this layer's cross-cutting effect.
99    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter>;
100}
101
102/// Stack builder that layers `SpawnerLayer`s on top of a base adapter.
103///
104/// Each `.layer(...)` call wraps a new **outer** stage — same ergonomics as
105/// `tower::ServiceBuilder`.
106pub struct SpawnerStack {
107    inner: Arc<dyn SpawnerAdapter>,
108}
109
110impl SpawnerStack {
111    /// Starts a stack with `base` as the innermost adapter.
112    pub fn new(base: Arc<dyn SpawnerAdapter>) -> Self {
113        Self { inner: base }
114    }
115
116    /// Wraps the current stack with a statically-typed `SpawnerLayer`,
117    /// becoming the new outermost stage.
118    pub fn layer<L: SpawnerLayer>(mut self, layer: L) -> Self {
119        self.inner = layer.wrap(self.inner);
120        self
121    }
122
123    /// Dynamically-typed variant taking `Arc<dyn SpawnerLayer>`. Used via
124    /// the `LayerRegistry` resolution path (where a factory returns
125    /// `Arc<dyn ...>`).
126    pub fn layer_dyn(mut self, layer: Arc<dyn SpawnerLayer>) -> Self {
127        self.inner = layer.wrap(self.inner);
128        self
129    }
130
131    /// Finishes the stack, returning the fully-wrapped adapter.
132    pub fn build(self) -> Arc<dyn SpawnerAdapter> {
133        self.inner
134    }
135}
136
137// ─── SpawnerLayerFactory + LayerRegistry ─────────────────────────────────
138//
139// # Design rationale
140//
141// Wiring is assembled per-launch through `TaskLaunchService.launch`:
142//
143//   Compiler.compile(bp) ─┬─→ compiled.router (CompiledAgentTable: agent name → SpawnerAdapter dispatch)
144//                         │
145//                         │   service::linker::link(router, bp.spawner_hints.layers, &engine)
146//                         │     internal:
147//                         │       SpawnerStack::new(router)
148//                         │         .layer_dyn(base_factory_n(engine))   ← every LayerRegistry.base entry
149//                         │         .layer_dyn(hint_factory(engine))     ← resolves each bp.spawner_hints.layers key
150//                         │         .build()
151//                         ▼
152//                   EngineDispatcher::with_spawner(engine, op_token, stacked)
153//                         ▼
154//                   engine.dispatch_attempt_with(op_token, task_id, &stacked)
155//
156// # base vs hint — when to use each
157//
158// - **base layer**: wrapped around every Blueprint. Example: AuditMiddleware
159//   (a mandatory EventLog audit). The caller registers with
160//   `LayerRegistry::with_base(|e| Arc::new(AuditMiddleware::new(e.event_tx())))`.
161//
162// - **hint layer**: wrapped **only when the Blueprint declares the key** in
163//   `spawner_hints.layers`. Examples: MainAIMiddleware /
164//   SeniorEscalationMiddleware / OperatorDelegateMiddleware. The Blueprint
165//   only declares a capability key (e.g. `"main_ai"`) without knowing the
166//   implementation; the engine-side LayerRegistry resolves key → factory,
167//   keeping the pure Flow layer separate from implementation details.
168//
169// # Factory pattern (handles layers that need Engine context)
170//
171// We do not hold `Arc<dyn SpawnerLayer>` directly because some layers
172// depend on the engine instance — for example AuditMiddleware needs
173// `engine.event_tx()` and can only be built after the engine exists. A
174// factory closure defers construction: the Layer instance is created only
175// when the engine is handed in.
176
177/// Factory closure for a `SpawnerLayer`. The caller registers these at
178/// startup, and they are called with the engine context at bind time.
179/// Stateless layers can use `|_engine| Arc::new(MyLayer)`; layers that need
180/// something like `event_tx` should do `|engine| Arc::new(MyLayer::new(engine.event_tx()))`.
181pub type LayerFactory =
182    Arc<dyn Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static>;
183
184/// Registry of `LayerFactory`s, split into `base` (always applied) and
185/// `hints` (applied only when a Blueprint declares the matching key in
186/// `spawner_hints.layers`). See the module-level `# Factory pattern`
187/// notes above for why factories rather than pre-built layers.
188#[derive(Default, Clone)]
189pub struct LayerRegistry {
190    base: Vec<LayerFactory>,
191    hints: std::collections::HashMap<String, LayerFactory>,
192}
193
194impl LayerRegistry {
195    /// Empty registry (no base layers, no hint layers).
196    pub fn new() -> Self {
197        Self::default()
198    }
199
200    /// Register a base layer factory that is applied on every Blueprint bind
201    /// (for layers that must fire for every task — e.g. `AuditMiddleware`).
202    pub fn with_base<F>(mut self, factory: F) -> Self
203    where
204        F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
205    {
206        self.base.push(Arc::new(factory));
207        self
208    }
209
210    /// Register a layer factory addressable by hint key. If
211    /// `Blueprint.spawner_hints.layers` lists the same key, it is wrapped at
212    /// bind time; otherwise it is a no-op.
213    pub fn with_hint<F>(mut self, key: impl Into<String>, factory: F) -> Self
214    where
215        F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
216    {
217        self.hints.insert(key.into(), Arc::new(factory));
218        self
219    }
220
221    /// All registered base-layer factories, in registration order.
222    pub fn base_factories(&self) -> &[LayerFactory] {
223        &self.base
224    }
225
226    /// Looks up the hint-layer factory registered under `key`, if any.
227    pub fn lookup_hint(&self, key: &str) -> Option<&LayerFactory> {
228        self.hints.get(key)
229    }
230}
231
232// ─── AuditMiddleware (pushes into the EventLog broadcast path) ────────────
233
234/// Mandatory base layer that emits `Event::TaskAttemptStarted` on every
235/// spawn, before delegating. This is the audit trail's entry point into
236/// the EventLog broadcast channel.
237pub struct AuditMiddleware {
238    /// Broadcast sender the EventLog subscribes to.
239    pub event_tx: broadcast::Sender<Event>,
240}
241
242impl AuditMiddleware {
243    /// Wraps a broadcast sender to notify on every spawn.
244    pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
245        Self { event_tx }
246    }
247}
248
249impl SpawnerLayer for AuditMiddleware {
250    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
251        Arc::new(AuditWrapped {
252            inner,
253            event_tx: self.event_tx.clone(),
254        })
255    }
256}
257
258struct AuditWrapped {
259    inner: Arc<dyn SpawnerAdapter>,
260    event_tx: broadcast::Sender<Event>,
261}
262
263#[async_trait]
264impl SpawnerAdapter for AuditWrapped {
265    async fn spawn(
266        &self,
267        engine: &Engine,
268        ctx: &Ctx,
269        task_id: TaskId,
270        attempt: u32,
271        token: CapToken,
272    ) -> Result<Box<dyn Worker>, SpawnError> {
273        let _ = self.event_tx.send(Event::TaskAttemptStarted {
274            task_id: task_id.clone(),
275            attempt,
276        });
277        self.inner.spawn(engine, ctx, task_id, attempt, token).await
278    }
279}
280
281// ─── MainAIMiddleware (fires SpawnHook before/after for MainAI/Composite) ─
282
283/// Hint layer that fires `ctx.operator.spawn_hook.before`/`after` around
284/// a spawn, but only for `MainAi` / `Composite` sessions. No-op for
285/// other kinds (still delegates, just skips the hook calls).
286pub struct MainAIMiddleware;
287
288impl MainAIMiddleware {
289    /// Stateless constructor.
290    pub fn new() -> Self {
291        Self
292    }
293}
294
295impl Default for MainAIMiddleware {
296    fn default() -> Self {
297        Self::new()
298    }
299}
300
301impl SpawnerLayer for MainAIMiddleware {
302    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
303        Arc::new(MainAIWrapped { inner })
304    }
305}
306
307struct MainAIWrapped {
308    inner: Arc<dyn SpawnerAdapter>,
309}
310
311#[async_trait]
312impl SpawnerAdapter for MainAIWrapped {
313    async fn spawn(
314        &self,
315        engine: &Engine,
316        ctx: &Ctx,
317        task_id: TaskId,
318        attempt: u32,
319        token: CapToken,
320    ) -> Result<Box<dyn Worker>, SpawnError> {
321        let mainai = matches!(
322            ctx.operator.kind,
323            OperatorKind::MainAi | OperatorKind::Composite
324        );
325        if mainai {
326            if let Some(hook) = &ctx.operator.spawn_hook {
327                hook.before(ctx)
328                    .await
329                    .map_err(SpawnError::RejectedByMiddleware)?;
330            }
331        }
332
333        let handle = self
334            .inner
335            .spawn(engine, ctx, task_id.clone(), attempt, token)
336            .await?;
337
338        if !mainai {
339            return Ok(handle);
340        }
341        let Some(hook) = ctx.operator.spawn_hook.clone() else {
342            return Ok(handle);
343        };
344
345        // Wrap the completion signal and call hook.after on finish.
346        // Pull the last Final from engine.output_tail as the value.
347        let ctx_clone = ctx.clone();
348        let engine_clone = engine.clone();
349        let task_id_clone = task_id.clone();
350        Ok(wrap_join(handle, move |signal| {
351            let hook = hook.clone();
352            let ctx_clone = ctx_clone.clone();
353            let engine_clone = engine_clone.clone();
354            let task_id_clone = task_id_clone.clone();
355            async move {
356                let v = match &signal {
357                    Ok(()) => pull_final_value_ok(&engine_clone, &task_id_clone, attempt)
358                        .await
359                        .map(|(v, _)| v)
360                        .unwrap_or(Value::Null),
361                    Err(e) => Value::String(e.to_string()),
362                };
363                let _ = hook.after(&ctx_clone, &v).await;
364                signal
365            }
366        }))
367    }
368}
369
370// ─── SeniorEscalationMiddleware ───────────────────────────────────────────
371//
// When a spawn completes with a `Final` whose `ok=false` and
// `ctx.operator.senior_bridge` is Some, this auxiliary layer calls
// `SeniorBridge.ask` and appends an override `Final` (`ok=true`) whose value
// is `{"original": <failed value>, "senior_answer": <answer>}`; the recorded
// `Final` is not mutated. Retry / re-dispatch is the engine (operator) side's
// job; this layer only injects fresh material for that decision.
377
378/// Hint layer: on `ok=false` completion with `ctx.operator.senior_bridge`
379/// set, asks the bridge for guidance and pushes an override `Final`
380/// (`ok=true`) carrying `senior_answer`. See the module comment above
381/// this type for the full contract.
382pub struct SeniorEscalationMiddleware;
383
384impl SeniorEscalationMiddleware {
385    /// Stateless constructor.
386    pub fn new() -> Self {
387        Self
388    }
389}
390
391impl Default for SeniorEscalationMiddleware {
392    fn default() -> Self {
393        Self::new()
394    }
395}
396
397impl SpawnerLayer for SeniorEscalationMiddleware {
398    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
399        Arc::new(SeniorWrapped { inner })
400    }
401}
402
403struct SeniorWrapped {
404    inner: Arc<dyn SpawnerAdapter>,
405}
406
407#[async_trait]
408impl SpawnerAdapter for SeniorWrapped {
409    async fn spawn(
410        &self,
411        engine: &Engine,
412        ctx: &Ctx,
413        task_id: TaskId,
414        attempt: u32,
415        token: CapToken,
416    ) -> Result<Box<dyn Worker>, SpawnError> {
417        let bridge = ctx.operator.senior_bridge.clone();
418        let task_id_for_hook = task_id.clone();
419        let engine_clone = engine.clone();
420        let token_clone = token.clone();
421        let handle = self
422            .inner
423            .spawn(engine, ctx, task_id, attempt, token)
424            .await?;
425        let Some(bridge) = bridge else {
426            return Ok(handle);
427        };
428        Ok(wrap_join(handle, move |signal| {
429            let bridge = bridge.clone();
430            let task_id = task_id_for_hook.clone();
431            let engine = engine_clone.clone();
432            let token = token_clone.clone();
433            async move {
434                signal?;
435                // Read the existing Final.
436                let last = pull_final_value_ok(&engine, &task_id, attempt).await;
437                if let Some((value, false)) = last {
438                    // ok=false: escalate to senior and push an override Final.
439                    let question = serde_json::json!({
440                        "reason": "worker reported ok=false",
441                        "value": value.clone(),
442                    });
443                    if let Ok(answer) = bridge.ask(&task_id, question).await {
444                        let override_val = serde_json::json!({
445                            "original": value,
446                            "senior_answer": answer,
447                        });
448                        let _ = engine
449                            .submit_output(
450                                &token,
451                                &task_id,
452                                attempt,
453                                OutputEvent::Final {
454                                    content: ContentRef::Inline {
455                                        value: override_val,
456                                    },
457                                    ok: true,
458                                },
459                            )
460                            .await;
461                    }
462                }
463                Ok(())
464            }
465        }))
466    }
467}
468
469// ─── OperatorDelegateMiddleware (delegates the whole spawn to an external Operator when one is attached) ──
470
471/// When `ctx.operator.operator.is_some()` (the session has an Operator
472/// backend), **bypass** `inner.spawn`, call `operator.execute(ctx, prompt)`,
473/// and box the result up as a `WorkerHandle`. In other words: the path that
474/// hands "this spawn" to whatever external Operator backend the engine has
475/// registered.
476///
477/// # Independent of `OperatorKind` (Operator is a generic abstraction)
478///
479/// An earlier implementation gated on `kind == MainAi | Composite`, which
480/// tied the `Operator` abstraction to an "AI driver" assumption — a design
481/// weakness. The `Operator` trait is a generic **external processing backend**
482/// (LLM, human, external resource, side-effectful operation — anything), and
483/// is orthogonal to the kind axis.
484///
485/// The current implementation decides solely on `operator.is_some()`:
486/// - Automate session + operator backend registered → delegate
487///   (pure external-execution delegation).
488/// - MainAi session + operator backend registered → delegate.
489/// - Any kind + `operator` `None` → pass through (normal `inner.spawn`).
490///
491/// `kind` still matters as a firing condition for `SpawnHook`s over in
492/// `MainAIMiddleware`, but this middleware ignores it.
493///
494/// # Split of responsibilities with `OperatorSpawner`
495///
496/// The two axes exist for different reasons:
497///
498/// - **This middleware — the Blueprint-global (session) axis.** Delegate every
499///   agent to the same Operator backend. The `operator_backend_id` is set
500///   at session-attach time; `ctx.agent` is ignored and every spawn in that
501///   session is routed through the operator (e.g. a MainAI-wide driver, or a
502///   human-wide console). The Blueprint doesn't have to talk about `kind` —
503///   it just declares the capability hint `"operator_delegate"` (keeping the
504///   Blueprint clean).
505///
506/// - **`OperatorSpawner` — the AgentSpec axis.** Each `AgentDef` bakes its
507///   own Operator backend. `kind = Operator` `AgentDef`s pick a backend via
508///   `spec.operator_ref`; the compiler bakes an `Arc<dyn Operator>` into
509///   `routes[agent_name]`. Agents loaded via the `agent.md` loader come in
510///   through this path (their default is `kind = Operator`).
511///
512/// # Exclusivity
513///
514/// When both are effective — this middleware's hint is declared, the session
515/// has an operator backend, **and** the Blueprint has a `kind = Operator`
516/// `AgentDef` — this middleware sits at the outer end of the stack and
517/// **completely bypasses** `inner.spawn`. The `OperatorSpawner` is never
518/// reached, so a double fire cannot occur by construction; the AgentSpec
519/// axis is inert. Consistent use means picking one axis per use case.
520pub struct OperatorDelegateMiddleware;
521
522impl OperatorDelegateMiddleware {
523    /// Stateless constructor.
524    pub fn new() -> Self {
525        Self
526    }
527}
528
529impl Default for OperatorDelegateMiddleware {
530    fn default() -> Self {
531        Self::new()
532    }
533}
534
535impl SpawnerLayer for OperatorDelegateMiddleware {
536    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
537        Arc::new(OperatorDelegateWrapped { inner })
538    }
539}
540
541struct OperatorDelegateWrapped {
542    inner: Arc<dyn SpawnerAdapter>,
543}
544
545#[async_trait]
546impl SpawnerAdapter for OperatorDelegateWrapped {
547    async fn spawn(
548        &self,
549        engine: &Engine,
550        ctx: &Ctx,
551        task_id: TaskId,
552        attempt: u32,
553        token: CapToken,
554    ) -> Result<Box<dyn Worker>, SpawnError> {
555        // Kind-independent: we decide purely on whether an operator backend is
556        // registered on the session. `kind` matters for SpawnHook-style layers
557        // (MainAIMiddleware); this middleware does not consult it.
558        let Some(operator) = ctx.operator.operator.clone() else {
559            return self.inner.spawn(engine, ctx, task_id, attempt, token).await;
560        };
561
562        // Delegate: same shape as OperatorSpawner — fetch_prompt + operator.execute + Final emit.
563        let prompt = engine
564            .fetch_prompt(&token, &task_id)
565            .await
566            .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
567
568        // Resolve the Blueprint-baked worker binding injected into
569        // `ctx.meta.runtime` by `WorkerBindingMiddleware` (launch-time layer,
570        // built from `AgentDef.profile.worker_binding`). Absent key = agent
571        // declared no binding → hand `None` and let binding-requiring
572        // backends fail loud (`requires_worker_binding`). A present-but-
573        // malformed value is a wiring bug, not a degrade case — fail here.
574        let worker: Option<crate::operator::WorkerBinding> = match ctx
575            .meta
576            .runtime
577            .get(crate::middleware::worker_binding::WORKER_BINDING_KEY)
578        {
579            Some(v) => Some(serde_json::from_value(v.clone()).map_err(|e| {
580                SpawnError::Internal(format!(
581                    "ctx.meta.runtime['{}'] for agent '{}' is malformed: {e}",
582                    crate::middleware::worker_binding::WORKER_BINDING_KEY,
583                    ctx.agent
584                ))
585            })?),
586            None => None,
587        };
588
589        let engine_clone = engine.clone();
590        let token_clone = token.clone();
591        let token_for_op = token.clone();
592        let task_id_clone = task_id.clone();
593        let ctx_clone = ctx.clone();
594        let (tx, rx) = tokio::sync::oneshot::channel();
595        let cancel = tokio_util::sync::CancellationToken::new();
596        let cancel_inner = cancel.clone();
597        let worker_id = crate::types::WorkerId::new();
598
599        tokio::spawn(async move {
600            let result: Result<
601                crate::worker::adapter::WorkerResult,
602                crate::worker::adapter::WorkerError,
603            > = tokio::select! {
604                // OperatorDelegateMiddleware = session-global Operator delegation.
605                // Baking per-AgentDef profile.system_prompt is OperatorSpawner's
606                // job; this path has no per-agent spawner, so system stays None.
607                // The worker binding, however, IS resolved on this axis now:
608                // `WorkerBindingMiddleware` (launch-time layer) injects the
609                // Blueprint-baked binding into ctx.meta.runtime and we forward
610                // it here — the delegate axis is a first-class variant-dispatch
611                // path, not a binding-less fallback (issue 45db42a7).
612                // We hand the capability token (Role::Worker, 600s TTL) to the
613                // operator as `worker_token` — thin-spawn operators (e.g. a
614                // WebSocket-backed operator session) forward it to the SubAgent
615                // via encode(), while Operator impls that call the LLM directly
616                // may ignore it.
617                r = operator.execute(&ctx_clone, None, prompt, worker, token_for_op) => r,
618                _ = cancel_inner.cancelled() => Err(crate::worker::adapter::WorkerError::Cancelled),
619            };
620            if let Ok(wr) = &result {
621                // If the SubAgent has already pushed a Final through
622                // /v1/worker/result or /v1/worker/submit POST, skip a second
623                // emit here — the POST value is the canonical one (protocol
624                // design intent). Operator impls that never POST (e.g. tests
625                // and inline Operators) still get the fallback emit.
626                let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
627                let has_final = tail
628                    .iter()
629                    .any(|ev| matches!(ev, crate::worker::output::OutputEvent::Final { .. }));
630                if !has_final {
631                    let ev = crate::worker::output::OutputEvent::Final {
632                        content: crate::worker::output::ContentRef::Inline {
633                            value: wr.value.clone(),
634                        },
635                        ok: wr.ok,
636                    };
637                    let _ = engine_clone
638                        .submit_output(&token_clone, &task_id_clone, attempt, ev)
639                        .await;
640                }
641            }
642            let signal: Result<(), crate::worker::adapter::WorkerError> = result.map(|_| ());
643            let _ = tx.send(signal);
644        });
645
646        Ok(Box::new(MiddlewareWorker {
647            handler: WorkerJoinHandler {
648                worker_id,
649                cancel,
650                completion: rx,
651            },
652        }))
653    }
654}
655
656// ─── LongHoldMiddleware (warns on the EventLog if completion time exceeds default_hold) ─
657
658/// Base layer that emits `Event::TaskAttemptCompleted` with a
659/// `long_hold_warn` marker when a spawn's completion takes longer than
660/// `default_hold`. Purely observational — it never alters the signal or
661/// blocks completion.
662pub struct LongHoldMiddleware {
663    /// Threshold above which a completion is flagged as long-held.
664    pub default_hold: Duration,
665    /// Broadcast sender the EventLog subscribes to.
666    pub event_tx: broadcast::Sender<Event>,
667}
668
669impl LongHoldMiddleware {
670    /// Sets the hold threshold and the event sender to warn through.
671    pub fn new(default_hold: Duration, event_tx: broadcast::Sender<Event>) -> Self {
672        Self {
673            default_hold,
674            event_tx,
675        }
676    }
677}
678
679impl SpawnerLayer for LongHoldMiddleware {
680    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
681        Arc::new(LongHoldWrapped {
682            inner,
683            default_hold: self.default_hold,
684            event_tx: self.event_tx.clone(),
685        })
686    }
687}
688
689struct LongHoldWrapped {
690    inner: Arc<dyn SpawnerAdapter>,
691    default_hold: Duration,
692    event_tx: broadcast::Sender<Event>,
693}
694
695#[async_trait]
696impl SpawnerAdapter for LongHoldWrapped {
697    async fn spawn(
698        &self,
699        engine: &Engine,
700        ctx: &Ctx,
701        task_id: TaskId,
702        attempt: u32,
703        token: CapToken,
704    ) -> Result<Box<dyn Worker>, SpawnError> {
705        let handle = self
706            .inner
707            .spawn(engine, ctx, task_id.clone(), attempt, token)
708            .await?;
709        let started = Instant::now();
710        let default_hold = self.default_hold;
711        let event_tx = self.event_tx.clone();
712        let task_id_inner = task_id.clone();
713        Ok(wrap_join(handle, move |signal| {
714            let elapsed = started.elapsed();
715            let default_hold = default_hold;
716            let event_tx = event_tx.clone();
717            let task_id_inner = task_id_inner.clone();
718            async move {
719                if elapsed > default_hold {
720                    let _ = event_tx.send(Event::TaskAttemptCompleted {
721                        task_id: task_id_inner,
722                        attempt,
723                        result: serde_json::json!({
724                            "long_hold_warn": true,
725                            "elapsed_ms": elapsed.as_millis() as u64,
726                            "default_hold_ms": default_hold.as_millis() as u64,
727                        }),
728                    });
729                }
730                signal
731            }
732        }))
733    }
734}
735
// Boundary regression spec for the delegate-axis worker-binding handoff
// (issue 45db42a7). OperatorDelegateMiddleware has to pass along whatever
// binding WorkerBindingMiddleware placed in ctx.meta.runtime. Three cases
// are pinned here:
//   * hit path       — Some(worker) arrives at Operator::execute,
//   * absent path    — None arrives at Operator::execute,
//   * malformed path — the spawn fails loudly and execute never runs.
#[cfg(test)]
mod operator_delegate_worker_binding_tests {
    use super::*;
    use crate::core::config::EngineCfg;
    use crate::core::state::TaskSpec;
    use crate::operator::WorkerBinding;
    use crate::types::Role;
    use crate::worker::adapter::{WorkerError, WorkerResult};
    use std::sync::Mutex;

    /// Shared observation slot. The outer `Option` is `None` until
    /// `execute` has run; the inner `Option` is the `worker` argument that
    /// `execute` received.
    type SeenSlot = Arc<Mutex<Option<Option<WorkerBinding>>>>;

    /// Operator test double that stores the `worker` argument of its
    /// `execute` call into a [`SeenSlot`].
    struct RecordingOperator {
        seen: SeenSlot,
    }

    #[async_trait]
    impl crate::operator::Operator for RecordingOperator {
        async fn execute(
            &self,
            _ctx: &Ctx,
            _system: Option<String>,
            _prompt: String,
            worker: Option<WorkerBinding>,
            _worker_token: CapToken,
        ) -> Result<WorkerResult, WorkerError> {
            let outcome = WorkerResult {
                value: Value::Null,
                ok: true,
            };
            // Record the argument; the guard is a temporary, so the lock is
            // released at the end of this statement.
            *self.seen.lock().unwrap() = Some(worker);
            Ok(outcome)
        }
    }

    /// Inner spawner whose `spawn` panics: with an operator attached, the
    /// delegate axis is expected to never call into it.
    struct MustNotSpawn;

    #[async_trait]
    impl SpawnerAdapter for MustNotSpawn {
        async fn spawn(
            &self,
            _engine: &Engine,
            _ctx: &Ctx,
            _task_id: TaskId,
            _attempt: u32,
            _token: CapToken,
        ) -> Result<Box<dyn Worker>, SpawnError> {
            panic!("delegate axis must bypass inner.spawn when an operator is attached");
        }
    }

    /// Builds an engine with one started task and a registered worker token.
    ///
    /// Returns `(engine, worker_token, task_id)`.
    async fn seeded_engine() -> (Engine, CapToken, TaskId) {
        let engine = Engine::new(EngineCfg::default());

        let operator_token = engine
            .attach("ut-op", Role::Operator, Duration::from_secs(30))
            .await
            .expect("attach");

        let spec = TaskSpec {
            agent: "planner".to_string(),
            initial_directive: "do the thing".to_string(),
        };
        let task_id = engine
            .start_task(&operator_token, spec)
            .await
            .expect("start_task");

        // Mirror what `dispatch_attempt_with` does: the spawner path runs
        // with a `Role::Worker` token (FetchPrompt is worker-verb-gated), so
        // mint one and register it in engine state.
        let session_name = format!("worker-of-{task_id}");
        let worker_token = engine.signer().session(
            session_name,
            Role::Worker,
            vec!["*".into()],
            Duration::from_secs(600),
        );
        let record = crate::core::state::CapTokenRecord::from_worker_token(
            worker_token.clone(),
            task_id.clone(),
        );
        let nonce = worker_token.nonce.clone();
        engine
            .with_state("test.mint_worker", move |state| {
                state.tokens.insert(nonce, record);
            })
            .await
            .expect("mint worker token");

        (engine, worker_token, task_id)
    }

    /// The middleware under test, wrapped around the panicking inner spawner.
    fn delegate_stack() -> Arc<dyn SpawnerAdapter> {
        let inner = Arc::new(MustNotSpawn);
        OperatorDelegateMiddleware::new().wrap(inner)
    }

    /// Fresh slot in the "execute has not run yet" state.
    fn empty_slot() -> SeenSlot {
        Arc::new(Mutex::new(None))
    }

    /// Attempt-1 "planner" context with a `RecordingOperator` attached that
    /// reports into `slot`.
    fn operator_ctx(task_id: &TaskId, slot: &SeenSlot) -> Ctx {
        let op = Arc::new(RecordingOperator { seen: slot.clone() });
        let mut ctx = Ctx::new(task_id.clone(), 1, "planner");
        ctx.operator.operator = Some(op);
        ctx
    }

    /// Stores `payload` in `ctx.meta.runtime` under the worker-binding key.
    fn inject_binding(ctx: &mut Ctx, payload: serde_json::Value) {
        ctx.meta.runtime.insert(
            crate::middleware::worker_binding::WORKER_BINDING_KEY.to_string(),
            payload,
        );
    }

    /// Polls the slot (100 checks, 10 ms apart) until `execute` has recorded
    /// its `worker` argument, then returns that argument. Panics if nothing
    /// was recorded by the time the polls run out.
    async fn recorded_worker(seen: &SeenSlot) -> Option<WorkerBinding> {
        let mut polls_left = 100;
        while polls_left > 0 {
            // Snapshot under the lock; the guard is gone before we sleep.
            let snapshot = seen.lock().unwrap().clone();
            if let Some(binding) = snapshot {
                return binding;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
            polls_left -= 1;
        }
        panic!("operator.execute was never called within 1s");
    }

    /// Hit path: a well-formed binding in the runtime meta reaches
    /// `Operator::execute` unchanged.
    #[tokio::test]
    async fn forwards_ctx_injected_binding_to_operator_execute() {
        let (engine, token, task_id) = seeded_engine().await;
        let slot = empty_slot();
        let mut ctx = operator_ctx(&task_id, &slot);

        let declared = WorkerBinding {
            variant: "mse-worker-coder".to_string(),
            tools: vec!["Edit".to_string()],
        };
        inject_binding(&mut ctx, serde_json::to_value(declared).unwrap());

        // Keep the returned worker bound for the rest of the test.
        let _worker = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await
            .expect("delegate spawn ok");

        let forwarded = recorded_worker(&slot).await.expect("binding forwarded");
        assert_eq!(forwarded.variant, "mse-worker-coder");
        assert_eq!(forwarded.tools, vec!["Edit".to_string()]);
    }

    /// Absent path: with no binding in the runtime meta, the operator is
    /// still executed and receives `None`.
    #[tokio::test]
    async fn absent_binding_stays_none_no_silent_default() {
        let (engine, token, task_id) = seeded_engine().await;
        let slot = empty_slot();
        let ctx = operator_ctx(&task_id, &slot);

        // Keep the returned worker bound for the rest of the test.
        let _worker = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await
            .expect("delegate spawn ok");

        let forwarded = recorded_worker(&slot).await;
        assert!(
            forwarded.is_none(),
            "no binding declared must reach the operator as None (fail-loud stays downstream)"
        );
    }

    /// Malformed path: a value that does not deserialize as a binding makes
    /// the spawn return an error, and `execute` is never invoked.
    #[tokio::test]
    async fn malformed_binding_fails_loud_before_execute() {
        let (engine, token, task_id) = seeded_engine().await;
        let slot = empty_slot();
        let mut ctx = operator_ctx(&task_id, &slot);
        inject_binding(&mut ctx, serde_json::json!({ "not_a_binding": true }));

        let outcome = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await;
        let err = match outcome {
            Err(e) => e,
            Ok(_) => panic!("malformed binding must fail the spawn"),
        };

        let msg = format!("{err:?}");
        let names_key = msg.contains("worker_binding");
        let says_malformed = msg.contains("malformed");
        assert!(
            names_key && says_malformed,
            "error must name the malformed key: {msg}"
        );

        let execute_ran = slot.lock().unwrap().is_some();
        assert!(
            !execute_ran,
            "operator.execute must not run on malformed binding"
        );
    }
}