Skip to main content

mlua_swarm/
middleware.rs

//! Middleware overlay — cross-cutting concerns (Audit / MainAI / Senior /
//! OperatorDelegate / LongHold).
//!
//! Ships five `SpawnerLayer` implementations plus the `SpawnerStack` builder.
5//! Some layers key off `Ctx.operator.kind` and only fire for
6//! `MainAi` / `Composite` sessions; others (`Audit` / `LongHold`) apply
7//! uniformly across every kind.
8//!
9//! # Extension discipline — this layer is THE extension point (canonical)
10//!
11//! Background: an earlier iteration grew a verdict-specialised machinery
12//! (`judgment.rs` canonical type + 3-form parser + `state.agent_verdicts`
13//! map + dedicated accessor) that re-interpreted agent output *inside the
14//! engine core* and banned string-literal conds in favour of a Blueprint
15//! compile-layer translation. That whole complex was dismantled: the value
16//! it added over plain data was zero, while it created an IN-side dialect
17//! that every consumer had to learn. The design conclusion is a
18//! three-principle layering:
19//!
20//! 1. **IN is immutable, canonical form is JSON.** `Blueprint` /
21//!    `mlua_flow_ir::Node` are plain serde data. No compile pass, no schema
22//!    field that the engine expands, no Rust helper that builds `Expr`s.
23//!    Flow control is written literally in Flow.ir:
24//!    `Eq(Path("$.<step>.verdict"), Lit("blocked"))` — domain verdicts are
25//!    plain strings inside step output, consumed by plain conds.
26//! 2. **Generation (authoring sugar) lives OUT**, on the consumer side
27//!    (e.g. a vendored pure-Lua builder that prints Blueprint JSON). It
28//!    never leaks into engine / schema crates, whatever language it is
29//!    written in — the ban is on the *placement*, not the language.
30//! 3. **Runtime extension lives HERE, as a `SpawnerLayer`.** A middleware
31//!    (or any future extension mechanism) may interpret the *results* of a
32//!    Flow.ir run — `Ctx`, the `output_tail`, `Final { ok }` — in its own
33//!    way and transform them. What it must NOT do:
34//!    - introduce a new dialect on the IN side (schema fields / node
35//!      rewriting / cond translation) — extensions read and transform, the
36//!      wire format stays plain Flow.ir + JSON;
37//!    - hide its effect: overrides are *appended* to the output tail
38//!      (e.g. `SeniorEscalationMiddleware` pushes an override `Final`
39//!      rather than mutating the recorded one), so the trace stays
40//!      replayable and the flow stays observable;
41//!    - accumulate private engine state keyed by its own semantics (the
42//!      `agent_verdicts` anti-pattern) — state lives in ctx / output store
43//!      as plain data.
44//!
45//! `AgentResolver`, `ProjectNameAliasMiddleware`, `SinkMiddleware`,
46//! `InputInjectMiddleware`, `LuaMiddleware`, `SeniorEscalationMiddleware`
47//! all follow this shape: edit `ctx` / wrap the worker, call the inner
//! spawner, append observable output. (Note `LuaMiddleware`'s scripts are
//! host-constructed — embedding Lua source in a Blueprint is the IN-side
//! dialect this discipline forbids, and would require its own guard
//! design if ever revisited.)
52
53pub mod input_inject;
54pub mod lua_layer;
55pub mod project_name_alias;
56pub mod resolver;
57pub mod sink;
58
59use crate::core::ctx::{Ctx, OperatorKind};
60use crate::core::engine::Engine;
61use crate::core::state::Event;
62use crate::types::{CapToken, TaskId};
63use crate::worker::adapter::{SpawnError, SpawnerAdapter};
64use crate::worker::output::{ContentRef, OutputEvent};
65use crate::worker::{wrap_join, MiddlewareWorker, Worker, WorkerJoinHandler};
66use async_trait::async_trait;
67use serde_json::Value;
68use std::sync::Arc;
69use std::time::{Duration, Instant};
70use tokio::sync::broadcast;
71
72/// Pull the terminal `Final` event's `(value, ok)` out of the tail (works
73/// for both `Inline` and `FileRef` content).
74async fn pull_final_value_ok(
75    engine: &Engine,
76    task_id: &TaskId,
77    attempt: u32,
78) -> Option<(Value, bool)> {
79    let tail = engine.output_tail(task_id, attempt).await;
80    tail.iter().rev().find_map(|ev| match ev {
81        OutputEvent::Final {
82            content: ContentRef::Inline { value },
83            ok,
84        } => Some((value.clone(), *ok)),
85        OutputEvent::Final {
86            content: ContentRef::FileRef { path, .. },
87            ok,
88        } => Some((serde_json::json!({"file_ref": path.to_string_lossy()}), *ok)),
89        _ => None,
90    })
91}
92
93/// Layer trait — one middleware stage wrapping a `SpawnerAdapter`.
94pub trait SpawnerLayer: Send + Sync + 'static {
95    /// Wraps `inner` in this layer's behaviour, returning a new
96    /// `SpawnerAdapter` that delegates to `inner` (directly or via
97    /// `wrap_join`) while adding this layer's cross-cutting effect.
98    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter>;
99}
100
101/// Stack builder that layers `SpawnerLayer`s on top of a base adapter.
102///
103/// Each `.layer(...)` call wraps a new **outer** stage — same ergonomics as
104/// `tower::ServiceBuilder`.
105pub struct SpawnerStack {
106    inner: Arc<dyn SpawnerAdapter>,
107}
108
109impl SpawnerStack {
110    /// Starts a stack with `base` as the innermost adapter.
111    pub fn new(base: Arc<dyn SpawnerAdapter>) -> Self {
112        Self { inner: base }
113    }
114
115    /// Wraps the current stack with a statically-typed `SpawnerLayer`,
116    /// becoming the new outermost stage.
117    pub fn layer<L: SpawnerLayer>(mut self, layer: L) -> Self {
118        self.inner = layer.wrap(self.inner);
119        self
120    }
121
122    /// Dynamically-typed variant taking `Arc<dyn SpawnerLayer>`. Used via
123    /// the `LayerRegistry` resolution path (where a factory returns
124    /// `Arc<dyn ...>`).
125    pub fn layer_dyn(mut self, layer: Arc<dyn SpawnerLayer>) -> Self {
126        self.inner = layer.wrap(self.inner);
127        self
128    }
129
130    /// Finishes the stack, returning the fully-wrapped adapter.
131    pub fn build(self) -> Arc<dyn SpawnerAdapter> {
132        self.inner
133    }
134}
135
136// ─── SpawnerLayerFactory + LayerRegistry ─────────────────────────────────
137//
138// # Design rationale
139//
140// Wiring is assembled per-launch through `TaskLaunchService.launch`:
141//
142//   Compiler.compile(bp) ─┬─→ compiled.router (CompiledAgentTable: agent name → SpawnerAdapter dispatch)
143//                         │
144//                         │   service::linker::link(router, bp.spawner_hints.layers, &engine)
145//                         │     internal:
146//                         │       SpawnerStack::new(router)
147//                         │         .layer_dyn(base_factory_n(engine))   ← every LayerRegistry.base entry
148//                         │         .layer_dyn(hint_factory(engine))     ← resolves each bp.spawner_hints.layers key
149//                         │         .build()
150//                         ▼
151//                   EngineDispatcher::with_spawner(engine, op_token, stacked)
152//                         ▼
153//                   engine.dispatch_attempt_with(op_token, task_id, &stacked)
154//
155// # base vs hint — when to use each
156//
157// - **base layer**: wrapped around every Blueprint. Example: AuditMiddleware
158//   (a mandatory EventLog audit). The caller registers with
159//   `LayerRegistry::with_base(|e| Arc::new(AuditMiddleware::new(e.event_tx())))`.
160//
161// - **hint layer**: wrapped **only when the Blueprint declares the key** in
162//   `spawner_hints.layers`. Examples: MainAIMiddleware /
163//   SeniorEscalationMiddleware / OperatorDelegateMiddleware. The Blueprint
164//   only declares a capability key (e.g. `"main_ai"`) without knowing the
165//   implementation; the engine-side LayerRegistry resolves key → factory,
166//   keeping the pure Flow layer separate from implementation details.
167//
168// # Factory pattern (handles layers that need Engine context)
169//
170// We do not hold `Arc<dyn SpawnerLayer>` directly because some layers
171// depend on the engine instance — for example AuditMiddleware needs
172// `engine.event_tx()` and can only be built after the engine exists. A
173// factory closure defers construction: the Layer instance is created only
174// when the engine is handed in.
175
176/// Factory closure for a `SpawnerLayer`. The caller registers these at
177/// startup, and they are called with the engine context at bind time.
178/// Stateless layers can use `|_engine| Arc::new(MyLayer)`; layers that need
179/// something like `event_tx` should do `|engine| Arc::new(MyLayer::new(engine.event_tx()))`.
180pub type LayerFactory =
181    Arc<dyn Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static>;
182
183/// Registry of `LayerFactory`s, split into `base` (always applied) and
184/// `hints` (applied only when a Blueprint declares the matching key in
185/// `spawner_hints.layers`). See the module-level `# Factory pattern`
186/// notes above for why factories rather than pre-built layers.
187#[derive(Default, Clone)]
188pub struct LayerRegistry {
189    base: Vec<LayerFactory>,
190    hints: std::collections::HashMap<String, LayerFactory>,
191}
192
193impl LayerRegistry {
194    /// Empty registry (no base layers, no hint layers).
195    pub fn new() -> Self {
196        Self::default()
197    }
198
199    /// Register a base layer factory that is applied on every Blueprint bind
200    /// (for layers that must fire for every task — e.g. `AuditMiddleware`).
201    pub fn with_base<F>(mut self, factory: F) -> Self
202    where
203        F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
204    {
205        self.base.push(Arc::new(factory));
206        self
207    }
208
209    /// Register a layer factory addressable by hint key. If
210    /// `Blueprint.spawner_hints.layers` lists the same key, it is wrapped at
211    /// bind time; otherwise it is a no-op.
212    pub fn with_hint<F>(mut self, key: impl Into<String>, factory: F) -> Self
213    where
214        F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
215    {
216        self.hints.insert(key.into(), Arc::new(factory));
217        self
218    }
219
220    /// All registered base-layer factories, in registration order.
221    pub fn base_factories(&self) -> &[LayerFactory] {
222        &self.base
223    }
224
225    /// Looks up the hint-layer factory registered under `key`, if any.
226    pub fn lookup_hint(&self, key: &str) -> Option<&LayerFactory> {
227        self.hints.get(key)
228    }
229}
230
231// ─── AuditMiddleware (pushes into the EventLog broadcast path) ────────────
232
233/// Mandatory base layer that emits `Event::TaskAttemptStarted` on every
234/// spawn, before delegating. This is the audit trail's entry point into
235/// the EventLog broadcast channel.
236pub struct AuditMiddleware {
237    /// Broadcast sender the EventLog subscribes to.
238    pub event_tx: broadcast::Sender<Event>,
239}
240
241impl AuditMiddleware {
242    /// Wraps a broadcast sender to notify on every spawn.
243    pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
244        Self { event_tx }
245    }
246}
247
248impl SpawnerLayer for AuditMiddleware {
249    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
250        Arc::new(AuditWrapped {
251            inner,
252            event_tx: self.event_tx.clone(),
253        })
254    }
255}
256
257struct AuditWrapped {
258    inner: Arc<dyn SpawnerAdapter>,
259    event_tx: broadcast::Sender<Event>,
260}
261
262#[async_trait]
263impl SpawnerAdapter for AuditWrapped {
264    async fn spawn(
265        &self,
266        engine: &Engine,
267        ctx: &Ctx,
268        task_id: TaskId,
269        attempt: u32,
270        token: CapToken,
271    ) -> Result<Box<dyn Worker>, SpawnError> {
272        let _ = self.event_tx.send(Event::TaskAttemptStarted {
273            task_id: task_id.clone(),
274            attempt,
275        });
276        self.inner.spawn(engine, ctx, task_id, attempt, token).await
277    }
278}
279
280// ─── MainAIMiddleware (fires SpawnHook before/after for MainAI/Composite) ─
281
282/// Hint layer that fires `ctx.operator.spawn_hook.before`/`after` around
283/// a spawn, but only for `MainAi` / `Composite` sessions. No-op for
284/// other kinds (still delegates, just skips the hook calls).
285pub struct MainAIMiddleware;
286
287impl MainAIMiddleware {
288    /// Stateless constructor.
289    pub fn new() -> Self {
290        Self
291    }
292}
293
294impl Default for MainAIMiddleware {
295    fn default() -> Self {
296        Self::new()
297    }
298}
299
300impl SpawnerLayer for MainAIMiddleware {
301    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
302        Arc::new(MainAIWrapped { inner })
303    }
304}
305
306struct MainAIWrapped {
307    inner: Arc<dyn SpawnerAdapter>,
308}
309
310#[async_trait]
311impl SpawnerAdapter for MainAIWrapped {
312    async fn spawn(
313        &self,
314        engine: &Engine,
315        ctx: &Ctx,
316        task_id: TaskId,
317        attempt: u32,
318        token: CapToken,
319    ) -> Result<Box<dyn Worker>, SpawnError> {
320        let mainai = matches!(
321            ctx.operator.kind,
322            OperatorKind::MainAi | OperatorKind::Composite
323        );
324        if mainai {
325            if let Some(hook) = &ctx.operator.spawn_hook {
326                hook.before(ctx)
327                    .await
328                    .map_err(SpawnError::RejectedByMiddleware)?;
329            }
330        }
331
332        let handle = self
333            .inner
334            .spawn(engine, ctx, task_id.clone(), attempt, token)
335            .await?;
336
337        if !mainai {
338            return Ok(handle);
339        }
340        let Some(hook) = ctx.operator.spawn_hook.clone() else {
341            return Ok(handle);
342        };
343
344        // Wrap the completion signal and call hook.after on finish.
345        // Pull the last Final from engine.output_tail as the value.
346        let ctx_clone = ctx.clone();
347        let engine_clone = engine.clone();
348        let task_id_clone = task_id.clone();
349        Ok(wrap_join(handle, move |signal| {
350            let hook = hook.clone();
351            let ctx_clone = ctx_clone.clone();
352            let engine_clone = engine_clone.clone();
353            let task_id_clone = task_id_clone.clone();
354            async move {
355                let v = match &signal {
356                    Ok(()) => pull_final_value_ok(&engine_clone, &task_id_clone, attempt)
357                        .await
358                        .map(|(v, _)| v)
359                        .unwrap_or(Value::Null),
360                    Err(e) => Value::String(e.to_string()),
361                };
362                let _ = hook.after(&ctx_clone, &v).await;
363                signal
364            }
365        }))
366    }
367}
368
369// ─── SeniorEscalationMiddleware ───────────────────────────────────────────
370//
// When a spawn's completion is `ok=false` and `ctx.operator.senior_bridge` is
// Some, this auxiliary layer calls `SeniorBridge.ask` and appends an override
// `Final` (`ok=true`) whose value carries the worker's value under `"original"`
// and the answer under `"senior_answer"`. Retry / re-dispatch is the engine
// (operator) side's job; this layer only injects fresh material for that
// decision.
376
377/// Hint layer: on `ok=false` completion with `ctx.operator.senior_bridge`
378/// set, asks the bridge for guidance and pushes an override `Final`
379/// (`ok=true`) carrying `senior_answer`. See the module comment above
380/// this type for the full contract.
381pub struct SeniorEscalationMiddleware;
382
383impl SeniorEscalationMiddleware {
384    /// Stateless constructor.
385    pub fn new() -> Self {
386        Self
387    }
388}
389
390impl Default for SeniorEscalationMiddleware {
391    fn default() -> Self {
392        Self::new()
393    }
394}
395
396impl SpawnerLayer for SeniorEscalationMiddleware {
397    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
398        Arc::new(SeniorWrapped { inner })
399    }
400}
401
402struct SeniorWrapped {
403    inner: Arc<dyn SpawnerAdapter>,
404}
405
406#[async_trait]
407impl SpawnerAdapter for SeniorWrapped {
408    async fn spawn(
409        &self,
410        engine: &Engine,
411        ctx: &Ctx,
412        task_id: TaskId,
413        attempt: u32,
414        token: CapToken,
415    ) -> Result<Box<dyn Worker>, SpawnError> {
416        let bridge = ctx.operator.senior_bridge.clone();
417        let task_id_for_hook = task_id.clone();
418        let engine_clone = engine.clone();
419        let token_clone = token.clone();
420        let handle = self
421            .inner
422            .spawn(engine, ctx, task_id, attempt, token)
423            .await?;
424        let Some(bridge) = bridge else {
425            return Ok(handle);
426        };
427        Ok(wrap_join(handle, move |signal| {
428            let bridge = bridge.clone();
429            let task_id = task_id_for_hook.clone();
430            let engine = engine_clone.clone();
431            let token = token_clone.clone();
432            async move {
433                signal?;
434                // Read the existing Final.
435                let last = pull_final_value_ok(&engine, &task_id, attempt).await;
436                if let Some((value, false)) = last {
437                    // ok=false: escalate to senior and push an override Final.
438                    let question = serde_json::json!({
439                        "reason": "worker reported ok=false",
440                        "value": value.clone(),
441                    });
442                    if let Ok(answer) = bridge.ask(&task_id, question).await {
443                        let override_val = serde_json::json!({
444                            "original": value,
445                            "senior_answer": answer,
446                        });
447                        let _ = engine
448                            .submit_output(
449                                &token,
450                                &task_id,
451                                attempt,
452                                OutputEvent::Final {
453                                    content: ContentRef::Inline {
454                                        value: override_val,
455                                    },
456                                    ok: true,
457                                },
458                            )
459                            .await;
460                    }
461                }
462                Ok(())
463            }
464        }))
465    }
466}
467
468// ─── OperatorDelegateMiddleware (delegates the whole spawn to an external Operator when one is attached) ──
469
470/// When `ctx.operator.operator.is_some()` (the session has an Operator
471/// backend), **bypass** `inner.spawn`, call `operator.execute(ctx, prompt)`,
472/// and box the result up as a `WorkerHandle`. In other words: the path that
473/// hands "this spawn" to whatever external Operator backend the engine has
474/// registered.
475///
476/// # Independent of `OperatorKind` (Operator is a generic abstraction)
477///
478/// An earlier implementation gated on `kind == MainAi | Composite`, which
479/// tied the `Operator` abstraction to an "AI driver" assumption — a design
480/// weakness. The `Operator` trait is a generic **external processing backend**
481/// (LLM, human, external resource, side-effectful operation — anything), and
482/// is orthogonal to the kind axis.
483///
484/// The current implementation decides solely on `operator.is_some()`:
485/// - Automate session + operator backend registered → delegate
486///   (pure external-execution delegation).
487/// - MainAi session + operator backend registered → delegate.
488/// - Any kind + `operator` `None` → pass through (normal `inner.spawn`).
489///
490/// `kind` still matters as a firing condition for `SpawnHook`s over in
491/// `MainAIMiddleware`, but this middleware ignores it.
492///
493/// # Split of responsibilities with `OperatorSpawner`
494///
495/// The two axes exist for different reasons:
496///
497/// - **This middleware — the Blueprint-global (session) axis.** Delegate every
498///   agent to the same Operator backend. The `operator_backend_id` is set
499///   at session-attach time; `ctx.agent` is ignored and every spawn in that
500///   session is routed through the operator (e.g. a MainAI-wide driver, or a
501///   human-wide console). The Blueprint doesn't have to talk about `kind` —
502///   it just declares the capability hint `"operator_delegate"` (keeping the
503///   Blueprint clean).
504///
505/// - **`OperatorSpawner` — the AgentSpec axis.** Each `AgentDef` bakes its
506///   own Operator backend. `kind = Operator` `AgentDef`s pick a backend via
507///   `spec.operator_ref`; the compiler bakes an `Arc<dyn Operator>` into
508///   `routes[agent_name]`. Agents loaded via the `agent.md` loader come in
509///   through this path (their default is `kind = Operator`).
510///
511/// # Exclusivity
512///
513/// When both are effective — this middleware's hint is declared, the session
514/// has an operator backend, **and** the Blueprint has a `kind = Operator`
515/// `AgentDef` — this middleware sits at the outer end of the stack and
516/// **completely bypasses** `inner.spawn`. The `OperatorSpawner` is never
517/// reached, so a double fire cannot occur by construction; the AgentSpec
518/// axis is inert. Consistent use means picking one axis per use case.
519pub struct OperatorDelegateMiddleware;
520
521impl OperatorDelegateMiddleware {
522    /// Stateless constructor.
523    pub fn new() -> Self {
524        Self
525    }
526}
527
528impl Default for OperatorDelegateMiddleware {
529    fn default() -> Self {
530        Self::new()
531    }
532}
533
534impl SpawnerLayer for OperatorDelegateMiddleware {
535    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
536        Arc::new(OperatorDelegateWrapped { inner })
537    }
538}
539
540struct OperatorDelegateWrapped {
541    inner: Arc<dyn SpawnerAdapter>,
542}
543
544#[async_trait]
545impl SpawnerAdapter for OperatorDelegateWrapped {
546    async fn spawn(
547        &self,
548        engine: &Engine,
549        ctx: &Ctx,
550        task_id: TaskId,
551        attempt: u32,
552        token: CapToken,
553    ) -> Result<Box<dyn Worker>, SpawnError> {
554        // Kind-independent: we decide purely on whether an operator backend is
555        // registered on the session. `kind` matters for SpawnHook-style layers
556        // (MainAIMiddleware); this middleware does not consult it.
557        let Some(operator) = ctx.operator.operator.clone() else {
558            return self.inner.spawn(engine, ctx, task_id, attempt, token).await;
559        };
560
561        // Delegate: same shape as OperatorSpawner — fetch_prompt + operator.execute + Final emit.
562        let prompt = engine
563            .fetch_prompt(&token, &task_id)
564            .await
565            .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
566
567        let engine_clone = engine.clone();
568        let token_clone = token.clone();
569        let token_for_op = token.clone();
570        let task_id_clone = task_id.clone();
571        let ctx_clone = ctx.clone();
572        let (tx, rx) = tokio::sync::oneshot::channel();
573        let cancel = tokio_util::sync::CancellationToken::new();
574        let cancel_inner = cancel.clone();
575        let worker_id = crate::types::WorkerId::new();
576
577        tokio::spawn(async move {
578            let result: Result<
579                crate::worker::adapter::WorkerResult,
580                crate::worker::adapter::WorkerError,
581            > = tokio::select! {
582                // OperatorDelegateMiddleware = session-global Operator delegation.
583                // Baking per-AgentDef profile.system_prompt is OperatorSpawner's job;
584                // this path has no profile (ctx.agent is ignored on this axis), so
585                // we execute with system=None and worker=None — there is no
586                // AgentDef.profile.worker_binding to resolve here.
587                // We hand the capability token (Role::Worker, 600s TTL) to the
588                // operator as `worker_token` — thin-spawn operators (e.g. a
589                // WebSocket-backed operator session) forward it to the SubAgent
590                // via encode(), while Operator impls that call the LLM directly
591                // may ignore it.
592                r = operator.execute(&ctx_clone, None, prompt, None, token_for_op) => r,
593                _ = cancel_inner.cancelled() => Err(crate::worker::adapter::WorkerError::Cancelled),
594            };
595            if let Ok(wr) = &result {
596                // If the SubAgent has already pushed a Final through
597                // /v1/worker/result or /v1/worker/submit POST, skip a second
598                // emit here — the POST value is the canonical one (protocol
599                // design intent). Operator impls that never POST (e.g. tests
600                // and inline Operators) still get the fallback emit.
601                let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
602                let has_final = tail
603                    .iter()
604                    .any(|ev| matches!(ev, crate::worker::output::OutputEvent::Final { .. }));
605                if !has_final {
606                    let ev = crate::worker::output::OutputEvent::Final {
607                        content: crate::worker::output::ContentRef::Inline {
608                            value: wr.value.clone(),
609                        },
610                        ok: wr.ok,
611                    };
612                    let _ = engine_clone
613                        .submit_output(&token_clone, &task_id_clone, attempt, ev)
614                        .await;
615                }
616            }
617            let signal: Result<(), crate::worker::adapter::WorkerError> = result.map(|_| ());
618            let _ = tx.send(signal);
619        });
620
621        Ok(Box::new(MiddlewareWorker {
622            handler: WorkerJoinHandler {
623                worker_id,
624                cancel,
625                completion: rx,
626            },
627        }))
628    }
629}
630
631// ─── LongHoldMiddleware (warns on the EventLog if completion time exceeds default_hold) ─
632
633/// Base layer that emits `Event::TaskAttemptCompleted` with a
634/// `long_hold_warn` marker when a spawn's completion takes longer than
635/// `default_hold`. Purely observational — it never alters the signal or
636/// blocks completion.
637pub struct LongHoldMiddleware {
638    /// Threshold above which a completion is flagged as long-held.
639    pub default_hold: Duration,
640    /// Broadcast sender the EventLog subscribes to.
641    pub event_tx: broadcast::Sender<Event>,
642}
643
644impl LongHoldMiddleware {
645    /// Sets the hold threshold and the event sender to warn through.
646    pub fn new(default_hold: Duration, event_tx: broadcast::Sender<Event>) -> Self {
647        Self {
648            default_hold,
649            event_tx,
650        }
651    }
652}
653
654impl SpawnerLayer for LongHoldMiddleware {
655    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
656        Arc::new(LongHoldWrapped {
657            inner,
658            default_hold: self.default_hold,
659            event_tx: self.event_tx.clone(),
660        })
661    }
662}
663
664struct LongHoldWrapped {
665    inner: Arc<dyn SpawnerAdapter>,
666    default_hold: Duration,
667    event_tx: broadcast::Sender<Event>,
668}
669
670#[async_trait]
671impl SpawnerAdapter for LongHoldWrapped {
672    async fn spawn(
673        &self,
674        engine: &Engine,
675        ctx: &Ctx,
676        task_id: TaskId,
677        attempt: u32,
678        token: CapToken,
679    ) -> Result<Box<dyn Worker>, SpawnError> {
680        let handle = self
681            .inner
682            .spawn(engine, ctx, task_id.clone(), attempt, token)
683            .await?;
684        let started = Instant::now();
685        let default_hold = self.default_hold;
686        let event_tx = self.event_tx.clone();
687        let task_id_inner = task_id.clone();
688        Ok(wrap_join(handle, move |signal| {
689            let elapsed = started.elapsed();
690            let default_hold = default_hold;
691            let event_tx = event_tx.clone();
692            let task_id_inner = task_id_inner.clone();
693            async move {
694                if elapsed > default_hold {
695                    let _ = event_tx.send(Event::TaskAttemptCompleted {
696                        task_id: task_id_inner,
697                        attempt,
698                        result: serde_json::json!({
699                            "long_hold_warn": true,
700                            "elapsed_ms": elapsed.as_millis() as u64,
701                            "default_hold_ms": default_hold.as_millis() as u64,
702                        }),
703                    });
704                }
705                signal
706            }
707        }))
708    }
709}