mlua_swarm/middleware.rs
//! Middleware overlay — cross-cutting concerns (Audit / MainAI / Senior /
//! OperatorDelegate / LongHold).
//!
//! Ships five `SpawnerLayer` implementations plus the `SpawnerStack` builder.
//! Some layers key off `Ctx.operator.kind` and only fire for
//! `MainAi` / `Composite` sessions; others (`Audit` / `LongHold`) apply
//! uniformly across every kind.
8//!
9//! # Extension discipline — this layer is THE extension point (canonical)
10//!
11//! Background: an earlier iteration grew a verdict-specialised machinery
12//! (`judgment.rs` canonical type + 3-form parser + `state.agent_verdicts`
13//! map + dedicated accessor) that re-interpreted agent output *inside the
14//! engine core* and banned string-literal conds in favour of a Blueprint
15//! compile-layer translation. That whole complex was dismantled: the value
16//! it added over plain data was zero, while it created an IN-side dialect
17//! that every consumer had to learn. The design conclusion is a
18//! three-principle layering:
19//!
20//! 1. **IN is immutable, canonical form is JSON.** `Blueprint` /
21//! `mlua_flow_ir::Node` are plain serde data. No compile pass, no schema
22//! field that the engine expands, no Rust helper that builds `Expr`s.
23//! Flow control is written literally in Flow.ir:
24//! `Eq(Path("$.<step>.verdict"), Lit("blocked"))` — domain verdicts are
25//! plain strings inside step output, consumed by plain conds.
26//! 2. **Generation (authoring sugar) lives OUT**, on the consumer side
27//! (e.g. a vendored pure-Lua builder that prints Blueprint JSON). It
28//! never leaks into engine / schema crates, whatever language it is
29//! written in — the ban is on the *placement*, not the language.
30//! 3. **Runtime extension lives HERE, as a `SpawnerLayer`.** A middleware
31//! (or any future extension mechanism) may interpret the *results* of a
32//! Flow.ir run — `Ctx`, the `output_tail`, `Final { ok }` — in its own
33//! way and transform them. What it must NOT do:
34//! - introduce a new dialect on the IN side (schema fields / node
35//! rewriting / cond translation) — extensions read and transform, the
36//! wire format stays plain Flow.ir + JSON;
37//! - hide its effect: overrides are *appended* to the output tail
38//! (e.g. `SeniorEscalationMiddleware` pushes an override `Final`
39//! rather than mutating the recorded one), so the trace stays
40//! replayable and the flow stays observable;
41//! - accumulate private engine state keyed by its own semantics (the
42//! `agent_verdicts` anti-pattern) — state lives in ctx / output store
43//! as plain data.
44//!
45//! `AgentResolver`, `ProjectNameAliasMiddleware`, `SinkMiddleware`,
46//! `InputInjectMiddleware`, `LuaMiddleware`, `SeniorEscalationMiddleware`
47//! all follow this shape: edit `ctx` / wrap the worker, call the inner
//! spawner, append observable output. (Note that `LuaMiddleware`'s scripts
//! are host-constructed — embedding Lua source in a Blueprint is the IN-side
//! dialect this discipline forbids, and would require its own guard
//! design if ever revisited.)
52
53pub mod input_inject;
54pub mod lua_layer;
55pub mod project_name_alias;
56pub mod resolver;
57pub mod sink;
58
59use crate::core::ctx::{Ctx, OperatorKind};
60use crate::core::engine::Engine;
61use crate::core::state::Event;
62use crate::types::{CapToken, TaskId};
63use crate::worker::adapter::{SpawnError, SpawnerAdapter};
64use crate::worker::output::{ContentRef, OutputEvent};
65use crate::worker::{wrap_join, MiddlewareWorker, Worker, WorkerJoinHandler};
66use async_trait::async_trait;
67use serde_json::Value;
68use std::sync::Arc;
69use std::time::{Duration, Instant};
70use tokio::sync::broadcast;
71
72/// Pull the terminal `Final` event's `(value, ok)` out of the tail (works
73/// for both `Inline` and `FileRef` content).
74async fn pull_final_value_ok(
75 engine: &Engine,
76 task_id: &TaskId,
77 attempt: u32,
78) -> Option<(Value, bool)> {
79 let tail = engine.output_tail(task_id, attempt).await;
80 tail.iter().rev().find_map(|ev| match ev {
81 OutputEvent::Final {
82 content: ContentRef::Inline { value },
83 ok,
84 } => Some((value.clone(), *ok)),
85 OutputEvent::Final {
86 content: ContentRef::FileRef { path, .. },
87 ok,
88 } => Some((serde_json::json!({"file_ref": path.to_string_lossy()}), *ok)),
89 _ => None,
90 })
91}
92
93/// Layer trait — one middleware stage wrapping a `SpawnerAdapter`.
94pub trait SpawnerLayer: Send + Sync + 'static {
95 /// Wraps `inner` in this layer's behaviour, returning a new
96 /// `SpawnerAdapter` that delegates to `inner` (directly or via
97 /// `wrap_join`) while adding this layer's cross-cutting effect.
98 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter>;
99}
100
101/// Stack builder that layers `SpawnerLayer`s on top of a base adapter.
102///
103/// Each `.layer(...)` call wraps a new **outer** stage — same ergonomics as
104/// `tower::ServiceBuilder`.
105pub struct SpawnerStack {
106 inner: Arc<dyn SpawnerAdapter>,
107}
108
109impl SpawnerStack {
110 /// Starts a stack with `base` as the innermost adapter.
111 pub fn new(base: Arc<dyn SpawnerAdapter>) -> Self {
112 Self { inner: base }
113 }
114
115 /// Wraps the current stack with a statically-typed `SpawnerLayer`,
116 /// becoming the new outermost stage.
117 pub fn layer<L: SpawnerLayer>(mut self, layer: L) -> Self {
118 self.inner = layer.wrap(self.inner);
119 self
120 }
121
122 /// Dynamically-typed variant taking `Arc<dyn SpawnerLayer>`. Used via
123 /// the `LayerRegistry` resolution path (where a factory returns
124 /// `Arc<dyn ...>`).
125 pub fn layer_dyn(mut self, layer: Arc<dyn SpawnerLayer>) -> Self {
126 self.inner = layer.wrap(self.inner);
127 self
128 }
129
130 /// Finishes the stack, returning the fully-wrapped adapter.
131 pub fn build(self) -> Arc<dyn SpawnerAdapter> {
132 self.inner
133 }
134}
135
136// ─── SpawnerLayerFactory + LayerRegistry ─────────────────────────────────
137//
138// # Design rationale
139//
140// Wiring is assembled per-launch through `TaskLaunchService.launch`:
141//
142// Compiler.compile(bp) ─┬─→ compiled.router (CompiledAgentTable: agent name → SpawnerAdapter dispatch)
143// │
144// │ service::linker::link(router, bp.spawner_hints.layers, &engine)
145// │ internal:
146// │ SpawnerStack::new(router)
147// │ .layer_dyn(base_factory_n(engine)) ← every LayerRegistry.base entry
148// │ .layer_dyn(hint_factory(engine)) ← resolves each bp.spawner_hints.layers key
149// │ .build()
150// ▼
151// EngineDispatcher::with_spawner(engine, op_token, stacked)
152// ▼
153// engine.dispatch_attempt_with(op_token, task_id, &stacked)
154//
155// # base vs hint — when to use each
156//
157// - **base layer**: wrapped around every Blueprint. Example: AuditMiddleware
158// (a mandatory EventLog audit). The caller registers with
159// `LayerRegistry::with_base(|e| Arc::new(AuditMiddleware::new(e.event_tx())))`.
160//
161// - **hint layer**: wrapped **only when the Blueprint declares the key** in
162// `spawner_hints.layers`. Examples: MainAIMiddleware /
163// SeniorEscalationMiddleware / OperatorDelegateMiddleware. The Blueprint
164// only declares a capability key (e.g. `"main_ai"`) without knowing the
165// implementation; the engine-side LayerRegistry resolves key → factory,
166// keeping the pure Flow layer separate from implementation details.
167//
168// # Factory pattern (handles layers that need Engine context)
169//
170// We do not hold `Arc<dyn SpawnerLayer>` directly because some layers
171// depend on the engine instance — for example AuditMiddleware needs
172// `engine.event_tx()` and can only be built after the engine exists. A
173// factory closure defers construction: the Layer instance is created only
174// when the engine is handed in.
175
176/// Factory closure for a `SpawnerLayer`. The caller registers these at
177/// startup, and they are called with the engine context at bind time.
178/// Stateless layers can use `|_engine| Arc::new(MyLayer)`; layers that need
179/// something like `event_tx` should do `|engine| Arc::new(MyLayer::new(engine.event_tx()))`.
180pub type LayerFactory =
181 Arc<dyn Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static>;
182
183/// Registry of `LayerFactory`s, split into `base` (always applied) and
184/// `hints` (applied only when a Blueprint declares the matching key in
185/// `spawner_hints.layers`). See the module-level `# Factory pattern`
186/// notes above for why factories rather than pre-built layers.
187#[derive(Default, Clone)]
188pub struct LayerRegistry {
189 base: Vec<LayerFactory>,
190 hints: std::collections::HashMap<String, LayerFactory>,
191}
192
193impl LayerRegistry {
194 /// Empty registry (no base layers, no hint layers).
195 pub fn new() -> Self {
196 Self::default()
197 }
198
199 /// Register a base layer factory that is applied on every Blueprint bind
200 /// (for layers that must fire for every task — e.g. `AuditMiddleware`).
201 pub fn with_base<F>(mut self, factory: F) -> Self
202 where
203 F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
204 {
205 self.base.push(Arc::new(factory));
206 self
207 }
208
209 /// Register a layer factory addressable by hint key. If
210 /// `Blueprint.spawner_hints.layers` lists the same key, it is wrapped at
211 /// bind time; otherwise it is a no-op.
212 pub fn with_hint<F>(mut self, key: impl Into<String>, factory: F) -> Self
213 where
214 F: Fn(&crate::core::engine::Engine) -> Arc<dyn SpawnerLayer> + Send + Sync + 'static,
215 {
216 self.hints.insert(key.into(), Arc::new(factory));
217 self
218 }
219
220 /// All registered base-layer factories, in registration order.
221 pub fn base_factories(&self) -> &[LayerFactory] {
222 &self.base
223 }
224
225 /// Looks up the hint-layer factory registered under `key`, if any.
226 pub fn lookup_hint(&self, key: &str) -> Option<&LayerFactory> {
227 self.hints.get(key)
228 }
229}
230
231// ─── AuditMiddleware (pushes into the EventLog broadcast path) ────────────
232
233/// Mandatory base layer that emits `Event::TaskAttemptStarted` on every
234/// spawn, before delegating. This is the audit trail's entry point into
235/// the EventLog broadcast channel.
236pub struct AuditMiddleware {
237 /// Broadcast sender the EventLog subscribes to.
238 pub event_tx: broadcast::Sender<Event>,
239}
240
241impl AuditMiddleware {
242 /// Wraps a broadcast sender to notify on every spawn.
243 pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
244 Self { event_tx }
245 }
246}
247
248impl SpawnerLayer for AuditMiddleware {
249 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
250 Arc::new(AuditWrapped {
251 inner,
252 event_tx: self.event_tx.clone(),
253 })
254 }
255}
256
257struct AuditWrapped {
258 inner: Arc<dyn SpawnerAdapter>,
259 event_tx: broadcast::Sender<Event>,
260}
261
262#[async_trait]
263impl SpawnerAdapter for AuditWrapped {
264 async fn spawn(
265 &self,
266 engine: &Engine,
267 ctx: &Ctx,
268 task_id: TaskId,
269 attempt: u32,
270 token: CapToken,
271 ) -> Result<Box<dyn Worker>, SpawnError> {
272 let _ = self.event_tx.send(Event::TaskAttemptStarted {
273 task_id: task_id.clone(),
274 attempt,
275 });
276 self.inner.spawn(engine, ctx, task_id, attempt, token).await
277 }
278}
279
280// ─── MainAIMiddleware (fires SpawnHook before/after for MainAI/Composite) ─
281
282/// Hint layer that fires `ctx.operator.spawn_hook.before`/`after` around
283/// a spawn, but only for `MainAi` / `Composite` sessions. No-op for
284/// other kinds (still delegates, just skips the hook calls).
285pub struct MainAIMiddleware;
286
287impl MainAIMiddleware {
288 /// Stateless constructor.
289 pub fn new() -> Self {
290 Self
291 }
292}
293
294impl Default for MainAIMiddleware {
295 fn default() -> Self {
296 Self::new()
297 }
298}
299
300impl SpawnerLayer for MainAIMiddleware {
301 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
302 Arc::new(MainAIWrapped { inner })
303 }
304}
305
306struct MainAIWrapped {
307 inner: Arc<dyn SpawnerAdapter>,
308}
309
310#[async_trait]
311impl SpawnerAdapter for MainAIWrapped {
312 async fn spawn(
313 &self,
314 engine: &Engine,
315 ctx: &Ctx,
316 task_id: TaskId,
317 attempt: u32,
318 token: CapToken,
319 ) -> Result<Box<dyn Worker>, SpawnError> {
320 let mainai = matches!(
321 ctx.operator.kind,
322 OperatorKind::MainAi | OperatorKind::Composite
323 );
324 if mainai {
325 if let Some(hook) = &ctx.operator.spawn_hook {
326 hook.before(ctx)
327 .await
328 .map_err(SpawnError::RejectedByMiddleware)?;
329 }
330 }
331
332 let handle = self
333 .inner
334 .spawn(engine, ctx, task_id.clone(), attempt, token)
335 .await?;
336
337 if !mainai {
338 return Ok(handle);
339 }
340 let Some(hook) = ctx.operator.spawn_hook.clone() else {
341 return Ok(handle);
342 };
343
344 // Wrap the completion signal and call hook.after on finish.
345 // Pull the last Final from engine.output_tail as the value.
346 let ctx_clone = ctx.clone();
347 let engine_clone = engine.clone();
348 let task_id_clone = task_id.clone();
349 Ok(wrap_join(handle, move |signal| {
350 let hook = hook.clone();
351 let ctx_clone = ctx_clone.clone();
352 let engine_clone = engine_clone.clone();
353 let task_id_clone = task_id_clone.clone();
354 async move {
355 let v = match &signal {
356 Ok(()) => pull_final_value_ok(&engine_clone, &task_id_clone, attempt)
357 .await
358 .map(|(v, _)| v)
359 .unwrap_or(Value::Null),
360 Err(e) => Value::String(e.to_string()),
361 };
362 let _ = hook.after(&ctx_clone, &v).await;
363 signal
364 }
365 }))
366 }
367}
368
// ─── SeniorEscalationMiddleware ───────────────────────────────────────────
//
// When a spawn completes with a `Final` whose `ok` is `false` and
// `ctx.operator.senior_bridge` is `Some`, this auxiliary layer calls
// `SeniorBridge.ask` and appends an override `Final` (`ok=true`) whose value
// is `{"original": <worker value>, "senior_answer": <answer>}`. The recorded
// `Final` is left untouched. Retry / re-dispatch is the engine (operator)
// side's job; this layer only injects fresh material for that decision.
376
377/// Hint layer: on `ok=false` completion with `ctx.operator.senior_bridge`
378/// set, asks the bridge for guidance and pushes an override `Final`
379/// (`ok=true`) carrying `senior_answer`. See the module comment above
380/// this type for the full contract.
381pub struct SeniorEscalationMiddleware;
382
383impl SeniorEscalationMiddleware {
384 /// Stateless constructor.
385 pub fn new() -> Self {
386 Self
387 }
388}
389
390impl Default for SeniorEscalationMiddleware {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
396impl SpawnerLayer for SeniorEscalationMiddleware {
397 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
398 Arc::new(SeniorWrapped { inner })
399 }
400}
401
402struct SeniorWrapped {
403 inner: Arc<dyn SpawnerAdapter>,
404}
405
406#[async_trait]
407impl SpawnerAdapter for SeniorWrapped {
408 async fn spawn(
409 &self,
410 engine: &Engine,
411 ctx: &Ctx,
412 task_id: TaskId,
413 attempt: u32,
414 token: CapToken,
415 ) -> Result<Box<dyn Worker>, SpawnError> {
416 let bridge = ctx.operator.senior_bridge.clone();
417 let task_id_for_hook = task_id.clone();
418 let engine_clone = engine.clone();
419 let token_clone = token.clone();
420 let handle = self
421 .inner
422 .spawn(engine, ctx, task_id, attempt, token)
423 .await?;
424 let Some(bridge) = bridge else {
425 return Ok(handle);
426 };
427 Ok(wrap_join(handle, move |signal| {
428 let bridge = bridge.clone();
429 let task_id = task_id_for_hook.clone();
430 let engine = engine_clone.clone();
431 let token = token_clone.clone();
432 async move {
433 signal?;
434 // Read the existing Final.
435 let last = pull_final_value_ok(&engine, &task_id, attempt).await;
436 if let Some((value, false)) = last {
437 // ok=false: escalate to senior and push an override Final.
438 let question = serde_json::json!({
439 "reason": "worker reported ok=false",
440 "value": value.clone(),
441 });
442 if let Ok(answer) = bridge.ask(&task_id, question).await {
443 let override_val = serde_json::json!({
444 "original": value,
445 "senior_answer": answer,
446 });
447 let _ = engine
448 .submit_output(
449 &token,
450 &task_id,
451 attempt,
452 OutputEvent::Final {
453 content: ContentRef::Inline {
454 value: override_val,
455 },
456 ok: true,
457 },
458 )
459 .await;
460 }
461 }
462 Ok(())
463 }
464 }))
465 }
466}
467
468// ─── OperatorDelegateMiddleware (delegates the whole spawn to an external Operator when one is attached) ──
469
470/// When `ctx.operator.operator.is_some()` (the session has an Operator
471/// backend), **bypass** `inner.spawn`, call `operator.execute(ctx, prompt)`,
472/// and box the result up as a `WorkerHandle`. In other words: the path that
473/// hands "this spawn" to whatever external Operator backend the engine has
474/// registered.
475///
476/// # Independent of `OperatorKind` (Operator is a generic abstraction)
477///
478/// An earlier implementation gated on `kind == MainAi | Composite`, which
479/// tied the `Operator` abstraction to an "AI driver" assumption — a design
480/// weakness. The `Operator` trait is a generic **external processing backend**
481/// (LLM, human, external resource, side-effectful operation — anything), and
482/// is orthogonal to the kind axis.
483///
484/// The current implementation decides solely on `operator.is_some()`:
485/// - Automate session + operator backend registered → delegate
486/// (pure external-execution delegation).
487/// - MainAi session + operator backend registered → delegate.
488/// - Any kind + `operator` `None` → pass through (normal `inner.spawn`).
489///
490/// `kind` still matters as a firing condition for `SpawnHook`s over in
491/// `MainAIMiddleware`, but this middleware ignores it.
492///
493/// # Split of responsibilities with `OperatorSpawner`
494///
495/// The two axes exist for different reasons:
496///
497/// - **This middleware — the Blueprint-global (session) axis.** Delegate every
498/// agent to the same Operator backend. The `operator_backend_id` is set
499/// at session-attach time; `ctx.agent` is ignored and every spawn in that
500/// session is routed through the operator (e.g. a MainAI-wide driver, or a
501/// human-wide console). The Blueprint doesn't have to talk about `kind` —
502/// it just declares the capability hint `"operator_delegate"` (keeping the
503/// Blueprint clean).
504///
505/// - **`OperatorSpawner` — the AgentSpec axis.** Each `AgentDef` bakes its
506/// own Operator backend. `kind = Operator` `AgentDef`s pick a backend via
507/// `spec.operator_ref`; the compiler bakes an `Arc<dyn Operator>` into
508/// `routes[agent_name]`. Agents loaded via the `agent.md` loader come in
509/// through this path (their default is `kind = Operator`).
510///
511/// # Exclusivity
512///
513/// When both are effective — this middleware's hint is declared, the session
514/// has an operator backend, **and** the Blueprint has a `kind = Operator`
515/// `AgentDef` — this middleware sits at the outer end of the stack and
516/// **completely bypasses** `inner.spawn`. The `OperatorSpawner` is never
517/// reached, so a double fire cannot occur by construction; the AgentSpec
518/// axis is inert. Consistent use means picking one axis per use case.
519pub struct OperatorDelegateMiddleware;
520
521impl OperatorDelegateMiddleware {
522 /// Stateless constructor.
523 pub fn new() -> Self {
524 Self
525 }
526}
527
528impl Default for OperatorDelegateMiddleware {
529 fn default() -> Self {
530 Self::new()
531 }
532}
533
534impl SpawnerLayer for OperatorDelegateMiddleware {
535 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
536 Arc::new(OperatorDelegateWrapped { inner })
537 }
538}
539
540struct OperatorDelegateWrapped {
541 inner: Arc<dyn SpawnerAdapter>,
542}
543
544#[async_trait]
545impl SpawnerAdapter for OperatorDelegateWrapped {
546 async fn spawn(
547 &self,
548 engine: &Engine,
549 ctx: &Ctx,
550 task_id: TaskId,
551 attempt: u32,
552 token: CapToken,
553 ) -> Result<Box<dyn Worker>, SpawnError> {
554 // Kind-independent: we decide purely on whether an operator backend is
555 // registered on the session. `kind` matters for SpawnHook-style layers
556 // (MainAIMiddleware); this middleware does not consult it.
557 let Some(operator) = ctx.operator.operator.clone() else {
558 return self.inner.spawn(engine, ctx, task_id, attempt, token).await;
559 };
560
561 // Delegate: same shape as OperatorSpawner — fetch_prompt + operator.execute + Final emit.
562 let prompt = engine
563 .fetch_prompt(&token, &task_id)
564 .await
565 .map_err(|e| SpawnError::Internal(format!("fetch_prompt: {e}")))?;
566
567 let engine_clone = engine.clone();
568 let token_clone = token.clone();
569 let token_for_op = token.clone();
570 let task_id_clone = task_id.clone();
571 let ctx_clone = ctx.clone();
572 let (tx, rx) = tokio::sync::oneshot::channel();
573 let cancel = tokio_util::sync::CancellationToken::new();
574 let cancel_inner = cancel.clone();
575 let worker_id = crate::types::WorkerId::new();
576
577 tokio::spawn(async move {
578 let result: Result<
579 crate::worker::adapter::WorkerResult,
580 crate::worker::adapter::WorkerError,
581 > = tokio::select! {
582 // OperatorDelegateMiddleware = session-global Operator delegation.
583 // Baking per-AgentDef profile.system_prompt is OperatorSpawner's job;
584 // this path has no profile (ctx.agent is ignored on this axis), so
585 // we execute with system=None and worker=None — there is no
586 // AgentDef.profile.worker_binding to resolve here.
587 // We hand the capability token (Role::Worker, 600s TTL) to the
588 // operator as `worker_token` — thin-spawn operators (e.g. a
589 // WebSocket-backed operator session) forward it to the SubAgent
590 // via encode(), while Operator impls that call the LLM directly
591 // may ignore it.
592 r = operator.execute(&ctx_clone, None, prompt, None, token_for_op) => r,
593 _ = cancel_inner.cancelled() => Err(crate::worker::adapter::WorkerError::Cancelled),
594 };
595 if let Ok(wr) = &result {
596 // If the SubAgent has already pushed a Final through
597 // /v1/worker/result or /v1/worker/submit POST, skip a second
598 // emit here — the POST value is the canonical one (protocol
599 // design intent). Operator impls that never POST (e.g. tests
600 // and inline Operators) still get the fallback emit.
601 let tail = engine_clone.output_tail(&task_id_clone, attempt).await;
602 let has_final = tail
603 .iter()
604 .any(|ev| matches!(ev, crate::worker::output::OutputEvent::Final { .. }));
605 if !has_final {
606 let ev = crate::worker::output::OutputEvent::Final {
607 content: crate::worker::output::ContentRef::Inline {
608 value: wr.value.clone(),
609 },
610 ok: wr.ok,
611 };
612 let _ = engine_clone
613 .submit_output(&token_clone, &task_id_clone, attempt, ev)
614 .await;
615 }
616 }
617 let signal: Result<(), crate::worker::adapter::WorkerError> = result.map(|_| ());
618 let _ = tx.send(signal);
619 });
620
621 Ok(Box::new(MiddlewareWorker {
622 handler: WorkerJoinHandler {
623 worker_id,
624 cancel,
625 completion: rx,
626 },
627 }))
628 }
629}
630
631// ─── LongHoldMiddleware (warns on the EventLog if completion time exceeds default_hold) ─
632
633/// Base layer that emits `Event::TaskAttemptCompleted` with a
634/// `long_hold_warn` marker when a spawn's completion takes longer than
635/// `default_hold`. Purely observational — it never alters the signal or
636/// blocks completion.
637pub struct LongHoldMiddleware {
638 /// Threshold above which a completion is flagged as long-held.
639 pub default_hold: Duration,
640 /// Broadcast sender the EventLog subscribes to.
641 pub event_tx: broadcast::Sender<Event>,
642}
643
644impl LongHoldMiddleware {
645 /// Sets the hold threshold and the event sender to warn through.
646 pub fn new(default_hold: Duration, event_tx: broadcast::Sender<Event>) -> Self {
647 Self {
648 default_hold,
649 event_tx,
650 }
651 }
652}
653
654impl SpawnerLayer for LongHoldMiddleware {
655 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
656 Arc::new(LongHoldWrapped {
657 inner,
658 default_hold: self.default_hold,
659 event_tx: self.event_tx.clone(),
660 })
661 }
662}
663
664struct LongHoldWrapped {
665 inner: Arc<dyn SpawnerAdapter>,
666 default_hold: Duration,
667 event_tx: broadcast::Sender<Event>,
668}
669
670#[async_trait]
671impl SpawnerAdapter for LongHoldWrapped {
672 async fn spawn(
673 &self,
674 engine: &Engine,
675 ctx: &Ctx,
676 task_id: TaskId,
677 attempt: u32,
678 token: CapToken,
679 ) -> Result<Box<dyn Worker>, SpawnError> {
680 let handle = self
681 .inner
682 .spawn(engine, ctx, task_id.clone(), attempt, token)
683 .await?;
684 let started = Instant::now();
685 let default_hold = self.default_hold;
686 let event_tx = self.event_tx.clone();
687 let task_id_inner = task_id.clone();
688 Ok(wrap_join(handle, move |signal| {
689 let elapsed = started.elapsed();
690 let default_hold = default_hold;
691 let event_tx = event_tx.clone();
692 let task_id_inner = task_id_inner.clone();
693 async move {
694 if elapsed > default_hold {
695 let _ = event_tx.send(Event::TaskAttemptCompleted {
696 task_id: task_id_inner,
697 attempt,
698 result: serde_json::json!({
699 "long_hold_warn": true,
700 "elapsed_ms": elapsed.as_millis() as u64,
701 "default_hold_ms": default_hold.as_millis() as u64,
702 }),
703 });
704 }
705 signal
706 }
707 }))
708 }
709}