Skip to main content

mlua_swarm/blueprint/
compiler.rs

1//! Blueprint `Compiler`, `CompiledAgentTable`, and the three default
2//! `SpawnerFactory` implementations.
3//!
4//! ## Pipeline
5//!
6//! ```text
7//! Blueprint (= flow + agents + hints + strategy + spawner_hints)
8//!     │
9//!     │ Compiler.compile(&bp)          ← this module (AgentDef → SpawnerAdapter table)
10//!     ▼
11//! CompiledBlueprint {
12//!     router: Arc<CompiledAgentTable>, // ctx.agent → SpawnerAdapter lookup
13//!     flow:   FlowNode,                // the flow.ir source (evaluated via EngineDispatcher)
14//!     metadata: BlueprintMetadata,
15//! }
16//!     │
17//!     │ service::linker::link(router, blueprint.spawner_hints.layers, &engine)
18//!     ▼                                   ↑ Layer wrapping is done separately (src/service/linker.rs)
19//! `Arc<dyn SpawnerAdapter>`            (already wrapped with base + hint SpawnerLayers)
20//!     │
21//!     ▼ EngineDispatcher::with_spawner → engine.dispatch_attempt_with
22//! ```
23//!
24//! `CompiledAgentTable` is a thin table: it looks up `routes[name]` by
25//! `ctx.agent` and hands the spawn off to the matching `SpawnerAdapter`.
26//! The `routes` map is built at compile time through `SpawnerFactory`
27//! implementations. Layer wrapping is not part of this module — it lives
28//! in `service::linker::link`.
29
30use crate::blueprint::{AgentDef, AgentKind, Blueprint, BlueprintMetadata};
31use crate::core::ctx::Ctx;
32use crate::core::engine::Engine;
33use crate::operator::{Operator, OperatorSpawner, WorkerBinding};
34use crate::types::{CapToken, TaskId};
35use crate::worker::adapter::{InProcSpawner, SpawnError, SpawnerAdapter, WorkerFn};
36use crate::worker::process_spawner::{ProcessSpawner, StreamMode};
37use crate::worker::Worker;
38use async_trait::async_trait;
39use mlua_flow_ir::Node as FlowNode;
40use serde_json::Value;
41use std::collections::HashMap;
42use std::sync::Arc;
43use thiserror::Error;
44
45// ─── error ───────────────────────────────────────────────────────────────
46
47/// Everything that can go wrong while `Compiler::compile` turns a
48/// `Blueprint` into a `CompiledBlueprint`.
49#[derive(Debug, Error)]
50pub enum CompileError {
51    /// An `AgentDef.kind` has no matching entry in the `SpawnerRegistry`
52    /// and `Blueprint.strategy.strict_kind` is set.
53    #[error("unknown agent kind in SpawnerRegistry: {0:?}")]
54    UnknownKind(AgentKind),
55    /// The `AgentDef.spec` shape did not match what the factory for its
56    /// kind requires (missing/mistyped field, etc.).
57    #[error("agent '{name}' spec invalid: {msg}")]
58    InvalidSpec {
59        /// The offending agent's name.
60        name: String,
61        /// Human-readable description of what was wrong with the spec.
62        msg: String,
63    },
64    /// The flow references an agent name that has no corresponding
65    /// `AgentDef` (and no default spawner is configured).
66    #[error("flow references agent '{0}' but no AgentDef matches")]
67    UnresolvedRef(String),
68    /// Two `AgentDef`s in the same `Blueprint` share a name.
69    #[error("duplicate AgentDef name: {0}")]
70    DuplicateAgent(String),
71    /// A `kind = Operator` agent's `spec.operator_ref` does not match
72    /// any `OperatorDef.name` declared in `Blueprint.operators`.
73    #[error("agent '{agent}' operator_ref '{op_ref}' does not match any OperatorDef.name in Blueprint.operators (defined: {defined:?})")]
74    UnresolvedOperatorRef {
75        /// The agent whose `operator_ref` didn't resolve.
76        agent: String,
77        /// The `operator_ref` value that was looked up.
78        op_ref: String,
79        /// The `OperatorDef.name`s that *are* declared, for the error
80        /// message.
81        defined: Vec<String>,
82    },
83}
84
85// ─── SpawnerFactory + Registry ───────────────────────────────────────────
86
87/// Factory trait that interprets an `AgentDef` and builds the concrete
88/// `SpawnerAdapter`. Register one per kind. Parsing the spec,
89/// validating it, and baking the profile are the implementation's job.
90///
91/// The signature was widened in v9 from `(name, spec, hint)` to
92/// `(&AgentDef, hint)` so the profile can be passed through. Most
93/// implementations still just pull `&agent_def.name` and
94/// `&agent_def.spec`, but Operator-backend factories consume
95/// `agent_def.profile` to bake the persona in.
96pub trait SpawnerFactory: Send + Sync {
97    /// Build the concrete `SpawnerAdapter` for one `AgentDef`. `hint` is
98    /// the matching entry (if any) from `Blueprint.hints.per_agent`.
99    fn build(
100        &self,
101        agent_def: &AgentDef,
102        hint: Option<&Value>,
103    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError>;
104}
105
106/// Companion trait that carries the **type-side source of truth** for
107/// the Adapter ↔ `AgentKind` correspondence.
108///
109/// The base [`SpawnerFactory`] trait deliberately does not carry an
110/// associated const so it stays dyn-compatible — that is, so it can be
111/// stored and dispatched as `Arc<dyn SpawnerFactory>`. This companion
112/// trait splits `const KIND: AgentKind` out, and
113/// [`SpawnerRegistry::register`] uses `F::KIND` as the `HashMap` key.
114/// That physically removes the string-lookup failure mode at the type
115/// layer.
116///
117/// The three built-in factories (`Shell` / `InProc` / `Operator`)
118/// implement this. Extension backends (say, `AgentBlockSpawnerFactory`)
119/// follow the same explicit two-step recipe: add a new `AgentKind`
120/// variant and implement this trait.
121pub trait SpawnerFactoryKind: SpawnerFactory {
122    /// The `AgentKind` this factory handles — used as the `HashMap` key
123    /// by `SpawnerRegistry::register`.
124    const KIND: AgentKind;
125    /// The concrete Worker type produced by this `AgentKind` — this
126    /// binds the type chain all the way from `AgentKind` down to `Worker`.
127    /// Every factory declares it so the `AgentKind → Worker` mapping is
128    /// explicit across all four layers. It is the source of truth for
129    /// preserving the concrete type right up until `SpawnerAdapter::spawn`
130    /// erases it into `Box<dyn Worker>`.
131    type Worker: crate::worker::Worker;
132}
133
134/// `AgentKind → SpawnerFactory` mapping. The compiler looks entries up
135/// during `compile()`.
136#[derive(Clone)]
137pub struct SpawnerRegistry {
138    factories: HashMap<AgentKind, Arc<dyn SpawnerFactory>>,
139}
140
141impl SpawnerRegistry {
142    /// Start with an empty `AgentKind → SpawnerFactory` mapping.
143    pub fn new() -> Self {
144        Self {
145            factories: HashMap::new(),
146        }
147    }
148    /// **Type-driven registration** — takes `F::KIND` and uses it as the
149    /// `HashMap` key.
150    ///
151    /// Callers use the form
152    /// `reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(...))`
153    /// and never have to pass an `AgentKind` literal. The Adapter ↔ Kind
154    /// correspondence is enforced at the type layer, physically removing
155    /// the string / enum-literal lookup failure mode.
156    pub fn register<F: SpawnerFactoryKind + 'static>(&mut self, factory: Arc<F>) -> &mut Self {
157        let f: Arc<dyn SpawnerFactory> = factory;
158        self.factories.insert(F::KIND, f);
159        self
160    }
161}
162
163impl Default for SpawnerRegistry {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169// ─── Compiler ────────────────────────────────────────────────────────────
170
171/// Turns a `Blueprint` into a `CompiledBlueprint` by resolving every
172/// `AgentDef` against a `SpawnerRegistry`. One-shot: build a fresh
173/// `Compiler` per `compile()` call (or reuse it — it holds no
174/// per-compile state).
175pub struct Compiler {
176    registry: SpawnerRegistry,
177    default_spawner: Option<Arc<dyn SpawnerAdapter>>,
178}
179
180/// The result of `Compiler::compile` — a routing table plus the
181/// unmodified flow and metadata, ready to hand to
182/// `EngineDispatcher::with_spawner` / `mlua_flow_ir::eval_async`.
183pub struct CompiledBlueprint {
184    /// `ctx.agent → SpawnerAdapter` lookup table.
185    pub router: Arc<CompiledAgentTable>,
186    /// The flow.ir source, copied verbatim from `Blueprint.flow`.
187    pub flow: FlowNode,
188    /// Copied verbatim from `Blueprint.metadata`.
189    pub metadata: BlueprintMetadata,
190}
191
192impl Compiler {
193    /// Build a `Compiler` around the given `SpawnerRegistry`, with no
194    /// default spawner (unresolved flow refs are an error unless
195    /// `with_default` is chained on).
196    pub fn new(registry: SpawnerRegistry) -> Self {
197        Self {
198            registry,
199            default_spawner: None,
200        }
201    }
202
203    /// Set a default spawner — used for flow refs (and unregistered
204    /// `AgentKind`s under non-strict strategy) that don't resolve
205    /// against any `AgentDef`/`SpawnerRegistry` entry.
206    pub fn with_default(mut self, sp: Arc<dyn SpawnerAdapter>) -> Self {
207        self.default_spawner = Some(sp);
208        self
209    }
210
211    /// Resolve every `Blueprint.agents` entry through the registry,
212    /// validate `operator_ref`s and flow refs per `Blueprint.strategy`,
213    /// and return the routing table alongside the untouched flow and
214    /// metadata.
215    pub fn compile(&self, bp: &Blueprint) -> Result<CompiledBlueprint, CompileError> {
216        let mut routes: HashMap<String, Arc<dyn SpawnerAdapter>> = HashMap::new();
217        let mut seen: HashMap<String, ()> = HashMap::new();
218
219        // Design-time validation (OperatorDef as a first-class value):
220        // every `kind = Operator` agent's `spec.operator_ref` must point at
221        // one of `bp.operators[].name`. A Blueprint with any Operator agent
222        // must therefore declare its operators up front; the empty-operators
223        // backward-compat bypass is retired.
224        let defined: Vec<String> = bp.operators.iter().map(|o| o.name.clone()).collect();
225        for ad in &bp.agents {
226            if !matches!(ad.kind, AgentKind::Operator) {
227                continue;
228            }
229            let op_ref = ad.spec.get("operator_ref").and_then(|v| v.as_str());
230            if let Some(op_ref) = op_ref {
231                if !defined.iter().any(|n| n == op_ref) {
232                    return Err(CompileError::UnresolvedOperatorRef {
233                        agent: ad.name.clone(),
234                        op_ref: op_ref.to_string(),
235                        defined: defined.clone(),
236                    });
237                }
238            }
239            // A missing `op_ref` is reported through OperatorSpawnerFactory.build under a different error.
240        }
241
242        for ad in &bp.agents {
243            if seen.contains_key(&ad.name) {
244                return Err(CompileError::DuplicateAgent(ad.name.clone()));
245            }
246            seen.insert(ad.name.clone(), ());
247
248            let factory = match self.registry.factories.get(&ad.kind) {
249                Some(f) => f.clone(),
250                None => {
251                    if bp.strategy.strict_kind {
252                        return Err(CompileError::UnknownKind(ad.kind.clone()));
253                    } else {
254                        tracing::warn!(
255                            agent = %ad.name,
256                            kind = ?ad.kind,
257                            "no spawner factory registered for agent kind; \
258                             dropping agent from routing table (strict_kind=false)"
259                        );
260                        continue;
261                    }
262                }
263            };
264            let hint = bp.hints.per_agent.get(&ad.name);
265            let spawner = factory.build(ad, hint)?;
266            routes.insert(ad.name.clone(), spawner);
267        }
268
269        if bp.strategy.strict_refs {
270            verify_refs(&bp.flow, &routes, self.default_spawner.is_some())?;
271        }
272
273        let router = Arc::new(CompiledAgentTable {
274            routes,
275            default: self.default_spawner.clone(),
276        });
277        Ok(CompiledBlueprint {
278            router,
279            flow: bp.flow.clone(),
280            metadata: bp.metadata.clone(),
281        })
282    }
283}
284
285/// Walk the flow `Node`, collect every `Step.ref`, and check that no ref
286/// is unresolved against `routes` (or the default, when one exists).
287fn verify_refs(
288    node: &FlowNode,
289    routes: &HashMap<String, Arc<dyn SpawnerAdapter>>,
290    has_default: bool,
291) -> Result<(), CompileError> {
292    let mut refs: Vec<String> = Vec::new();
293    collect_refs(node, &mut refs);
294    for r in refs {
295        if !routes.contains_key(&r) && !has_default {
296            return Err(CompileError::UnresolvedRef(r));
297        }
298    }
299    Ok(())
300}
301
302fn collect_refs(node: &FlowNode, out: &mut Vec<String>) {
303    match node {
304        FlowNode::Step { ref_, .. } => out.push(ref_.clone()),
305        FlowNode::Seq { children } => {
306            for c in children {
307                collect_refs(c, out);
308            }
309        }
310        FlowNode::Branch { then_, else_, .. } => {
311            collect_refs(then_, out);
312            collect_refs(else_, out);
313        }
314        FlowNode::Fanout { body, .. } => collect_refs(body, out),
315        FlowNode::Loop { body, .. } => collect_refs(body, out),
316        FlowNode::Try { body, catch, .. } => {
317            collect_refs(body, out);
318            collect_refs(catch, out);
319        }
320        FlowNode::Assign { .. } => {} // The Assign node carries no ref.
321    }
322}
323
324// ─── CompiledAgentTable ───────────────────────────────────────────────────────
325
326/// The compile result: an `agent name → SpawnerAdapter` lookup table.
327///
328/// Looks `routes` up by `ctx.agent` (the flow.ir `Step.ref`) and hands
329/// the spawn to the matching `SpawnerAdapter`. If the name is not
330/// registered and a `default` is configured, the default is used; if
331/// there is no default, `SpawnError::NotRegistered` is returned.
332///
333/// Layer wrapping (`AuditMiddleware` / `MainAIMiddleware` and friends) is
334/// not this type's concern — that is done separately in
335/// `service::linker::link`.
336pub struct CompiledAgentTable {
337    pub(crate) routes: HashMap<String, Arc<dyn SpawnerAdapter>>,
338    pub(crate) default: Option<Arc<dyn SpawnerAdapter>>,
339}
340
341impl CompiledAgentTable {
342    /// Whether the given agent name is registered in the table — i.e.,
343    /// whether its spawner has been resolved.
344    pub fn has_route(&self, agent: &str) -> bool {
345        self.routes.contains_key(agent)
346    }
347    /// List every resolved agent name.
348    pub fn routed_agents(&self) -> Vec<String> {
349        self.routes.keys().cloned().collect()
350    }
351}
352
353#[async_trait]
354impl SpawnerAdapter for CompiledAgentTable {
355    async fn spawn(
356        &self,
357        engine: &Engine,
358        ctx: &Ctx,
359        task_id: TaskId,
360        attempt: u32,
361        token: CapToken,
362    ) -> Result<Box<dyn Worker>, SpawnError> {
363        let sp = self
364            .routes
365            .get(&ctx.agent)
366            .cloned()
367            .or_else(|| self.default.clone())
368            .ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
369        sp.spawn(engine, ctx, task_id, attempt, token).await
370    }
371}
372
373// ─── default factories (three variants) ───────────────────────────────────
374
375/// Factory for `AgentKind::Subprocess`. Turns the spec into a
376/// [`ProcessSpawner`].
377///
378/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory`. Factory
379/// names carry both the worker implementation and the host adapter so
380/// they are not confused with each other; the old
381/// `ShellSpawnerFactory` was renamed to this.
382///
383/// Spec shape:
384/// ```jsonc
385/// { "program": "agent-block", "args": ["-s","s.lua"],
386///   "use_stdin": true,                       // optional, default = true
387///   "stream_mode": "ndjson_lines" | "sse_events" | "length_prefixed" | null  // optional, default = null (plain)
388/// }
389/// ```
390pub struct SubprocessProcessSpawnerFactory;
391
392impl SpawnerFactoryKind for SubprocessProcessSpawnerFactory {
393    const KIND: AgentKind = AgentKind::Subprocess;
394    type Worker = crate::worker::process_spawner::ProcessWorker;
395}
396
397impl SpawnerFactory for SubprocessProcessSpawnerFactory {
398    fn build(
399        &self,
400        agent_def: &AgentDef,
401        _hint: Option<&Value>,
402    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
403        let agent_name = &agent_def.name;
404        let spec = &agent_def.spec;
405        let invalid = |msg: String| CompileError::InvalidSpec {
406            name: agent_name.to_string(),
407            msg,
408        };
409        let program = spec
410            .get("program")
411            .and_then(|v| v.as_str())
412            .ok_or_else(|| invalid("shell spec: 'program' (string) required".into()))?
413            .to_string();
414        let args: Vec<String> = spec
415            .get("args")
416            .and_then(|v| v.as_array())
417            .map(|a| {
418                a.iter()
419                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
420                    .collect()
421            })
422            .unwrap_or_default();
423        let use_stdin = spec
424            .get("use_stdin")
425            .and_then(|v| v.as_bool())
426            .unwrap_or(true);
427        let stream_mode = match spec.get("stream_mode").and_then(|v| v.as_str()) {
428            Some("ndjson_lines") => Some(StreamMode::NdjsonLines),
429            Some("sse_events") => Some(StreamMode::SseEvents),
430            Some("length_prefixed") => Some(StreamMode::LengthPrefixed),
431            Some(other) => return Err(invalid(format!("unknown stream_mode: {other}"))),
432            None => None,
433        };
434
435        let mut sp = ProcessSpawner {
436            program,
437            args,
438            use_stdin,
439            stream_mode,
440        };
441        if let Some(mode) = sp.stream_mode.clone() {
442            sp = sp.stream_mode(mode);
443        }
444        Ok(Arc::new(sp))
445    }
446}
447
448/// Factory for `AgentKind::Lua`. At `build` time it looks the `fn_id`
449/// up in its internal registry and returns an [`InProcSpawner`] with the
450/// Lua-eval `WorkerFn` registered under `agent_name` — one `InProcSpawner`
451/// instance per agent.
452///
453/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory` (Lua
454/// worker on InProcess adapter). One half of the old
455/// `InProcSpawnerFactory`, split into Lua and RustFn variants.
456///
457/// Spec shape:
458/// ```jsonc
459/// { "fn_id": "patch-spawner" }     // Lua source id pre-registered with the factory
460/// ```
461pub struct LuaInProcessSpawnerFactory {
462    registry: HashMap<String, WorkerFn>,
463    bridges: HashMap<String, HostBridge>,
464}
465
466/// Rust-side bridge function callable from Lua.
467///
468/// Inputs and outputs are both `serde_json::Value` (i.e. JSON). Lua
469/// invokes it as `host.<name>(arg_table)`. If the implementation needs
470/// to call async Rust, the caller does the sync-ification (typically
471/// `tokio::runtime::Handle::current().block_on(...)`).
472///
473/// Design intent: keep Lua scripts focused on flow control and `ctx`
474/// walking, while the heavy lifting (LLM calls, RFC 6902 apply,
475/// verifiers, and so on) stays on the Rust side. Going "pure Lua" —
476/// removing the bridge — is a carry.
477#[derive(Clone)]
478pub struct HostBridge(
479    Arc<dyn Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync>,
480);
481
482impl HostBridge {
483    /// Wrap a Rust closure as a bridge callable from Lua.
484    pub fn new<F>(f: F) -> Self
485    where
486        F: Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync + 'static,
487    {
488        Self(Arc::new(f))
489    }
490
491    /// Invoke the bridge directly — a thin trampoline over the inner
492    /// `Fn`. The production path goes through the Lua runtime, but this
493    /// stays `pub` so unit tests can exercise the primitive directly.
494    pub fn call(&self, arg: serde_json::Value) -> Result<serde_json::Value, String> {
495        (self.0)(arg)
496    }
497}
498
/// Carrier for a Lua script: the chunk source plus a label for error
/// messages. No filesystem path is involved.
///
/// Callers supply the source themselves (with `include_str!` or similar)
/// and register it through [`LuaInProcessSpawnerFactory::register_lua`].
///
/// `Debug` is derived because public types should be debuggable (Rust API
/// guidelines, C-DEBUG). `PartialEq`/`Eq` are derived so scripts can be
/// compared, e.g. in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LuaScriptSource {
    /// The Lua chunk source.
    pub source: String,
    /// Label shown in error messages and used as the chunk name. Typically
    /// the script's logical id (for example `"patch_spawner.lua"`).
    pub label: String,
}

