mlua_swarm/middleware.rs
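//! Middleware overlay — cross-cutting concerns (Audit / MainAI / Senior /
//! OperatorDelegate / LongHold).
//!
//! Ships five `SpawnerLayer` implementations plus the `SpawnerStack` builder.
//! `MainAIMiddleware` keys off `Ctx.operator.kind` and only fires for
//! `MainAi` / `Composite` sessions; `SeniorEscalationMiddleware` and
//! `OperatorDelegateMiddleware` key off whether `ctx.operator` carries a
//! senior bridge / an Operator backend, regardless of kind; `Audit` /
//! `LongHold` apply uniformly to every spawn.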
8//!
9//! # Extension discipline — this layer is THE extension point (canonical)
10//!
11//! Background: an earlier iteration grew a verdict-specialised machinery
12//! (`judgment.rs` canonical type + 3-form parser + `state.agent_verdicts`
13//! map + dedicated accessor) that re-interpreted agent output *inside the
14//! engine core* and banned string-literal conds in favour of a Blueprint
15//! compile-layer translation. That whole complex was dismantled: the value
16//! it added over plain data was zero, while it created an IN-side dialect
17//! that every consumer had to learn. The design conclusion is a
18//! three-principle layering:
19//!
20//! 1. **IN is immutable, canonical form is JSON.** `Blueprint` /
21//! `mlua_flow_ir::Node` are plain serde data. No compile pass, no schema
22//! field that the engine expands, no Rust helper that builds `Expr`s.
23//! Flow control is written literally in Flow.ir:
24//! `Eq(Path("$.<step>.verdict"), Lit("blocked"))` — domain verdicts are
25//! plain strings inside step output, consumed by plain conds.
26//! 2. **Generation (authoring sugar) lives OUT**, on the consumer side
27//! (e.g. a vendored pure-Lua builder that prints Blueprint JSON). It
28//! never leaks into engine / schema crates, whatever language it is
29//! written in — the ban is on the *placement*, not the language.
30//! 3. **Runtime extension lives HERE, as a `SpawnerLayer`.** A middleware
31//! (or any future extension mechanism) may interpret the *results* of a
32//! Flow.ir run — `Ctx`, the `output_tail`, `Final { ok }` — in its own
33//! way and transform them. What it must NOT do:
34//! - introduce a new dialect on the IN side (schema fields / node
35//! rewriting / cond translation) — extensions read and transform, the
36//! wire format stays plain Flow.ir + JSON;
37//! - hide its effect: overrides are *appended* to the output tail
38//! (e.g. `SeniorEscalationMiddleware` pushes an override `Final`
39//! rather than mutating the recorded one), so the trace stays
40//! replayable and the flow stays observable;
41//! - accumulate private engine state keyed by its own semantics (the
42//! `agent_verdicts` anti-pattern) — state lives in ctx / output store
43//! as plain data.
44//!
45//! `AgentResolver`, `ProjectNameAliasMiddleware`, `SinkMiddleware`,
46//! `InputInjectMiddleware`, `LuaMiddleware`, `SeniorEscalationMiddleware`
47//! all follow this shape: edit `ctx` / wrap the worker, call the inner
//! spawner, append observable output. (Note that `LuaMiddleware`'s scripts
//! are host-constructed: embedding Lua source in a Blueprint would be
//! exactly the IN-side dialect this discipline forbids, and would require
//! its own guard design if ever revisited.)
52
53pub mod input_inject;
54pub mod lua_layer;
55pub mod project_name_alias;
56pub mod resolver;
57pub mod sink;
58pub mod worker_binding;
59
60use crate::core::ctx::{Ctx, OperatorKind};
61use crate::core::engine::Engine;
62use crate::core::state::Event;
63use crate::types::{CapToken, TaskId};
64use crate::worker::adapter::{SpawnError, SpawnerAdapter};
65use crate::worker::output::{ContentRef, OutputEvent};
66use crate::worker::{wrap_join, MiddlewareWorker, Worker, WorkerJoinHandler};
67use async_trait::async_trait;
68use serde_json::Value;
69use std::sync::Arc;
70use std::time::{Duration, Instant};
71use tokio::sync::broadcast;
72
73/// Pull the terminal `Final` event's `(value, ok)` out of the tail (works
74/// for both `Inline` and `FileRef` content).
75async fn pull_final_value_ok(
76 engine: &Engine,
77 task_id: &TaskId,
78 attempt: u32,
79) -> Option<(Value, bool)> {
80 let tail = engine.output_tail(task_id, attempt).await;
81 tail.iter().rev().find_map(|ev| match ev {
82 OutputEvent::Final {
83 content: ContentRef::Inline { value },
84 ok,
85 } => Some((value.clone(), *ok)),
86 OutputEvent::Final {
87 content: ContentRef::FileRef { path, .. },
88 ok,
89 } => Some((serde_json::json!({"file_ref": path.to_string_lossy()}), *ok)),
90 _ => None,
91 })
92}
93
/// A single middleware stage that decorates a `SpawnerAdapter`.
///
/// The `Send + Sync + 'static` supertraits allow layers to be shared as
/// `Arc<dyn SpawnerLayer>`, which is how `SpawnerStack::layer_dyn` and
/// `LayerFactory` hand them around.
pub trait SpawnerLayer: Send + Sync + 'static {
    /// Wraps `inner` in this layer's behaviour, returning a new
    /// `SpawnerAdapter` that delegates to `inner` (directly or via
    /// `wrap_join`) while adding this layer's cross-cutting effect — or, as
    /// `OperatorDelegateMiddleware` does when an operator is attached,
    /// bypasses `inner` entirely.
    ///
    /// Takes `&self`, so one layer value can wrap any number of adapters;
    /// the layers in this module clone whatever state they need into the
    /// wrapper they return.
    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter>;
}
101
102/// Stack builder that layers `SpawnerLayer`s on top of a base adapter.
103///
104/// Each `.layer(...)` call wraps a new **outer** stage — same ergonomics as
105/// `tower::ServiceBuilder`.
106pub struct SpawnerStack {
107 inner: Arc<dyn SpawnerAdapter>,
108}
109
110impl SpawnerStack {
111 /// Starts a stack with `base` as the innermost adapter.
112 pub fn new(base: Arc<dyn SpawnerAdapter>) -> Self {
113 Self { inner: base }
114 }
115
116 /// Wraps the current stack with a statically-typed `SpawnerLayer`,
117 /// becoming the new outermost stage.
118 pub fn layer<L: SpawnerLayer>(mut self, layer: L) -> Self {
119 self.inner = layer.wrap(self.inner);
120 self
121 }
122
123 /// Dynamically-typed variant taking `Arc<dyn SpawnerLayer>`. Used via
124 /// the `LayerRegistry` resolution path (where a factory returns
125 /// `Arc<dyn ...>`).
126 pub fn layer_dyn(mut self, layer: Arc<dyn SpawnerLayer>) -> Self {
127 self.inner = layer.wrap(self.inner);
128 self
129 }
130
131 /// Finishes the stack, returning the fully-wrapped adapter.
132 pub fn build(self) -> Arc<dyn SpawnerAdapter> {
133 self.inner
134 }
135}
136
137// ─── SpawnerLayerFactory + LayerRegistry ─────────────────────────────────
138//
139// # Design rationale
140//
141// Wiring is assembled per-launch through `TaskLaunchService.launch`:
142//
143// Compiler.compile(bp) ─┬─→ compiled.router (CompiledAgentTable: agent name → SpawnerAdapter dispatch)
144// │
145// │ service::linker::link(router, bp.spawner_hints.layers, &engine)
146// │ internal:
147// │ SpawnerStack::new(router)
148// │ .layer_dyn(base_factory_n(engine)) ← every LayerRegistry.base entry
149// │ .layer_dyn(hint_factory(engine)) ← resolves each bp.spawner_hints.layers key
150// │ .build()
151// ▼
152// EngineDispatcher::with_spawner(engine, op_token, stacked)
153// ▼
154// engine.dispatch_attempt_with(op_token, task_id, &stacked)
155//
156// # base vs hint — when to use each
157//
158// - **base layer**: wrapped around every Blueprint. Example: AuditMiddleware
159// (a mandatory EventLog audit). The caller registers with
160// `LayerRegistry::with_base(|e| Arc::new(AuditMiddleware::new(e.event_tx())))`.
161//
162// - **hint layer**: wrapped **only when the Blueprint declares the key** in
163// `spawner_hints.layers`. Examples: MainAIMiddleware /
164// SeniorEscalationMiddleware / OperatorDelegateMiddleware. The Blueprint
165// only declares a capability key (e.g. `"main_ai"`) without knowing the
166// implementation; the engine-side LayerRegistry resolves key → factory,
167// keeping the pure Flow layer separate from implementation details.
168//
169// # Factory pattern (handles layers that need Engine context)
170//
171// We do not hold `Arc<dyn SpawnerLayer>` directly because some layers
172// depend on the engine instance — for example AuditMiddleware needs
173// `engine.event_tx()` and can only be built after the engine exists. A
174// factory closure defers construction: the Layer instance is created only
175// when the engine is handed in.
176
177/// Factory closure for a `SpawnerLayer`. The caller registers these at
178/// startup, and they are called with the engine context at bind time.
179/// Stateless layers can use `|_engine| Arc::new(MyLayer)`; layers that need
180/// something like `event_tx` should do `|engine| Arc::new(MyLayer::new(engine.event_tx()))`.
181pub type LayerFactory =
182 Arc<dyn Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static>;
183
184/// Registry of `LayerFactory`s, split into `base` (always applied) and
185/// `hints` (applied only when a Blueprint declares the matching key in
186/// `spawner_hints.layers`). See the module-level `# Factory pattern`
187/// notes above for why factories rather than pre-built layers.
188#[derive(Default, Clone)]
189pub struct LayerRegistry {
190 base: Vec<LayerFactory>,
191 hints: std::collections::HashMap<String, LayerFactory>,
192}
193
194impl LayerRegistry {
195 /// Empty registry (no base layers, no hint layers).
196 pub fn new() -> Self {
197 Self::default()
198 }
199
200 /// Register a base layer factory that is applied on every Blueprint bind
201 /// (for layers that must fire for every task — e.g. `AuditMiddleware`).
202 pub fn with_base<F>(mut self, factory: F) -> Self
203 where
204 F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
205 {
206 self.base.push(Arc::new(factory));
207 self
208 }
209
210 /// Register a layer factory addressable by hint key. If
211 /// `Blueprint.spawner_hints.layers` lists the same key, it is wrapped at
212 /// bind time; otherwise it is a no-op.
213 pub fn with_hint<F>(mut self, key: impl Into<String>, factory: F) -> Self
214 where
215 F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
216 {
217 self.hints.insert(key.into(), Arc::new(factory));
218 self
219 }
220
221 /// All registered base-layer factories, in registration order.
222 pub fn base_factories(&self) -> &[LayerFactory] {
223 &self.base
224 }
225
226 /// Looks up the hint-layer factory registered under `key`, if any.
227 pub fn lookup_hint(&self, key: &str) -> Option<&LayerFactory> {
228 self.hints.get(key)
229 }
230}
231
232// ─── AuditMiddleware (pushes into the EventLog broadcast path) ────────────
233
234/// Mandatory base layer that emits `Event::TaskAttemptStarted` on every
235/// spawn, before delegating. This is the audit trail's entry point into
236/// the EventLog broadcast channel.
237pub struct AuditMiddleware {
238 /// Broadcast sender the EventLog subscribes to.
239 pub event_tx: broadcast::Sender<Event>,
240}
241
242impl AuditMiddleware {
243 /// Wraps a broadcast sender to notify on every spawn.
244 pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
245 Self { event_tx }
246 }
247}
248
249impl SpawnerLayer for AuditMiddleware {
250 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
251 Arc::new(AuditWrapped {
252 inner,
253 event_tx: self.event_tx.clone(),
254 })
255 }
256}
257
258struct AuditWrapped {
259 inner: Arc<dyn SpawnerAdapter>,
260 event_tx: broadcast::Sender<Event>,
261}
262
263#[async_trait]
264impl SpawnerAdapter for AuditWrapped {
265 async fn spawn(
266 &self,
267 engine: &Engine,
268 ctx: &Ctx,
269 task_id: TaskId,
270 attempt: u32,
271 token: CapToken,
272 ) -> Result<Box<dyn Worker>, SpawnError> {
273 let _ = self.event_tx.send(Event::TaskAttemptStarted {
274 task_id: task_id.clone(),
275 attempt,
276 });
277 self.inner.spawn(engine, ctx, task_id, attempt, token).await
278 }
279}
280
281// ─── MainAIMiddleware (fires SpawnHook before/after for MainAI/Composite) ─
282
283/// Hint layer that fires `ctx.operator.spawn_hook.before`/`after` around
284/// a spawn, but only for `MainAi` / `Composite` sessions. No-op for
285/// other kinds (still delegates, just skips the hook calls).
286pub struct MainAIMiddleware;
287
288impl MainAIMiddleware {
289 /// Stateless constructor.
290 pub fn new() -> Self {
291 Self
292 }
293}
294
295impl Default for MainAIMiddleware {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
301impl SpawnerLayer for MainAIMiddleware {
302 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
303 Arc::new(MainAIWrapped { inner })
304 }
305}
306
307struct MainAIWrapped {
308 inner: Arc<dyn SpawnerAdapter>,
309}
310
311#[async_trait]
312impl SpawnerAdapter for MainAIWrapped {
313 async fn spawn(
314 &self,
315 engine: &Engine,
316 ctx: &Ctx,
317 task_id: TaskId,
318 attempt: u32,
319 token: CapToken,
320 ) -> Result<Box<dyn Worker>, SpawnError> {
321 let mainai = matches!(
322 ctx.operator.kind,
323 OperatorKind::MainAi | OperatorKind::Composite
324 );
325 if mainai {
326 if let Some(hook) = &ctx.operator.spawn_hook {
327 hook.before(ctx)
328 .await
329 .map_err(SpawnError::RejectedByMiddleware)?;
330 }
331 }
332
333 let handle = self
334 .inner
335 .spawn(engine, ctx, task_id.clone(), attempt, token)
336 .await?;
337
338 if !mainai {
339 return Ok(handle);
340 }
341 let Some(hook) = ctx.operator.spawn_hook.clone() else {
342 return Ok(handle);
343 };
344
345 // Wrap the completion signal and call hook.after on finish.
346 // Pull the last Final from engine.output_tail as the value.
347 let ctx_clone = ctx.clone();
348 let engine_clone = engine.clone();
349 let task_id_clone = task_id.clone();
350 Ok(wrap_join(handle, move |signal| {
351 let hook = hook.clone();
352 let ctx_clone = ctx_clone.clone();
353 let engine_clone = engine_clone.clone();
354 let task_id_clone = task_id_clone.clone();
355 async move {
356 let v = match &signal {
357 Ok(()) => pull_final_value_ok(&engine_clone, &task_id_clone, attempt)
358 .await
359 .map(|(v, _)| v)
360 .unwrap_or(Value::Null),
361 Err(e) => Value::String(e.to_string()),
362 };
363 let _ = hook.after(&ctx_clone, &v).await;
364 signal
365 }
366 }))
367 }
368}
369
370// ─── SeniorEscalationMiddleware ───────────────────────────────────────────
371//
// When a worker completes cleanly but its latest `Final` is `ok=false`, and
// `ctx.operator.senior_bridge` is Some, this auxiliary layer calls
// `SeniorBridge.ask` and *appends* an override `Final { ok: true }` whose
// value is `{"original": <failed value>, "senior_answer": <answer>}`; the
// recorded failing `Final` is left untouched. Retry / re-dispatch is the
// engine (operator) side's job; this layer only injects fresh material for
// that decision.
377
378/// Hint layer: on `ok=false` completion with `ctx.operator.senior_bridge`
379/// set, asks the bridge for guidance and pushes an override `Final`
380/// (`ok=true`) carrying `senior_answer`. See the module comment above
381/// this type for the full contract.
382pub struct SeniorEscalationMiddleware;
383
384impl SeniorEscalationMiddleware {
385 /// Stateless constructor.
386 pub fn new() -> Self {
387 Self
388 }
389}
390
391impl Default for SeniorEscalationMiddleware {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
397impl SpawnerLayer for SeniorEscalationMiddleware {
398 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
399 Arc::new(SeniorWrapped { inner })
400 }
401}
402
403struct SeniorWrapped {
404 inner: Arc<dyn SpawnerAdapter>,
405}
406
407#[async_trait]
408impl SpawnerAdapter for SeniorWrapped {
409 async fn spawn(
410 &self,
411 engine: &Engine,
412 ctx: &Ctx,
413 task_id: TaskId,
414 attempt: u32,
415 token: CapToken,
416 ) -> Result<Box<dyn Worker>, SpawnError> {
417 let bridge = ctx.operator.senior_bridge.clone();
418 let task_id_for_hook = task_id.clone();
419 let engine_clone = engine.clone();
420 let token_clone = token.clone();
421 let handle = self
422 .inner
423 .spawn(engine, ctx, task_id, attempt, token)
424 .await?;
425 let Some(bridge) = bridge else {
426 return Ok(handle);
427 };
428 Ok(wrap_join(handle, move |signal| {
429 let bridge = bridge.clone();
430 let task_id = task_id_for_hook.clone();
431 let engine = engine_clone.clone();
432 let token = token_clone.clone();
433 async move {
434 signal?;
435 // Read the existing Final.
436 let last = pull_final_value_ok(&engine, &task_id, attempt).await;
437 if let Some((value, false)) = last {
438 // ok=false: escalate to senior and push an override Final.
439 let question = serde_json::json!({
440 "reason": "worker reported ok=false",
441 "value": value.clone(),
442 });
443 if let Ok(answer) = bridge.ask(&task_id, question).await {
444 let override_val = serde_json::json!({
445 "original": value,
446 "senior_answer": answer,
447 });
448 let _ = engine
449 .submit_output(
450 &token,
451 &task_id,
452 attempt,
453 OutputEvent::Final {
454 content: ContentRef::Inline {
455 value: override_val,
456 },
457 ok: true,
458 },
459 )
460 .await;
461 }
462 }
463 Ok(())
464 }
465 }))
466 }
467}
468
469// ─── OperatorDelegateMiddleware (delegates the whole spawn to an external Operator when one is attached) ──
470
471/// When `ctx.operator.operator.is_some()` (the session has an Operator
472/// backend), **bypass** `inner.spawn`, call `operator.execute(ctx, prompt)`,
473/// and box the result up as a `WorkerHandle`. In other words: the path that
474/// hands "this spawn" to whatever external Operator backend the engine has
475/// registered.
476///
477/// # Independent of `OperatorKind` (Operator is a generic abstraction)
478///
479/// An earlier implementation gated on `kind == MainAi | Composite`, which
480/// tied the `Operator` abstraction to an "AI driver" assumption — a design
481/// weakness. The `Operator` trait is a generic **external processing backend**
482/// (LLM, human, external resource, side-effectful operation — anything), and
483/// is orthogonal to the kind axis.
484///
485/// The current implementation decides solely on `operator.is_some()`:
486/// - Automate session + operator backend registered → delegate
487/// (pure external-execution delegation).
488/// - MainAi session + operator backend registered → delegate.
489/// - Any kind + `operator` `None` → pass through (normal `inner.spawn`).
490///
491/// `kind` still matters as a firing condition for `SpawnHook`s over in
492/// `MainAIMiddleware`, but this middleware ignores it.
493///
494/// # Split of responsibilities with `OperatorSpawner`
495///
496/// The two axes exist for different reasons:
497///
498/// - **This middleware — the Blueprint-global (session) axis.** Delegate every
499/// agent to the same Operator backend. The `operator_backend_id` is set
500/// at session-attach time; `ctx.agent` is ignored and every spawn in that
501/// session is routed through the operator (e.g. a MainAI-wide driver, or a
502/// human-wide console). The Blueprint doesn't have to talk about `kind` —
503/// it just declares the capability hint `"operator_delegate"` (keeping the
504/// Blueprint clean).
505///
506/// - **`OperatorSpawner` — the AgentSpec axis.** Each `AgentDef` bakes its
507/// own Operator backend. `kind = Operator` `AgentDef`s pick a backend via
508/// `spec.operator_ref`; the compiler bakes an `Arc<dyn Operator>` into
509/// `routes[agent_name]`. Agents loaded via the `agent.md` loader come in
510/// through this path (their default is `kind = Operator`).
511///
512/// # Exclusivity
513///
514/// When both are effective — this middleware's hint is declared, the session
515/// has an operator backend, **and** the Blueprint has a `kind = Operator`
516/// `AgentDef` — this middleware sits at the outer end of the stack and
517/// **completely bypasses** `inner.spawn`. The `OperatorSpawner` is never
518/// reached, so a double fire cannot occur by construction; the AgentSpec
519/// axis is inert. Consistent use means picking one axis per use case.
520pub struct OperatorDelegateMiddleware;
521
522impl OperatorDelegateMiddleware {
523 /// Stateless constructor.
524 pub fn new() -> Self {
525 Self
526 }
527}
528
529impl Default for OperatorDelegateMiddleware {
530 fn default() -> Self {
531 Self::new()
532 }
533}
534
535impl SpawnerLayer for OperatorDelegateMiddleware {
536 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
537 Arc::new(OperatorDelegateWrapped { inner })
538 }
539}
540
541struct OperatorDelegateWrapped {
542 inner: Arc<dyn SpawnerAdapter>,
543}
544
545#[async_trait]
546impl SpawnerAdapter for OperatorDelegateWrapped {
547 async fn spawn(
548 &self,
549 engine: &Engine,
550 ctx: &Ctx,
551 task_id: TaskId,
552 attempt: u32,
553 token: CapToken,
554 ) -> Result<Box<dyn Worker>, SpawnError> {
555 // Kind-independent: we decide purely on whether an operator backend is
556 // registered on the session. `kind` matters for SpawnHook-style layers
557 // (MainAIMiddleware); this middleware does not consult it.
558 let Some(operator) = ctx.operator.operator.clone() else {
559 return self.inner.spawn(engine, ctx, task_id, attempt, token).await;
560 };
561
562 // Delegate: same shape as OperatorSpawner — fetch_prompt + operator.execute + Final emit.
563 let prompt = engine
564 .fetch_prompt(&token, &task_id)
565 .await
566 .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
567
568 // Resolve the Blueprint-baked worker binding injected into
569 // `ctx.meta.runtime` by `WorkerBindingMiddleware` (launch-time layer,
570 // built from `AgentDef.profile.worker_binding`). Absent key = agent
571 // declared no binding → hand `None` and let binding-requiring
572 // backends fail loud (`requires_worker_binding`). A present-but-
573 // malformed value is a wiring bug, not a degrade case — fail here.
574 let worker: Option<crate::operator::WorkerBinding> = match ctx
575 .meta
576 .runtime
577 .get(crate::middleware::worker_binding::WORKER_BINDING_KEY)
578 {
579 Some(v) => Some(serde_json::from_value(v.clone()).map_err(|e| {
580 SpawnError::Internal(format!(
581 "ctx.meta.runtime['{}'] for agent '{}' is malformed: {e}",
582 crate::middleware::worker_binding::WORKER_BINDING_KEY,
583 ctx.agent
584 ))
585 })?),
586 None => None,
587 };
588
589 let engine_clone = engine.clone();
590 let token_clone = token.clone();
591 let token_for_op = token.clone();
592 let task_id_clone = task_id.clone();
593 let ctx_clone = ctx.clone();
594 let (tx, rx) = tokio::sync::oneshot::channel();
595 let cancel = tokio_util::sync::CancellationToken::new();
596 let cancel_inner = cancel.clone();
597 let worker_id = crate::types::WorkerId::new();
598
599 tokio::spawn(async move {
600 let result: Result<
601 crate::worker::adapter::WorkerResult,
602 crate::worker::adapter::WorkerError,
603 > = tokio::select! {
604 // OperatorDelegateMiddleware = session-global Operator delegation.
605 // Baking per-AgentDef profile.system_prompt is OperatorSpawner's
606 // job; this path has no per-agent spawner, so system stays None.
607 // The worker binding, however, IS resolved on this axis now:
608 // `WorkerBindingMiddleware` (launch-time layer) injects the
609 // Blueprint-baked binding into ctx.meta.runtime and we forward
610 // it here — the delegate axis is a first-class variant-dispatch
611 // path, not a binding-less fallback (issue 45db42a7).
612 // We hand the capability token (Role::Worker, 600s TTL) to the
613 // operator as `worker_token` — thin-spawn operators (e.g. a
614 // WebSocket-backed operator session) forward it to the SubAgent
615 // via encode(), while Operator impls that call the LLM directly
616 // may ignore it.
617 r = operator.execute(&ctx_clone, None, prompt, worker, token_for_op) => r,
618 _ = cancel_inner.cancelled() => Err(crate::worker::adapter::WorkerError::Cancelled),
619 };
620 if let Ok(wr) = &result {
621 // If the SubAgent has already pushed a Final through
622 // /v1/worker/result or /v1/worker/submit POST, skip a second
623 // emit here — the POST value is the canonical one (protocol
624 // design intent). Operator impls that never POST (e.g. tests
625 // and inline Operators) still get the fallback emit.
626 let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
627 let has_final = tail
628 .iter()
629 .any(|ev| matches!(ev, crate::worker::output::OutputEvent::Final { .. }));
630 if !has_final {
631 let ev = crate::worker::output::OutputEvent::Final {
632 content: crate::worker::output::ContentRef::Inline {
633 value: wr.value.clone(),
634 },
635 ok: wr.ok,
636 };
637 let _ = engine_clone
638 .submit_output(&token_clone, &task_id_clone, attempt, ev)
639 .await;
640 }
641 }
642 let signal: Result<(), crate::worker::adapter::WorkerError> = result.map(|_| ());
643 let _ = tx.send(signal);
644 });
645
646 Ok(Box::new(MiddlewareWorker {
647 handler: WorkerJoinHandler {
648 worker_id,
649 cancel,
650 completion: rx,
651 },
652 }))
653 }
654}
655
656// ─── LongHoldMiddleware (warns on the EventLog if completion time exceeds default_hold) ─
657
658/// Base layer that emits `Event::TaskAttemptCompleted` with a
659/// `long_hold_warn` marker when a spawn's completion takes longer than
660/// `default_hold`. Purely observational — it never alters the signal or
661/// blocks completion.
662pub struct LongHoldMiddleware {
663 /// Threshold above which a completion is flagged as long-held.
664 pub default_hold: Duration,
665 /// Broadcast sender the EventLog subscribes to.
666 pub event_tx: broadcast::Sender<Event>,
667}
668
669impl LongHoldMiddleware {
670 /// Sets the hold threshold and the event sender to warn through.
671 pub fn new(default_hold: Duration, event_tx: broadcast::Sender<Event>) -> Self {
672 Self {
673 default_hold,
674 event_tx,
675 }
676 }
677}
678
679impl SpawnerLayer for LongHoldMiddleware {
680 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
681 Arc::new(LongHoldWrapped {
682 inner,
683 default_hold: self.default_hold,
684 event_tx: self.event_tx.clone(),
685 })
686 }
687}
688
689struct LongHoldWrapped {
690 inner: Arc<dyn SpawnerAdapter>,
691 default_hold: Duration,
692 event_tx: broadcast::Sender<Event>,
693}
694
695#[async_trait]
696impl SpawnerAdapter for LongHoldWrapped {
697 async fn spawn(
698 &self,
699 engine: &Engine,
700 ctx: &Ctx,
701 task_id: TaskId,
702 attempt: u32,
703 token: CapToken,
704 ) -> Result<Box<dyn Worker>, SpawnError> {
705 let handle = self
706 .inner
707 .spawn(engine, ctx, task_id.clone(), attempt, token)
708 .await?;
709 let started = Instant::now();
710 let default_hold = self.default_hold;
711 let event_tx = self.event_tx.clone();
712 let task_id_inner = task_id.clone();
713 Ok(wrap_join(handle, move |signal| {
714 let elapsed = started.elapsed();
715 let default_hold = default_hold;
716 let event_tx = event_tx.clone();
717 let task_id_inner = task_id_inner.clone();
718 async move {
719 if elapsed > default_hold {
720 let _ = event_tx.send(Event::TaskAttemptCompleted {
721 task_id: task_id_inner,
722 attempt,
723 result: serde_json::json!({
724 "long_hold_warn": true,
725 "elapsed_ms": elapsed.as_millis() as u64,
726 "default_hold_ms": default_hold.as_millis() as u64,
727 }),
728 });
729 }
730 signal
731 }
732 }))
733 }
734}
735
736// Boundary regression spec for the delegate-axis worker-binding handoff
737// (issue 45db42a7): OperatorDelegateMiddleware must forward the binding
738// injected into ctx.meta.runtime by WorkerBindingMiddleware — both the
739// hit path (Some(worker) reaches Operator::execute) and the absent path
740// (None reaches it), plus fail-loud on a malformed value.
#[cfg(test)]
mod operator_delegate_worker_binding_tests {
    use super::*;
    use crate::core::config::EngineCfg;
    use crate::core::state::TaskSpec;
    use crate::operator::WorkerBinding;
    use crate::types::Role;
    use crate::worker::adapter::{WorkerError, WorkerResult};
    use std::sync::Mutex;

    /// Records the `worker` argument `Operator::execute` received. The outer
    /// `Option` stays `None` until `execute` has run.
    type SeenSlot = Arc<Mutex<Option<Option<WorkerBinding>>>>;

    /// Operator stub that stores its `worker` argument and succeeds with `null`.
    struct RecordingOperator {
        seen: SeenSlot,
    }

    #[async_trait]
    impl crate::operator::Operator for RecordingOperator {
        async fn execute(
            &self,
            _ctx: &Ctx,
            _system: Option<String>,
            _prompt: String,
            worker: Option<WorkerBinding>,
            _worker_token: CapToken,
        ) -> Result<WorkerResult, WorkerError> {
            *self.seen.lock().unwrap() = Some(worker);
            Ok(WorkerResult {
                value: Value::Null,
                ok: true,
            })
        }
    }

    /// Inner spawner that must never be reached while an operator is attached.
    struct MustNotSpawn;

    #[async_trait]
    impl SpawnerAdapter for MustNotSpawn {
        async fn spawn(
            &self,
            _engine: &Engine,
            _ctx: &Ctx,
            _task_id: TaskId,
            _attempt: u32,
            _token: CapToken,
        ) -> Result<Box<dyn Worker>, SpawnError> {
            panic!("delegate axis must bypass inner.spawn when an operator is attached");
        }
    }

    /// Engine with one `planner` task started, plus a registered
    /// `Role::Worker` token for it. The token is minted the same way
    /// `dispatch_attempt_with` does, because FetchPrompt is worker-verb-gated.
    async fn seeded_engine() -> (Engine, CapToken, TaskId) {
        let engine = Engine::new(EngineCfg::default());
        let op_token = engine
            .attach("ut-op", Role::Operator, Duration::from_secs(30))
            .await
            .expect("attach");
        let spec = TaskSpec {
            agent: "planner".to_string(),
            initial_directive: "do the thing".to_string(),
        };
        let task_id = engine
            .start_task(&op_token, spec)
            .await
            .expect("start_task");

        let worker_token = engine.signer().session(
            format!("worker-of-{task_id}"),
            Role::Worker,
            vec!["*".into()],
            Duration::from_secs(600),
        );
        let nonce = worker_token.nonce.clone();
        let record = crate::core::state::CapTokenRecord::from_worker_token(
            worker_token.clone(),
            task_id.clone(),
        );
        engine
            .with_state("test.mint_worker", move |s| {
                s.tokens.insert(nonce, record);
            })
            .await
            .expect("mint worker token");
        (engine, worker_token, task_id)
    }

    /// The delegate layer wrapped around a spawner that panics if reached.
    fn delegate_stack() -> Arc<dyn SpawnerAdapter> {
        let never: Arc<dyn SpawnerAdapter> = Arc::new(MustNotSpawn);
        OperatorDelegateMiddleware::new().wrap(never)
    }

    /// A `planner` ctx (attempt 1) with a fresh `RecordingOperator` attached,
    /// together with the slot that operator records into.
    fn recording_ctx(task_id: &TaskId) -> (Ctx, SeenSlot) {
        let seen: SeenSlot = Arc::new(Mutex::new(None));
        let op = Arc::new(RecordingOperator {
            seen: Arc::clone(&seen),
        });
        let mut ctx = Ctx::new(task_id.clone(), 1, "planner");
        ctx.operator.operator = Some(op);
        (ctx, seen)
    }

    /// Polls `seen` for up to ~1s, because the operator runs on a spawned
    /// task. Returns the recorded `worker` argument.
    async fn recorded_worker(seen: &SeenSlot) -> Option<WorkerBinding> {
        for _ in 0..100 {
            let snapshot = seen.lock().unwrap().clone();
            if let Some(worker) = snapshot {
                return worker;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        panic!("operator.execute was never called within 1s");
    }

    #[tokio::test]
    async fn forwards_ctx_injected_binding_to_operator_execute() {
        let (engine, token, task_id) = seeded_engine().await;
        let (mut ctx, seen) = recording_ctx(&task_id);
        let binding = WorkerBinding {
            variant: "mse-worker-coder".to_string(),
            tools: vec!["Edit".to_string()],
        };
        ctx.meta.runtime.insert(
            crate::middleware::worker_binding::WORKER_BINDING_KEY.to_string(),
            serde_json::to_value(binding).unwrap(),
        );

        let _worker = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await
            .expect("delegate spawn ok");

        let got = recorded_worker(&seen).await.expect("binding forwarded");
        assert_eq!(got.variant, "mse-worker-coder");
        assert_eq!(got.tools, vec!["Edit".to_string()]);
    }

    #[tokio::test]
    async fn absent_binding_stays_none_no_silent_default() {
        let (engine, token, task_id) = seeded_engine().await;
        let (ctx, seen) = recording_ctx(&task_id);

        let _worker = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await
            .expect("delegate spawn ok");

        let forwarded = recorded_worker(&seen).await;
        assert!(
            forwarded.is_none(),
            "no binding declared must reach the operator as None (fail-loud stays downstream)"
        );
    }

    #[tokio::test]
    async fn malformed_binding_fails_loud_before_execute() {
        let (engine, token, task_id) = seeded_engine().await;
        let (mut ctx, seen) = recording_ctx(&task_id);
        ctx.meta.runtime.insert(
            crate::middleware::worker_binding::WORKER_BINDING_KEY.to_string(),
            serde_json::json!({ "not_a_binding": true }),
        );

        // `expect_err` would need `Box<dyn Worker>: Debug`, so destructure
        // the error explicitly.
        let Err(err) = delegate_stack()
            .spawn(&engine, &ctx, task_id, 1, token)
            .await
        else {
            panic!("malformed binding must fail the spawn");
        };
        let msg = format!("{err:?}");
        assert!(
            msg.contains("worker_binding") && msg.contains("malformed"),
            "error must name the malformed key: {msg}"
        );
        assert!(
            seen.lock().unwrap().is_none(),
            "operator.execute must not run on malformed binding"
        );
    }
}