impl LuaScriptSource {
    /// Pair a Lua chunk `source` with its error-message `label`.
    pub fn new(source: impl Into<String>, label: impl Into<String>) -> Self {
        Self {
            source: source.into(),
            label: label.into(),
        }
    }
}
523
524impl LuaInProcessSpawnerFactory {
525    /// Start with no registered scripts and no host bridges.
526    pub fn new() -> Self {
527        Self {
528            registry: HashMap::new(),
529            bridges: HashMap::new(),
530        }
531    }
532
533    /// Register a host bridge. Subsequent `register_lua` calls snapshot
534    /// the current bridge set.
535    ///
536    /// Ordering rule: register bridges first, then call `register_lua`;
537    /// bridges added after `register_lua` will not be visible to that
538    /// script.
539    pub fn with_bridge(mut self, name: impl Into<String>, bridge: HostBridge) -> Self {
540        self.bridges.insert(name.into(), bridge);
541        self
542    }
543
544    /// Register a **Lua-eval Worker** under `fn_id`.
545    ///
546    /// Each dispatch spins up a fresh `mlua::Lua` VM, injects globals
547    /// (`_PROMPT` / `_AGENT` / `_TASK_ID` / `_ATTEMPT` / `_CTX` — the last
548    /// is `_PROMPT` parsed as JSON, or `nil` if that fails), evaluates
549    /// the script, and marshals the returned table into a `WorkerResult`.
550    ///
551    /// Marshalling rules for the return value:
552    /// - `{ value = ..., ok = bool }` → `WorkerResult.value` /
553    ///   `WorkerResult.ok` verbatim.
554    /// - Anything else → `value = <returned value>`, `ok = true`.
555    ///
556    /// Execution runs on `tokio::task::spawn_blocking` because `mlua::Lua`
557    /// is `!Send` and needs to stay away from the tokio async context.
558    /// Host bridges (the Lua-to-Rust callback path) previously registered
559    /// with [`Self::with_bridge`] are snapshotted at call time and
560    /// injected into every dispatch inside `run_lua_worker`.
561    pub fn register_lua(mut self, fn_id: impl Into<String>, source: LuaScriptSource) -> Self {
562        let source = Arc::new(source);
563        let bridges = Arc::new(self.bridges.clone());
564        let wrapped: WorkerFn = Arc::new(move |inv| {
565            let source = source.clone();
566            let bridges = bridges.clone();
567            Box::pin(run_lua_worker(source, bridges, inv))
568        });
569        self.registry.insert(fn_id.into(), wrapped);
570        self
571    }
572}
573
574/// Body of a single Lua-eval invocation (called from `register_lua`).
575async fn run_lua_worker(
576    source: Arc<LuaScriptSource>,
577    bridges: Arc<HashMap<String, HostBridge>>,
578    inv: crate::worker::adapter::WorkerInvocation,
579) -> Result<crate::worker::adapter::WorkerResult, crate::worker::adapter::WorkerError> {
580    use crate::worker::adapter::WorkerError;
581    use mlua::LuaSerdeExt;
582
583    let label = source.label.clone();
584    let outcome =
585        tokio::task::spawn_blocking(move || -> Result<(serde_json::Value, bool), String> {
586            let lua = mlua::Lua::new();
587            let g = lua.globals();
588
589            // 1. Base globals.
590            g.set("_PROMPT", inv.prompt.clone())
591                .map_err(|e| format!("set _PROMPT: {e}"))?;
592            g.set("_AGENT", inv.agent.clone())
593                .map_err(|e| format!("set _AGENT: {e}"))?;
594            g.set("_TASK_ID", inv.task_id.to_string())
595                .map_err(|e| format!("set _TASK_ID: {e}"))?;
596            g.set("_ATTEMPT", inv.attempt as i64)
597                .map_err(|e| format!("set _ATTEMPT: {e}"))?;
598
599            // 2. _CTX = JSON parse(_PROMPT); nil on parse failure (co-exists with the plain-string prompt path).
600            if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&inv.prompt) {
601                let lua_val = lua
602                    .to_value(&json_val)
603                    .map_err(|e| format!("_CTX to_value: {e}"))?;
604                g.set("_CTX", lua_val)
605                    .map_err(|e| format!("set _CTX: {e}"))?;
606            }
607
608            // 3. Inject the host bridge (Lua can call `host.<name>(arg)`).
609            if !bridges.is_empty() {
610                let host = lua
611                    .create_table()
612                    .map_err(|e| format!("create host table: {e}"))?;
613                for (name, bridge) in bridges.iter() {
614                    let bridge = bridge.clone();
615                    let bname = name.clone();
616                    let f = lua
617                        .create_function(move |lua, arg: mlua::Value| {
618                            let json_arg: serde_json::Value = lua.from_value(arg).map_err(|e| {
619                                mlua::Error::external(format!("bridge {bname} arg → json: {e}"))
620                            })?;
621                            let result_json =
622                                bridge.call(json_arg).map_err(mlua::Error::external)?;
623                            lua.to_value(&result_json).map_err(|e| {
624                                mlua::Error::external(format!("bridge {bname} ret → lua: {e}"))
625                            })
626                        })
627                        .map_err(|e| format!("create_function {name}: {e}"))?;
628                    host.set(name.as_str(), f)
629                        .map_err(|e| format!("host.{name} set: {e}"))?;
630                }
631                g.set("host", host).map_err(|e| format!("set host: {e}"))?;
632            }
633
634            // 4. eval
635            let result: mlua::Value = lua
636                .load(&source.source)
637                .set_name(&source.label)
638                .eval()
639                .map_err(|e| format!("lua eval [{}]: {e}", source.label))?;
640
641            // 5. Marshal: shape `{ value=..., ok=true }` or raw value.
642            let json_result: serde_json::Value = lua
643                .from_value(result)
644                .map_err(|e| format!("lua → json [{}]: {e}", source.label))?;
645
646            let (value, ok) = match &json_result {
647                serde_json::Value::Object(map)
648                    if map.contains_key("value") || map.contains_key("ok") =>
649                {
650                    let ok = map.get("ok").and_then(|v| v.as_bool()).unwrap_or(true);
651                    let value = map.get("value").cloned().unwrap_or(json_result.clone());
652                    (value, ok)
653                }
654                _ => (json_result, true),
655            };
656            Ok((value, ok))
657        })
658        .await
659        .map_err(|e| WorkerError::Failed(format!("spawn_blocking join [{label}]: {e}")))?
660        .map_err(WorkerError::Failed)?;
661
662    Ok(crate::worker::adapter::WorkerResult {
663        value: outcome.0,
664        ok: outcome.1,
665    })
666}
667
668impl Default for LuaInProcessSpawnerFactory {
669    fn default() -> Self {
670        Self::new()
671    }
672}
673
674impl SpawnerFactoryKind for LuaInProcessSpawnerFactory {
675    const KIND: AgentKind = AgentKind::Lua;
676    type Worker = LuaWorker;
677}
678
679impl SpawnerFactory for LuaInProcessSpawnerFactory {
680    fn build(
681        &self,
682        agent_def: &AgentDef,
683        _hint: Option<&Value>,
684    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
685        build_inproc_from_registry::<LuaWorker>(&self.registry, agent_def, "lua")
686    }
687}
688
689/// Factory for `AgentKind::RustFn`. At `build` time it looks the `fn_id`
690/// up in its internal registry and returns an [`InProcSpawner`] with the
691/// Rust closure `WorkerFn` registered under `agent_name`.
692///
693/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory` (RustFn
694/// worker on InProcess adapter). Sibling to
695/// [`LuaInProcessSpawnerFactory`] — the Lua-worker half of the same
696/// split.
697///
698/// Spec shape:
699/// ```jsonc
700/// { "fn_id": "echo" }     // Rust closure id pre-registered with the factory
701/// ```
702pub struct RustFnInProcessSpawnerFactory {
703    registry: HashMap<String, WorkerFn>,
704}
705
706impl RustFnInProcessSpawnerFactory {
707    /// Start with no registered closures.
708    pub fn new() -> Self {
709        Self {
710            registry: HashMap::new(),
711        }
712    }
713
714    /// Register a Rust closure `WorkerFn` under `fn_id`, wrapping it so
715    /// it matches the `WorkerFn` signature (boxed, pinned future).
716    pub fn register_fn<F, Fut>(mut self, fn_id: impl Into<String>, f: F) -> Self
717    where
718        F: Fn(crate::worker::adapter::WorkerInvocation) -> Fut + Send + Sync + 'static,
719        Fut: std::future::Future<
720                Output = Result<
721                    crate::worker::adapter::WorkerResult,
722                    crate::worker::adapter::WorkerError,
723                >,
724            > + Send
725            + 'static,
726    {
727        let f = Arc::new(f);
728        let wrapped: WorkerFn = Arc::new(move |inv| {
729            let f = f.clone();
730            Box::pin(f(inv))
731        });
732        self.registry.insert(fn_id.into(), wrapped);
733        self
734    }
735}
736
737impl Default for RustFnInProcessSpawnerFactory {
738    fn default() -> Self {
739        Self::new()
740    }
741}
742
743impl SpawnerFactoryKind for RustFnInProcessSpawnerFactory {
744    const KIND: AgentKind = AgentKind::RustFn;
745    type Worker = RustFnWorker;
746}
747
748impl SpawnerFactory for RustFnInProcessSpawnerFactory {
749    fn build(
750        &self,
751        agent_def: &AgentDef,
752        _hint: Option<&Value>,
753    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
754        build_inproc_from_registry::<RustFnWorker>(&self.registry, agent_def, "rust_fn")
755    }
756}
757
758/// Shared build helper used by both the Lua and the RustFn factories —
759/// look `spec.fn_id` up in the registry and return an `InProcSpawner`.
760/// The generic type parameter `W` fixes the per-kind Worker concrete
761/// type at the type level (the build-site half of the trait's
762/// associated-type binding across the four-layer cascade).
763fn build_inproc_from_registry<W>(
764    registry: &HashMap<String, WorkerFn>,
765    agent_def: &AgentDef,
766    kind_label: &str,
767) -> Result<Arc<dyn SpawnerAdapter>, CompileError>
768where
769    W: crate::worker::Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
770{
771    let agent_name = &agent_def.name;
772    let spec = &agent_def.spec;
773    let invalid = |msg: String| CompileError::InvalidSpec {
774        name: agent_name.to_string(),
775        msg,
776    };
777    let fn_id = spec
778        .get("fn_id")
779        .and_then(|v| v.as_str())
780        .ok_or_else(|| invalid(format!("{kind_label} spec: 'fn_id' (string) required")))?;
781    let f = registry
782        .get(fn_id)
783        .cloned()
784        .ok_or_else(|| invalid(format!("fn_id '{fn_id}' not registered in factory")))?;
785    let mut sp: InProcSpawner<W> = InProcSpawner::<W>::typed();
786    // Register under `agent_name` (the flow's `Step.ref`). Both
787    // `CompiledAgentTable` and the `InProcSpawner` look the function up
788    // by name, so the same key is needed at both layers.
789    sp.registry.insert(agent_name.to_string(), f);
790    Ok(Arc::new(sp))
791}
792
793/// Concrete Worker type for the Lua kind — a handle to a Lua-eval task
794/// inside an mlua VM. Embeds a `WorkerJoinHandler`. Reserved as the home
795/// for future Lua-specific extensions (an mlua VM cancellation
796/// mechanism, Lua-side error type retention, and so on).
797pub struct LuaWorker {
798    /// The join handle / cancellation token for the underlying task.
799    pub handler: crate::worker::WorkerJoinHandler,
800}
801
802impl From<crate::worker::WorkerJoinHandler> for LuaWorker {
803    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
804        Self { handler }
805    }
806}
807
808#[async_trait::async_trait]
809impl crate::worker::Worker for LuaWorker {
810    fn id(&self) -> &crate::types::WorkerId {
811        &self.handler.worker_id
812    }
813    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
814        self.handler.cancel.clone()
815    }
816    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
817        self.handler.await_completion().await
818    }
819}
820
821/// Concrete Worker type for the RustFn kind — a handle to a task that
822/// directly calls a Rust closure. Embeds a `WorkerJoinHandler`. Being a
823/// pure function, there is minimal kind-specific extension surface here;
824/// the primary purpose is to nail down the type binding.
825pub struct RustFnWorker {
826    /// The join handle / cancellation token for the underlying task.
827    pub handler: crate::worker::WorkerJoinHandler,
828}
829
830impl From<crate::worker::WorkerJoinHandler> for RustFnWorker {
831    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
832        Self { handler }
833    }
834}
835
836#[async_trait::async_trait]
837impl crate::worker::Worker for RustFnWorker {
838    fn id(&self) -> &crate::types::WorkerId {
839        &self.handler.worker_id
840    }
841    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
842        self.handler.cancel.clone()
843    }
844    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
845        self.handler.await_completion().await
846    }
847}
848
849/// Factory for `AgentKind::Operator`. Looks up the `Arc<dyn Operator>`
850/// pre-registered under `spec.operator_ref` and wraps it in an
851/// `OperatorSpawner`. Also resolves `AgentDef.profile.worker_binding` into
852/// a `WorkerBinding` at compile time and fails loud (`CompileError::InvalidSpec`)
853/// when the resolved operator's `Operator::requires_worker_binding` is `true`
854/// and no binding was declared.
855///
856/// Spec shape:
857/// ```jsonc
858/// { "operator_ref": "main_ai" }     // Operator id pre-registered with the factory
859/// ```
860///
861/// # Split of responsibilities with `OperatorDelegateMiddleware`
862///
863/// The two axes exist for different reasons:
864///
865/// - **This factory (`OperatorSpawnerFactory` → `OperatorSpawner`) — the
866///   AgentSpec axis.** Bakes a separate Operator backend into each
867///   `AgentDef`. A `kind = Operator` `AgentDef` names its backend through
868///   `spec.operator_ref`; at `compile()` time the `Arc<dyn Operator>` is
869///   baked into `routes[agent_name]`. Because the `agent.md` loader
870///   (`agent_md_loader`) defaults `kind` to `Operator`, agents that flow
871///   in through agent-profiles land here.
872///
873/// - **`OperatorDelegateMiddleware` — the Blueprint-global (session)
874///   axis.** Delegates every agent to the same Operator backend. At
875///   session-attach time you call `engine.register_operator(id, op)`
876///   plus `attach_with_ids(.., operator_backend_id = Some(id))` to bind
877///   it session-wide, and declare
878///   `spawner_hints.layers = ["operator_delegate"]` to opt in. `ctx.agent`
879///   is ignored; the operator handles every spawn in that session (a
880///   MainAI-wide driver, a human-wide console, that sort of thing).
881///
882/// # Exclusivity (a double fire is structurally impossible)
883///
884/// When both are effective — the hint is declared, the session has an
885/// operator backend, **and** the Blueprint has a `kind = Operator`
886/// `AgentDef` — `OperatorDelegateMiddleware` sits at the outer end of
887/// the stack and **completely bypasses** `inner.spawn`. The
888/// `OperatorSpawner` is never reached, so under those conditions this
889/// factory's routes entry is inert. This is not a double fire — the
890/// session axis is overriding the agent axis. Consistent usage means
891/// picking one axis per use case.
892///
893/// Interior mutability is provided by an `Arc<RwLock>`. Even after the
894/// factory has been stored as `Arc<dyn SpawnerFactory>` in
895/// `SpawnerRegistry`, a caller holding an `Arc` clone can still add
896/// Operator backends dynamically via `register_operator(&self, id, op)`.
897/// Typical uses: registering a `WSOperatorSession` under the session id
898/// on WebSocket connect, binding agents that arrive via the `agent.md`
899/// loader to arbitrary backends, and so on. `build()` performs a
900/// `read()` lookup each time.
901pub struct OperatorSpawnerFactory {
902    operators: Arc<std::sync::RwLock<HashMap<String, Arc<dyn Operator>>>>,
903}
904
905impl OperatorSpawnerFactory {
906    /// Start with no registered Operator backends.
907    pub fn new() -> Self {
908        Self {
909            operators: Arc::new(std::sync::RwLock::new(HashMap::new())),
910        }
911    }
912
913    /// Register an Operator backend dynamically through `&self`.
914    /// Overwrites are allowed — later wins. Callers can still reach this
915    /// after the factory has been stored as `Arc<dyn SpawnerFactory>` in
916    /// `SpawnerRegistry`, as long as they hold an `Arc` clone; interior
917    /// mutability is provided by the inner `RwLock`.
918    pub fn register_operator(&self, id: impl Into<String>, op: Arc<dyn Operator>) -> &Self {
919        self.operators
920            .write()
921            .expect("OperatorSpawnerFactory.operators RwLock poisoned")
922            .insert(id.into(), op);
923        self
924    }
925
926    /// Dynamically unregister an id (used to clean up when a WebSocket
927    /// disconnects, for example). A missing id is a no-op.
928    pub fn unregister_operator(&self, id: &str) -> &Self {
929        self.operators
930            .write()
931            .expect("OperatorSpawnerFactory.operators RwLock poisoned")
932            .remove(id);
933        self
934    }
935}
936
937impl Default for OperatorSpawnerFactory {
938    fn default() -> Self {
939        Self::new()
940    }
941}
942
943impl SpawnerFactoryKind for OperatorSpawnerFactory {
944    const KIND: AgentKind = AgentKind::Operator;
945    type Worker = crate::operator::OperatorWorker;
946}
947
948impl SpawnerFactory for OperatorSpawnerFactory {
949    fn build(
950        &self,
951        agent_def: &AgentDef,
952        _hint: Option<&Value>,
953    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
954        let agent_name = &agent_def.name;
955        let spec = &agent_def.spec;
956        // Bake AgentDef.profile.system_prompt into the OperatorSpawner at compile time.
957        // `Some` → adopted first at spawn time; `None` → falls back to fetch_prompt (initial_directive).
958        // Fallback path. Sibling: AgentBlockInProcessSpawnerFactory
959        // (agent_block/runtime.rs) does the same compile-time bake by stuffing
960        // the profile into BlockConfig.context.
961        let system_prompt = agent_def.profile.as_ref().map(|p| p.system_prompt.clone());
962        let invalid = |msg: String| CompileError::InvalidSpec {
963            name: agent_name.to_string(),
964            msg,
965        };
966        let op_ref = spec
967            .get("operator_ref")
968            .and_then(|v| v.as_str())
969            .ok_or_else(|| invalid("operator spec: 'operator_ref' (string) required".into()))?;
970        let operators = self
971            .operators
972            .read()
973            .expect("OperatorSpawnerFactory.operators RwLock poisoned");
974        let op = operators.get(op_ref).cloned().ok_or_else(|| {
975            let mut names: Vec<String> = operators.keys().cloned().collect();
976            names.sort();
977            let names_list = if names.is_empty() {
978                "<none>".to_string()
979            } else {
980                names.join(", ")
981            };
982            invalid(format!(
983                "operator_ref '{op_ref}' not registered in factory. \
984                 Registered sids: [{names_list}]. \
985                 Hint: call mse_operator_join(roles=[...]) to mint the sid first."
986            ))
987        })?;
988        drop(operators);
989
990        // Resolve the Blueprint-baked worker binding from
991        // `AgentDef.profile.worker_binding` — the SoT for the
992        // declaration↔executor binding (see `WorkerBinding` doc). Fail
993        // loud at compile time when the operator backend requires one
994        // and the Blueprint didn't declare it; this is a compile-time
995        // gate, not a runtime guess.
996        let worker_binding = agent_def
997            .profile
998            .as_ref()
999            .and_then(|p| p.worker_binding.as_ref())
1000            .map(|subagent_type| WorkerBinding {
1001                subagent_type: subagent_type.clone(),
1002                tools: agent_def
1003                    .profile
1004                    .as_ref()
1005                    .map(|p| p.tools.clone())
1006                    .unwrap_or_default(),
1007            });
1008        if op.requires_worker_binding() && worker_binding.is_none() {
1009            return Err(invalid(
1010                "profile.worker_binding is required for this operator backend; \
1011                 declare it in the agent .md frontmatter"
1012                    .into(),
1013            ));
1014        }
1015        Ok(Arc::new(OperatorSpawner::new(
1016            op,
1017            system_prompt,
1018            worker_binding,
1019        )))
1020    }
1021}
1022
#[cfg(test)]
mod operator_spawner_factory_worker_binding_tests {
    use super::*;
    use crate::blueprint::AgentProfile;
    use crate::core::ctx::Ctx;
    use crate::types::CapToken;
    use crate::worker::adapter::{WorkerError, WorkerResult};

    /// Minimal `Operator` stub whose `requires_worker_binding` is
    /// configurable. It is enough to exercise the compile-time fail-loud gate
    /// without standing up a real backend (e.g. `WSOperatorSession`, which
    /// lives in a downstream crate).
    struct StubOperator {
        requires_binding: bool,
    }

    #[async_trait]
    impl Operator for StubOperator {
        async fn execute(
            &self,
            _ctx: &Ctx,
            _system: Option<String>,
            _prompt: String,
            _worker: Option<WorkerBinding>,
            _worker_token: CapToken,
        ) -> Result<WorkerResult, WorkerError> {
            Ok(WorkerResult {
                value: Value::Null,
                ok: true,
            })
        }

        fn requires_worker_binding(&self) -> bool {
            self.requires_binding
        }
    }

    /// Shorthand for a type-erased `StubOperator`.
    fn stub(requires_binding: bool) -> Arc<dyn Operator> {
        Arc::new(StubOperator { requires_binding })
    }

    /// An Operator-kind `AgentDef` named `test-agent` with an arbitrary spec.
    fn agent_def_with_spec(spec: Value, profile: Option<AgentProfile>) -> AgentDef {
        AgentDef {
            name: "test-agent".to_string(),
            kind: AgentKind::Operator,
            spec,
            profile,
            meta: None,
        }
    }

    /// An Operator-kind `AgentDef` that points at the backend id `op1`.
    fn agent_def_with(profile: Option<AgentProfile>) -> AgentDef {
        agent_def_with_spec(serde_json::json!({ "operator_ref": "op1" }), profile)
    }

    /// Assert `result` is `InvalidSpec` for `test-agent` and return its message.
    fn expect_invalid_spec(result: Result<Arc<dyn SpawnerAdapter>, CompileError>) -> String {
        match result {
            Err(CompileError::InvalidSpec { name, msg }) => {
                assert_eq!(name, "test-agent");
                msg
            }
            Err(other) => panic!("expected InvalidSpec, got: {other:?}"),
            Ok(_) => panic!("expected compile-time failure, got Ok"),
        }
    }

    #[test]
    fn build_fails_loud_when_binding_required_but_absent() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(true));
        let def = agent_def_with(Some(AgentProfile::default()));
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(
            msg.contains("worker_binding is required"),
            "unexpected message: {msg}"
        );
    }

    #[test]
    fn build_succeeds_when_binding_required_and_present() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(true));
        let profile = AgentProfile {
            worker_binding: Some("mse-worker-coder".to_string()),
            tools: vec!["Read".to_string(), "Edit".to_string()],
            ..Default::default()
        };
        let def = agent_def_with(Some(profile));
        assert!(
            factory.build(&def, None).is_ok(),
            "expected Ok when worker_binding is declared"
        );
    }

    #[test]
    fn build_succeeds_when_binding_not_required_and_absent() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(false));
        let def = agent_def_with(Some(AgentProfile::default()));
        assert!(
            factory.build(&def, None).is_ok(),
            "backends that don't require a binding must not be gated by its absence"
        );
    }

    #[test]
    fn build_fails_when_operator_ref_missing_or_not_a_string() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(false));
        for spec in [serde_json::json!({}), serde_json::json!({ "operator_ref": 42 })] {
            let def = agent_def_with_spec(spec, None);
            let msg = expect_invalid_spec(factory.build(&def, None));
            assert!(
                msg.contains("'operator_ref' (string) required"),
                "unexpected message: {msg}"
            );
        }
    }

    #[test]
    fn build_fails_for_unregistered_ref_and_lists_registered_ids() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(false));
        let def = agent_def_with_spec(serde_json::json!({ "operator_ref": "ghost" }), None);
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(
            msg.contains("operator_ref 'ghost' not registered in factory"),
            "unexpected message: {msg}"
        );
        assert!(msg.contains("Registered sids: [op1]"), "unexpected message: {msg}");
    }

    #[test]
    fn build_fails_after_unregister() {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator("op1", stub(false));
        factory.unregister_operator("op1");
        let def = agent_def_with(None);
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(msg.contains("Registered sids: [<none>]"), "unexpected message: {msg}");
    }

    #[test]
    fn register_operator_overwrite_later_wins() {
        let factory = OperatorSpawnerFactory::new();
        // First registration would reject a binding-less agent; the second
        // replaces it under the same id and must take effect.
        factory
            .register_operator("op1", stub(true))
            .register_operator("op1", stub(false));
        let def = agent_def_with(Some(AgentProfile::default()));
        assert!(
            factory.build(&def, None).is_ok(),
            "the later registration for an id must replace the earlier one"
        );
    }
}