mlua_swarm/blueprint/compiler.rs
//! Blueprint `Compiler`, `CompiledAgentTable`, and the built-in
//! `SpawnerFactory` implementations (Subprocess, Lua, RustFn, Operator).
3//!
4//! ## Pipeline
5//!
6//! ```text
7//! Blueprint (= flow + agents + hints + strategy + spawner_hints)
8//! │
9//! │ Compiler.compile(&bp) ← this module (AgentDef → SpawnerAdapter table)
10//! ▼
11//! CompiledBlueprint {
12//! router: Arc<CompiledAgentTable>, // ctx.agent → SpawnerAdapter lookup
13//! flow: FlowNode, // the flow.ir source (evaluated via EngineDispatcher)
14//! metadata: BlueprintMetadata,
15//! }
16//! │
17//! │ service::linker::link(router, blueprint.spawner_hints.layers, &engine)
18//! ▼ ↑ Layer wrapping is done separately (src/service/linker.rs)
19//! `Arc<dyn SpawnerAdapter>` (already wrapped with base + hint SpawnerLayers)
20//! │
21//! ▼ EngineDispatcher::with_spawner → engine.dispatch_attempt_with
22//! ```
23//!
24//! `CompiledAgentTable` is a thin table: it looks up `routes[name]` by
25//! `ctx.agent` and hands the spawn off to the matching `SpawnerAdapter`.
26//! The `routes` map is built at compile time through `SpawnerFactory`
27//! implementations. Layer wrapping is not part of this module — it lives
28//! in `service::linker::link`.
29
30use crate::blueprint::{AgentDef, AgentKind, Blueprint, BlueprintMetadata};
31use crate::core::ctx::Ctx;
32use crate::core::engine::Engine;
33use crate::operator::{Operator, OperatorSpawner, WorkerBinding};
34use crate::types::{CapToken, TaskId};
35use crate::worker::adapter::{InProcSpawner, SpawnError, SpawnerAdapter, WorkerFn};
36use crate::worker::process_spawner::{ProcessSpawner, StreamMode};
37use crate::worker::Worker;
38use async_trait::async_trait;
39use mlua_flow_ir::Node as FlowNode;
40use serde_json::Value;
41use std::collections::HashMap;
42use std::sync::Arc;
43use thiserror::Error;
44
45// ─── error ───────────────────────────────────────────────────────────────
46
/// Everything that can go wrong while `Compiler::compile` turns a
/// `Blueprint` into a `CompiledBlueprint`.
///
/// `UnknownKind`, `UnresolvedRef`, `DuplicateAgent` and
/// `UnresolvedOperatorRef` are raised by the compiler itself; `InvalidSpec`
/// is raised by the `SpawnerFactory::build` implementations it calls.
#[derive(Debug, Error)]
pub enum CompileError {
    /// An `AgentDef.kind` has no matching entry in the `SpawnerRegistry`
    /// and `Blueprint.strategy.strict_kind` is set. (With `strict_kind`
    /// unset the agent is dropped from the routing table instead.)
    #[error("unknown agent kind in SpawnerRegistry: {0:?}")]
    UnknownKind(AgentKind),
    /// The `AgentDef.spec` shape did not match what the factory for its
    /// kind requires (missing/mistyped field, unregistered `fn_id`, etc.).
    #[error("agent '{name}' spec invalid: {msg}")]
    InvalidSpec {
        /// The offending agent's name.
        name: String,
        /// Human-readable description of what was wrong with the spec.
        msg: String,
    },
    /// The flow references an agent name that has no corresponding
    /// route (and no default spawner is configured). Only checked when
    /// `Blueprint.strategy.strict_refs` is set.
    #[error("flow references agent '{0}' but no AgentDef matches")]
    UnresolvedRef(String),
    /// Two `AgentDef`s in the same `Blueprint` share a name.
    #[error("duplicate AgentDef name: {0}")]
    DuplicateAgent(String),
    /// A `kind = Operator` agent's `spec.operator_ref` does not match
    /// any `OperatorDef.name` declared in `Blueprint.operators`.
    #[error("agent '{agent}' operator_ref '{op_ref}' does not match any OperatorDef.name in Blueprint.operators (defined: {defined:?})")]
    UnresolvedOperatorRef {
        /// The agent whose `operator_ref` didn't resolve.
        agent: String,
        /// The `operator_ref` value that was looked up.
        op_ref: String,
        /// The `OperatorDef.name`s that *are* declared, in declaration
        /// order, for the error message.
        defined: Vec<String>,
    },
}
84
85// ─── SpawnerFactory + Registry ───────────────────────────────────────────
86
/// Factory trait that interprets an `AgentDef` and builds the concrete
/// `SpawnerAdapter`. Register one per kind. Parsing the spec,
/// validating it, and baking the profile are the implementation's job.
///
/// The signature was widened in v9 from `(name, spec, hint)` to
/// `(&AgentDef, hint)` so the profile can be passed through. Most
/// implementations still just pull `&agent_def.name` and
/// `&agent_def.spec`, but Operator-backend factories consume
/// `agent_def.profile` to bake the persona in.
///
/// The trait carries no associated consts or types, which is what lets
/// `SpawnerRegistry` store implementations as `Arc<dyn SpawnerFactory>`.
pub trait SpawnerFactory: Send + Sync {
    /// Build the concrete `SpawnerAdapter` for one `AgentDef`. `hint` is
    /// the matching entry (if any) from `Blueprint.hints.per_agent`.
    ///
    /// # Errors
    ///
    /// Returns a `CompileError` (typically `CompileError::InvalidSpec`)
    /// when the spec cannot be turned into a spawner.
    fn build(
        &self,
        agent_def: &AgentDef,
        hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError>;
}
105
/// Companion trait that carries the **type-side source of truth** for
/// the Adapter ↔ `AgentKind` correspondence.
///
/// The base [`SpawnerFactory`] trait deliberately does not carry an
/// associated const so it stays dyn-compatible — that is, so it can be
/// stored and dispatched as `Arc<dyn SpawnerFactory>`. This companion
/// trait splits `const KIND: AgentKind` out, and
/// [`SpawnerRegistry::register`] uses `F::KIND` as the `HashMap` key.
/// Callers therefore never spell out the key themselves, which removes
/// the mismatched-key failure mode at the type layer.
///
/// The built-in factories in this module
/// (`SubprocessProcessSpawnerFactory`, `LuaInProcessSpawnerFactory`,
/// `RustFnInProcessSpawnerFactory`) implement this. Extension backends
/// (say, `AgentBlockSpawnerFactory`) follow the same explicit two-step
/// recipe: add a new `AgentKind` variant and implement this trait.
pub trait SpawnerFactoryKind: SpawnerFactory {
    /// The `AgentKind` this factory handles — used as the `HashMap` key
    /// by `SpawnerRegistry::register`.
    const KIND: AgentKind;
    /// The concrete Worker type produced by this `AgentKind` — this
    /// binds the type chain all the way from `AgentKind` down to `Worker`.
    /// Every factory declares it so the `AgentKind → Worker` mapping is
    /// explicit across all four layers. It is the source of truth for
    /// preserving the concrete type right up until `SpawnerAdapter::spawn`
    /// erases it into `Box<dyn Worker>`.
    type Worker: crate::worker::Worker;
}
133
/// `AgentKind → SpawnerFactory` mapping. The compiler looks entries up
/// during `compile()`.
///
/// Cloning the registry clones the map; the factories themselves are
/// shared through their `Arc`s.
#[derive(Clone)]
pub struct SpawnerRegistry {
    // At most one factory per `AgentKind`; see `register`.
    factories: HashMap<AgentKind, Arc<dyn SpawnerFactory>>,
}
140
141impl SpawnerRegistry {
142 /// Start with an empty `AgentKind → SpawnerFactory` mapping.
143 pub fn new() -> Self {
144 Self {
145 factories: HashMap::new(),
146 }
147 }
148 /// **Type-driven registration** — takes `F::KIND` and uses it as the
149 /// `HashMap` key.
150 ///
151 /// Callers use the form
152 /// `reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(...))`
153 /// and never have to pass an `AgentKind` literal. The Adapter ↔ Kind
154 /// correspondence is enforced at the type layer, physically removing
155 /// the string / enum-literal lookup failure mode.
156 pub fn register<F: SpawnerFactoryKind + 'static>(&mut self, factory: Arc<F>) -> &mut Self {
157 let f: Arc<dyn SpawnerFactory> = factory;
158 self.factories.insert(F::KIND, f);
159 self
160 }
161}
162
163impl Default for SpawnerRegistry {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169// ─── Compiler ────────────────────────────────────────────────────────────
170
/// Turns a `Blueprint` into a `CompiledBlueprint` by resolving every
/// `AgentDef` against a `SpawnerRegistry`.
///
/// `compile()` takes `&self` and keeps no per-compile state, so a single
/// `Compiler` can be reused for any number of blueprints.
pub struct Compiler {
    /// `AgentKind → SpawnerFactory` lookup used for every `AgentDef`.
    registry: SpawnerRegistry,
    /// Optional fallback spawner, copied into every compiled table.
    default_spawner: Option<Arc<dyn SpawnerAdapter>>,
}
179
/// The result of `Compiler::compile` — a routing table plus the
/// unmodified flow and metadata, ready to hand to
/// `EngineDispatcher::with_spawner` / `mlua_flow_ir::eval_async`.
pub struct CompiledBlueprint {
    /// `ctx.agent → SpawnerAdapter` lookup table.
    pub router: Arc<CompiledAgentTable>,
    /// The flow.ir source, cloned verbatim from `Blueprint.flow`.
    pub flow: FlowNode,
    /// Cloned verbatim from `Blueprint.metadata`.
    pub metadata: BlueprintMetadata,
}
191
192impl Compiler {
193 /// Build a `Compiler` around the given `SpawnerRegistry`, with no
194 /// default spawner (unresolved flow refs are an error unless
195 /// `with_default` is chained on).
196 pub fn new(registry: SpawnerRegistry) -> Self {
197 Self {
198 registry,
199 default_spawner: None,
200 }
201 }
202
203 /// Set a default spawner — used for flow refs (and unregistered
204 /// `AgentKind`s under non-strict strategy) that don't resolve
205 /// against any `AgentDef`/`SpawnerRegistry` entry.
206 pub fn with_default(mut self, sp: Arc<dyn SpawnerAdapter>) -> Self {
207 self.default_spawner = Some(sp);
208 self
209 }
210
211 /// Resolve every `Blueprint.agents` entry through the registry,
212 /// validate `operator_ref`s and flow refs per `Blueprint.strategy`,
213 /// and return the routing table alongside the untouched flow and
214 /// metadata.
215 pub fn compile(&self, bp: &Blueprint) -> Result<CompiledBlueprint, CompileError> {
216 let mut routes: HashMap<String, Arc<dyn SpawnerAdapter>> = HashMap::new();
217 let mut seen: HashMap<String, ()> = HashMap::new();
218
219 // Design-time validation (OperatorDef as a first-class value):
220 // every `kind = Operator` agent's `spec.operator_ref` must point at
221 // one of `bp.operators[].name`. A Blueprint with any Operator agent
222 // must therefore declare its operators up front; the empty-operators
223 // backward-compat bypass is retired.
224 let defined: Vec<String> = bp.operators.iter().map(|o| o.name.clone()).collect();
225 for ad in &bp.agents {
226 if !matches!(ad.kind, AgentKind::Operator) {
227 continue;
228 }
229 let op_ref = ad.spec.get("operator_ref").and_then(|v| v.as_str());
230 if let Some(op_ref) = op_ref {
231 if !defined.iter().any(|n| n == op_ref) {
232 return Err(CompileError::UnresolvedOperatorRef {
233 agent: ad.name.clone(),
234 op_ref: op_ref.to_string(),
235 defined: defined.clone(),
236 });
237 }
238 }
239 // A missing `op_ref` is reported through OperatorSpawnerFactory.build under a different error.
240 }
241
242 for ad in &bp.agents {
243 if seen.contains_key(&ad.name) {
244 return Err(CompileError::DuplicateAgent(ad.name.clone()));
245 }
246 seen.insert(ad.name.clone(), ());
247
248 let factory = match self.registry.factories.get(&ad.kind) {
249 Some(f) => f.clone(),
250 None => {
251 if bp.strategy.strict_kind {
252 return Err(CompileError::UnknownKind(ad.kind.clone()));
253 } else {
254 tracing::warn!(
255 agent = %ad.name,
256 kind = ?ad.kind,
257 "no spawner factory registered for agent kind; \
258 dropping agent from routing table (strict_kind=false)"
259 );
260 continue;
261 }
262 }
263 };
264 let hint = bp.hints.per_agent.get(&ad.name);
265 let spawner = factory.build(ad, hint)?;
266 routes.insert(ad.name.clone(), spawner);
267 }
268
269 if bp.strategy.strict_refs {
270 verify_refs(&bp.flow, &routes, self.default_spawner.is_some())?;
271 }
272
273 let router = Arc::new(CompiledAgentTable {
274 routes,
275 default: self.default_spawner.clone(),
276 });
277 Ok(CompiledBlueprint {
278 router,
279 flow: bp.flow.clone(),
280 metadata: bp.metadata.clone(),
281 })
282 }
283}
284
285/// Walk the flow `Node`, collect every `Step.ref`, and check that no ref
286/// is unresolved against `routes` (or the default, when one exists).
287fn verify_refs(
288 node: &FlowNode,
289 routes: &HashMap<String, Arc<dyn SpawnerAdapter>>,
290 has_default: bool,
291) -> Result<(), CompileError> {
292 let mut refs: Vec<String> = Vec::new();
293 collect_refs(node, &mut refs);
294 for r in refs {
295 if !routes.contains_key(&r) && !has_default {
296 return Err(CompileError::UnresolvedRef(r));
297 }
298 }
299 Ok(())
300}
301
/// Append the `ref_` of every `Step` reachable from `node` to `out`.
///
/// The recursion visits children in declaration order (`then_` before
/// `else_`, `body` before `catch`), so `out` lists refs in the order they
/// are encountered. Duplicates are kept. The `match` has no wildcard arm,
/// so adding a `FlowNode` variant forces this function to be revisited.
fn collect_refs(node: &FlowNode, out: &mut Vec<String>) {
    match node {
        // Leaf: the only node kind that names an agent.
        FlowNode::Step { ref_, .. } => out.push(ref_.clone()),
        FlowNode::Seq { children } => {
            for c in children {
                collect_refs(c, out);
            }
        }
        FlowNode::Branch { then_, else_, .. } => {
            collect_refs(then_, out);
            collect_refs(else_, out);
        }
        FlowNode::Fanout { body, .. } => collect_refs(body, out),
        FlowNode::Loop { body, .. } => collect_refs(body, out),
        FlowNode::Try { body, catch, .. } => {
            collect_refs(body, out);
            collect_refs(catch, out);
        }
        FlowNode::Assign { .. } => {} // The Assign node carries no ref.
    }
}
323
324// ─── CompiledAgentTable ───────────────────────────────────────────────────────
325
/// The compile result: an `agent name → SpawnerAdapter` lookup table.
///
/// Looks `routes` up by `ctx.agent` (the flow.ir `Step.ref`) and hands
/// the spawn to the matching `SpawnerAdapter`. If the name is not
/// registered and a `default` is configured, the default is used; if
/// there is no default, `SpawnError::NotRegistered` is returned.
///
/// Layer wrapping (`AuditMiddleware` / `MainAIMiddleware` and friends) is
/// not this type's concern — that is done separately in
/// `service::linker::link`.
pub struct CompiledAgentTable {
    // One entry per `AgentDef` that resolved to a spawner at compile time.
    pub(crate) routes: HashMap<String, Arc<dyn SpawnerAdapter>>,
    // Fallback used when `ctx.agent` has no entry in `routes`.
    pub(crate) default: Option<Arc<dyn SpawnerAdapter>>,
}
340
341impl CompiledAgentTable {
342 /// Whether the given agent name is registered in the table — i.e.,
343 /// whether its spawner has been resolved.
344 pub fn has_route(&self, agent: &str) -> bool {
345 self.routes.contains_key(agent)
346 }
347 /// List every resolved agent name.
348 pub fn routed_agents(&self) -> Vec<String> {
349 self.routes.keys().cloned().collect()
350 }
351}
352
353#[async_trait]
354impl SpawnerAdapter for CompiledAgentTable {
355 async fn spawn(
356 &self,
357 engine: &Engine,
358 ctx: &Ctx,
359 task_id: TaskId,
360 attempt: u32,
361 token: CapToken,
362 ) -> Result<Box<dyn Worker>, SpawnError> {
363 let sp = self
364 .routes
365 .get(&ctx.agent)
366 .cloned()
367 .or_else(|| self.default.clone())
368 .ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
369 sp.spawn(engine, ctx, task_id, attempt, token).await
370 }
371}
372
// ─── built-in factories (Subprocess / Lua / RustFn / Operator) ────────────
374
/// Factory for `AgentKind::Subprocess`. Turns the spec into a
/// [`ProcessSpawner`].
///
/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory`. Factory
/// names carry both the worker implementation and the host adapter so
/// they are not confused with each other; the old
/// `ShellSpawnerFactory` was renamed to this. (Error messages still use
/// the historical `shell spec:` prefix.)
///
/// Spec shape:
/// ```jsonc
/// { "program": "agent-block", "args": ["-s","s.lua"],
///   "use_stdin": true, // optional, default = true
///   "stream_mode": "ndjson_lines" | "sse_events" | "length_prefixed" | null // optional, default = null (plain)
/// }
/// ```
///
/// The factory is a stateless unit struct.
pub struct SubprocessProcessSpawnerFactory;
391
/// Type-level binding: this factory is registered under
/// `AgentKind::Subprocess` and its workers are `ProcessWorker`s.
impl SpawnerFactoryKind for SubprocessProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::Subprocess;
    type Worker = crate::worker::process_spawner::ProcessWorker;
}
396
397impl SpawnerFactory for SubprocessProcessSpawnerFactory {
398 fn build(
399 &self,
400 agent_def: &AgentDef,
401 _hint: Option<&Value>,
402 ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
403 let agent_name = &agent_def.name;
404 let spec = &agent_def.spec;
405 let invalid = |msg: String| CompileError::InvalidSpec {
406 name: agent_name.to_string(),
407 msg,
408 };
409 let program = spec
410 .get("program")
411 .and_then(|v| v.as_str())
412 .ok_or_else(|| invalid("shell spec: 'program' (string) required".into()))?
413 .to_string();
414 let args: Vec<String> = spec
415 .get("args")
416 .and_then(|v| v.as_array())
417 .map(|a| {
418 a.iter()
419 .filter_map(|x| x.as_str().map(|s| s.to_string()))
420 .collect()
421 })
422 .unwrap_or_default();
423 let use_stdin = spec
424 .get("use_stdin")
425 .and_then(|v| v.as_bool())
426 .unwrap_or(true);
427 let stream_mode = match spec.get("stream_mode").and_then(|v| v.as_str()) {
428 Some("ndjson_lines") => Some(StreamMode::NdjsonLines),
429 Some("sse_events") => Some(StreamMode::SseEvents),
430 Some("length_prefixed") => Some(StreamMode::LengthPrefixed),
431 Some(other) => return Err(invalid(format!("unknown stream_mode: {other}"))),
432 None => None,
433 };
434
435 let mut sp = ProcessSpawner {
436 program,
437 args,
438 use_stdin,
439 stream_mode,
440 };
441 if let Some(mode) = sp.stream_mode.clone() {
442 sp = sp.stream_mode(mode);
443 }
444 Ok(Arc::new(sp))
445 }
446}
447
/// Factory for `AgentKind::Lua`. At `build` time it looks the `fn_id`
/// up in its internal registry and returns an [`InProcSpawner`] with the
/// Lua-eval `WorkerFn` registered under `agent_name` — one `InProcSpawner`
/// instance per agent.
///
/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory` (Lua
/// worker on InProcess adapter). One half of the old
/// `InProcSpawnerFactory`, split into Lua and RustFn variants.
///
/// Spec shape:
/// ```jsonc
/// { "fn_id": "patch-spawner" } // Lua source id pre-registered with the factory
/// ```
pub struct LuaInProcessSpawnerFactory {
    // `fn_id → WorkerFn`, filled by `register_lua`.
    registry: HashMap<String, WorkerFn>,
    // `name → HostBridge`, filled by `with_bridge`; snapshotted by each
    // `register_lua` call.
    bridges: HashMap<String, HostBridge>,
}
465
/// Rust-side bridge function callable from Lua.
///
/// Inputs and outputs are both `serde_json::Value` (i.e. JSON). Lua
/// invokes it as `host.<name>(arg_table)`. If the implementation needs
/// to call async Rust, the caller does the sync-ification (typically
/// `tokio::runtime::Handle::current().block_on(...)`).
///
/// Design intent: keep Lua scripts focused on flow control and `ctx`
/// walking, while the heavy lifting (LLM calls, RFC 6902 apply,
/// verifiers, and so on) stays on the Rust side. Going "pure Lua" —
/// removing the bridge — is deferred to future work.
///
/// Cloning is cheap: the closure is shared behind an `Arc`.
#[derive(Clone)]
pub struct HostBridge(
    Arc<dyn Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync>,
);
481
482impl HostBridge {
483 /// Wrap a Rust closure as a bridge callable from Lua.
484 pub fn new<F>(f: F) -> Self
485 where
486 F: Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync + 'static,
487 {
488 Self(Arc::new(f))
489 }
490
491 /// Invoke the bridge directly — a thin trampoline over the inner
492 /// `Fn`. The production path goes through the Lua runtime, but this
493 /// stays `pub` so unit tests can exercise the primitive directly.
494 pub fn call(&self, arg: serde_json::Value) -> Result<serde_json::Value, String> {
495 (self.0)(arg)
496 }
497}
498
/// A Lua script held in memory: the chunk text plus a label that
/// identifies it. No file path is involved.
///
/// Callers obtain the text themselves (via `include_str!` or similar)
/// and hand it to the factory through
/// [`LuaInProcessSpawnerFactory::register_lua`].
#[derive(Clone)]
pub struct LuaScriptSource {
    /// The Lua chunk source.
    pub source: String,
    /// Label used in error messages — typically the script's logical id
    /// (for example `"patch_spawner.lua"`).
    pub label: String,
}

impl LuaScriptSource {
    /// Build a script carrier from anything convertible into `String`
    /// for both the chunk text and its error-message label.
    pub fn new(source: impl Into<String>, label: impl Into<String>) -> Self {
        let (source, label) = (source.into(), label.into());
        Self { source, label }
    }
}
523
524impl LuaInProcessSpawnerFactory {
525 /// Start with no registered scripts and no host bridges.
526 pub fn new() -> Self {
527 Self {
528 registry: HashMap::new(),
529 bridges: HashMap::new(),
530 }
531 }
532
533 /// Register a host bridge. Subsequent `register_lua` calls snapshot
534 /// the current bridge set.
535 ///
536 /// Ordering rule: register bridges first, then call `register_lua`;
537 /// bridges added after `register_lua` will not be visible to that
538 /// script.
539 pub fn with_bridge(mut self, name: impl Into<String>, bridge: HostBridge) -> Self {
540 self.bridges.insert(name.into(), bridge);
541 self
542 }
543
544 /// Register a **Lua-eval Worker** under `fn_id`.
545 ///
546 /// Each dispatch spins up a fresh `mlua::Lua` VM, injects globals
547 /// (`_PROMPT` / `_AGENT` / `_TASK_ID` / `_ATTEMPT` / `_CTX` — the last
548 /// is `_PROMPT` parsed as JSON, or `nil` if that fails), evaluates
549 /// the script, and marshals the returned table into a `WorkerResult`.
550 ///
551 /// Marshalling rules for the return value:
552 /// - `{ value = ..., ok = bool }` → `WorkerResult.value` /
553 /// `WorkerResult.ok` verbatim.
554 /// - Anything else → `value = <returned value>`, `ok = true`.
555 ///
556 /// Execution runs on `tokio::task::spawn_blocking` because `mlua::Lua`
557 /// is `!Send` and needs to stay away from the tokio async context.
558 /// Host bridges (the Lua-to-Rust callback path) previously registered
559 /// with [`Self::with_bridge`] are snapshotted at call time and
560 /// injected into every dispatch inside `run_lua_worker`.
561 pub fn register_lua(mut self, fn_id: impl Into<String>, source: LuaScriptSource) -> Self {
562 let source = Arc::new(source);
563 let bridges = Arc::new(self.bridges.clone());
564 let wrapped: WorkerFn = Arc::new(move |inv| {
565 let source = source.clone();
566 let bridges = bridges.clone();
567 Box::pin(run_lua_worker(source, bridges, inv))
568 });
569 self.registry.insert(fn_id.into(), wrapped);
570 self
571 }
572}
573
574/// Body of a single Lua-eval invocation (called from `register_lua`).
575async fn run_lua_worker(
576 source: Arc<LuaScriptSource>,
577 bridges: Arc<HashMap<String, HostBridge>>,
578 inv: crate::worker::adapter::WorkerInvocation,
579) -> Result<crate::worker::adapter::WorkerResult, crate::worker::adapter::WorkerError> {
580 use crate::worker::adapter::WorkerError;
581 use mlua::LuaSerdeExt;
582
583 let label = source.label.clone();
584 let outcome =
585 tokio::task::spawn_blocking(move || -> Result<(serde_json::Value, bool), String> {
586 let lua = mlua::Lua::new();
587 let g = lua.globals();
588
589 // 1. Base globals.
590 g.set("_PROMPT", inv.prompt.clone())
591 .map_err(|e| format!("set _PROMPT: {e}"))?;
592 g.set("_AGENT", inv.agent.clone())
593 .map_err(|e| format!("set _AGENT: {e}"))?;
594 g.set("_TASK_ID", inv.task_id.to_string())
595 .map_err(|e| format!("set _TASK_ID: {e}"))?;
596 g.set("_ATTEMPT", inv.attempt as i64)
597 .map_err(|e| format!("set _ATTEMPT: {e}"))?;
598
599 // 2. _CTX = JSON parse(_PROMPT); nil on parse failure (co-exists with the plain-string prompt path).
600 if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&inv.prompt) {
601 let lua_val = lua
602 .to_value(&json_val)
603 .map_err(|e| format!("_CTX to_value: {e}"))?;
604 g.set("_CTX", lua_val)
605 .map_err(|e| format!("set _CTX: {e}"))?;
606 }
607
608 // 3. Inject the host bridge (Lua can call `host.<name>(arg)`).
609 if !bridges.is_empty() {
610 let host = lua
611 .create_table()
612 .map_err(|e| format!("create host table: {e}"))?;
613 for (name, bridge) in bridges.iter() {
614 let bridge = bridge.clone();
615 let bname = name.clone();
616 let f = lua
617 .create_function(move |lua, arg: mlua::Value| {
618 let json_arg: serde_json::Value = lua.from_value(arg).map_err(|e| {
619 mlua::Error::external(format!("bridge {bname} arg → json: {e}"))
620 })?;
621 let result_json =
622 bridge.call(json_arg).map_err(mlua::Error::external)?;
623 lua.to_value(&result_json).map_err(|e| {
624 mlua::Error::external(format!("bridge {bname} ret → lua: {e}"))
625 })
626 })
627 .map_err(|e| format!("create_function {name}: {e}"))?;
628 host.set(name.as_str(), f)
629 .map_err(|e| format!("host.{name} set: {e}"))?;
630 }
631 g.set("host", host).map_err(|e| format!("set host: {e}"))?;
632 }
633
634 // 4. eval
635 let result: mlua::Value = lua
636 .load(&source.source)
637 .set_name(&source.label)
638 .eval()
639 .map_err(|e| format!("lua eval [{}]: {e}", source.label))?;
640
641 // 5. Marshal: shape `{ value=..., ok=true }` or raw value.
642 let json_result: serde_json::Value = lua
643 .from_value(result)
644 .map_err(|e| format!("lua → json [{}]: {e}", source.label))?;
645
646 let (value, ok) = match &json_result {
647 serde_json::Value::Object(map)
648 if map.contains_key("value") || map.contains_key("ok") =>
649 {
650 let ok = map.get("ok").and_then(|v| v.as_bool()).unwrap_or(true);
651 let value = map.get("value").cloned().unwrap_or(json_result.clone());
652 (value, ok)
653 }
654 _ => (json_result, true),
655 };
656 Ok((value, ok))
657 })
658 .await
659 .map_err(|e| WorkerError::Failed(format!("spawn_blocking join [{label}]: {e}")))?
660 .map_err(WorkerError::Failed)?;
661
662 Ok(crate::worker::adapter::WorkerResult {
663 value: outcome.0,
664 ok: outcome.1,
665 })
666}
667
668impl Default for LuaInProcessSpawnerFactory {
669 fn default() -> Self {
670 Self::new()
671 }
672}
673
/// Type-level binding: this factory is registered under
/// `AgentKind::Lua` and its workers are `LuaWorker`s.
impl SpawnerFactoryKind for LuaInProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::Lua;
    type Worker = LuaWorker;
}
678
679impl SpawnerFactory for LuaInProcessSpawnerFactory {
680 fn build(
681 &self,
682 agent_def: &AgentDef,
683 _hint: Option<&Value>,
684 ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
685 build_inproc_from_registry::<LuaWorker>(&self.registry, agent_def, "lua")
686 }
687}
688
/// Factory for `AgentKind::RustFn`. At `build` time it looks the `fn_id`
/// up in its internal registry and returns an [`InProcSpawner`] with the
/// Rust closure `WorkerFn` registered under `agent_name`.
///
/// Naming convention: `<WorkerIMPL><AdapterType>SpawnerFactory` (RustFn
/// worker on InProcess adapter). Sibling to
/// [`LuaInProcessSpawnerFactory`] — the Lua-worker half of the same
/// split.
///
/// Spec shape:
/// ```jsonc
/// { "fn_id": "echo" } // Rust closure id pre-registered with the factory
/// ```
pub struct RustFnInProcessSpawnerFactory {
    // `fn_id → WorkerFn`, filled by `register_fn`.
    registry: HashMap<String, WorkerFn>,
}
705
706impl RustFnInProcessSpawnerFactory {
707 /// Start with no registered closures.
708 pub fn new() -> Self {
709 Self {
710 registry: HashMap::new(),
711 }
712 }
713
714 /// Register a Rust closure `WorkerFn` under `fn_id`, wrapping it so
715 /// it matches the `WorkerFn` signature (boxed, pinned future).
716 pub fn register_fn<F, Fut>(mut self, fn_id: impl Into<String>, f: F) -> Self
717 where
718 F: Fn(crate::worker::adapter::WorkerInvocation) -> Fut + Send + Sync + 'static,
719 Fut: std::future::Future<
720 Output = Result<
721 crate::worker::adapter::WorkerResult,
722 crate::worker::adapter::WorkerError,
723 >,
724 > + Send
725 + 'static,
726 {
727 let f = Arc::new(f);
728 let wrapped: WorkerFn = Arc::new(move |inv| {
729 let f = f.clone();
730 Box::pin(f(inv))
731 });
732 self.registry.insert(fn_id.into(), wrapped);
733 self
734 }
735}
736
737impl Default for RustFnInProcessSpawnerFactory {
738 fn default() -> Self {
739 Self::new()
740 }
741}
742
/// Type-level binding: this factory is registered under
/// `AgentKind::RustFn` and its workers are `RustFnWorker`s.
impl SpawnerFactoryKind for RustFnInProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::RustFn;
    type Worker = RustFnWorker;
}
747
748impl SpawnerFactory for RustFnInProcessSpawnerFactory {
749 fn build(
750 &self,
751 agent_def: &AgentDef,
752 _hint: Option<&Value>,
753 ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
754 build_inproc_from_registry::<RustFnWorker>(&self.registry, agent_def, "rust_fn")
755 }
756}
757
758/// Shared build helper used by both the Lua and the RustFn factories —
759/// look `spec.fn_id` up in the registry and return an `InProcSpawner`.
760/// The generic type parameter `W` fixes the per-kind Worker concrete
761/// type at the type level (the build-site half of the trait's
762/// associated-type binding across the four-layer cascade).
763fn build_inproc_from_registry<W>(
764 registry: &HashMap<String, WorkerFn>,
765 agent_def: &AgentDef,
766 kind_label: &str,
767) -> Result<Arc<dyn SpawnerAdapter>, CompileError>
768where
769 W: crate::worker::Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
770{
771 let agent_name = &agent_def.name;
772 let spec = &agent_def.spec;
773 let invalid = |msg: String| CompileError::InvalidSpec {
774 name: agent_name.to_string(),
775 msg,
776 };
777 let fn_id = spec
778 .get("fn_id")
779 .and_then(|v| v.as_str())
780 .ok_or_else(|| invalid(format!("{kind_label} spec: 'fn_id' (string) required")))?;
781 let f = registry
782 .get(fn_id)
783 .cloned()
784 .ok_or_else(|| invalid(format!("fn_id '{fn_id}' not registered in factory")))?;
785 let mut sp: InProcSpawner<W> = InProcSpawner::<W>::typed();
786 // Register under `agent_name` (the flow's `Step.ref`). Both
787 // `CompiledAgentTable` and the `InProcSpawner` look the function up
788 // by name, so the same key is needed at both layers.
789 sp.registry.insert(agent_name.to_string(), f);
790 Ok(Arc::new(sp))
791}
792
/// Concrete Worker type for the Lua kind — a handle to a Lua-eval task
/// inside an mlua VM. Embeds a `WorkerJoinHandler` and delegates every
/// `Worker` method to it. Reserved as the home for future Lua-specific
/// extensions (an mlua VM cancellation mechanism, Lua-side error type
/// retention, and so on).
pub struct LuaWorker {
    /// The join handle / cancellation token for the underlying task.
    pub handler: crate::worker::WorkerJoinHandler,
}
801
802impl From<crate::worker::WorkerJoinHandler> for LuaWorker {
803 fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
804 Self { handler }
805 }
806}
807
/// `Worker` implementation that forwards everything to the embedded
/// `WorkerJoinHandler`.
#[async_trait::async_trait]
impl crate::worker::Worker for LuaWorker {
    /// The worker id stored on the handler.
    fn id(&self) -> &crate::types::WorkerId {
        &self.handler.worker_id
    }
    /// A clone of the handler's cancellation token.
    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        self.handler.cancel.clone()
    }
    /// Consume the worker and wait for the handler to report completion.
    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        self.handler.await_completion().await
    }
}
820
/// Concrete Worker type for the RustFn kind — a handle to a task that
/// directly calls a Rust closure. Embeds a `WorkerJoinHandler` and
/// delegates every `Worker` method to it. Being a pure function, there is
/// minimal kind-specific extension surface here; the primary purpose is
/// to nail down the type binding.
pub struct RustFnWorker {
    /// The join handle / cancellation token for the underlying task.
    pub handler: crate::worker::WorkerJoinHandler,
}
829
830impl From<crate::worker::WorkerJoinHandler> for RustFnWorker {
831 fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
832 Self { handler }
833 }
834}
835
/// `Worker` implementation that forwards everything to the embedded
/// `WorkerJoinHandler`.
#[async_trait::async_trait]
impl crate::worker::Worker for RustFnWorker {
    /// The worker id stored on the handler.
    fn id(&self) -> &crate::types::WorkerId {
        &self.handler.worker_id
    }
    /// A clone of the handler's cancellation token.
    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        self.handler.cancel.clone()
    }
    /// Consume the worker and wait for the handler to report completion.
    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        self.handler.await_completion().await
    }
}
848
/// Factory for `AgentKind::Operator`. Looks up the `Arc<dyn Operator>`
/// pre-registered under `spec.operator_ref` and wraps it in an
/// `OperatorSpawner`. Also resolves `AgentDef.profile.worker_binding` into
/// a `WorkerBinding` at compile time and fails loud (`CompileError::InvalidSpec`)
/// when the resolved operator's `Operator::requires_worker_binding` is `true`
/// and no binding was declared.
///
/// Spec shape:
/// ```jsonc
/// { "operator_ref": "main_ai" } // Operator id pre-registered with the factory
/// ```
///
/// # Split of responsibilities with `OperatorDelegateMiddleware`
///
/// The two axes exist for different reasons:
///
/// - **This factory (`OperatorSpawnerFactory` → `OperatorSpawner`) — the
///   AgentSpec axis.** Bakes a separate Operator backend into each
///   `AgentDef`. A `kind = Operator` `AgentDef` names its backend through
///   `spec.operator_ref`; at `compile()` time the `Arc<dyn Operator>` is
///   baked into `routes[agent_name]`. Because the `agent.md` loader
///   (`agent_md_loader`) defaults `kind` to `Operator`, agents that flow
///   in through agent-profiles land here.
///
/// - **`OperatorDelegateMiddleware` — the Blueprint-global (session)
///   axis.** Delegates every agent to the same Operator backend. At
///   session-attach time you call `engine.register_operator(id, op)`
///   plus `attach_with_ids(.., operator_backend_id = Some(id))` to bind
///   it session-wide, and declare
///   `spawner_hints.layers = ["operator_delegate"]` to opt in. `ctx.agent`
///   is ignored; the operator handles every spawn in that session (a
///   MainAI-wide driver, a human-wide console, that sort of thing).
///
/// # Exclusivity (a double fire is structurally impossible)
///
/// When both are effective — the hint is declared, the session has an
/// operator backend, **and** the Blueprint has a `kind = Operator`
/// `AgentDef` — `OperatorDelegateMiddleware` sits at the outer end of
/// the stack and **completely bypasses** `inner.spawn`. The
/// `OperatorSpawner` is never reached, so under those conditions this
/// factory's routes entry is inert. This is not a double fire — the
/// session axis is overriding the agent axis. Consistent usage means
/// picking one axis per use case.
///
/// # Interior mutability
///
/// The operator map lives behind an `Arc<RwLock<..>>`. Even after the
/// factory has been stored as `Arc<dyn SpawnerFactory>` in
/// `SpawnerRegistry`, a caller holding an `Arc` clone can still add
/// Operator backends dynamically via `register_operator(&self, id, op)`.
/// Typical uses: registering a `WSOperatorSession` under the session id
/// on WebSocket connect, binding agents that arrive via the `agent.md`
/// loader to arbitrary backends, and so on. `build()` performs a
/// `read()` lookup each time.
pub struct OperatorSpawnerFactory {
    // `operator id → backend`, shared and mutable through `&self`.
    operators: Arc<std::sync::RwLock<HashMap<String, Arc<dyn Operator>>>>,
}
904
905impl OperatorSpawnerFactory {
906 /// Start with no registered Operator backends.
907 pub fn new() -> Self {
908 Self {
909 operators: Arc::new(std::sync::RwLock::new(HashMap::new())),
910 }
911 }
912
913 /// Register an Operator backend dynamically through `&self`.
914 /// Overwrites are allowed — later wins. Callers can still reach this
915 /// after the factory has been stored as `Arc<dyn SpawnerFactory>` in
916 /// `SpawnerRegistry`, as long as they hold an `Arc` clone; interior
917 /// mutability is provided by the inner `RwLock`.
918 pub fn register_operator(&self, id: impl Into<String>, op: Arc<dyn Operator>) -> &Self {
919 self.operators
920 .write()
921 .expect("OperatorSpawnerFactory.operators RwLock poisoned")
922 .insert(id.into(), op);
923 self
924 }
925
926 /// Dynamically unregister an id (used to clean up when a WebSocket
927 /// disconnects, for example). A missing id is a no-op.
928 pub fn unregister_operator(&self, id: &str) -> &Self {
929 self.operators
930 .write()
931 .expect("OperatorSpawnerFactory.operators RwLock poisoned")
932 .remove(id);
933 self
934 }
935}
936
937impl Default for OperatorSpawnerFactory {
938 fn default() -> Self {
939 Self::new()
940 }
941}
942
943impl SpawnerFactoryKind for OperatorSpawnerFactory {
944 const KIND: AgentKind = AgentKind::Operator;
945 type Worker = crate::operator::OperatorWorker;
946}
947
948impl SpawnerFactory for OperatorSpawnerFactory {
949 fn build(
950 &self,
951 agent_def: &AgentDef,
952 _hint: Option<&Value>,
953 ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
954 let agent_name = &agent_def.name;
955 let spec = &agent_def.spec;
956 // Bake AgentDef.profile.system_prompt into the OperatorSpawner at compile time.
957 // `Some` → adopted first at spawn time; `None` → falls back to fetch_prompt (initial_directive).
958 // Fallback path. Sibling: AgentBlockInProcessSpawnerFactory
959 // (agent_block/runtime.rs) does the same compile-time bake by stuffing
960 // the profile into BlockConfig.context.
961 let system_prompt = agent_def.profile.as_ref().map(|p| p.system_prompt.clone());
962 let invalid = |msg: String| CompileError::InvalidSpec {
963 name: agent_name.to_string(),
964 msg,
965 };
966 let op_ref = spec
967 .get("operator_ref")
968 .and_then(|v| v.as_str())
969 .ok_or_else(|| invalid("operator spec: 'operator_ref' (string) required".into()))?;
970 let operators = self
971 .operators
972 .read()
973 .expect("OperatorSpawnerFactory.operators RwLock poisoned");
974 let op = operators.get(op_ref).cloned().ok_or_else(|| {
975 let mut names: Vec<String> = operators.keys().cloned().collect();
976 names.sort();
977 let names_list = if names.is_empty() {
978 "<none>".to_string()
979 } else {
980 names.join(", ")
981 };
982 invalid(format!(
983 "operator_ref '{op_ref}' not registered in factory. \
984 Registered sids: [{names_list}]. \
985 Hint: call mse_operator_join(roles=[...]) to mint the sid first."
986 ))
987 })?;
988 drop(operators);
989
990 // Resolve the Blueprint-baked worker binding from
991 // `AgentDef.profile.worker_binding` — the SoT for the
992 // declaration↔executor binding (see `WorkerBinding` doc). Fail
993 // loud at compile time when the operator backend requires one
994 // and the Blueprint didn't declare it; this is a compile-time
995 // gate, not a runtime guess.
996 let worker_binding = agent_def
997 .profile
998 .as_ref()
999 .and_then(|p| p.worker_binding.as_ref())
1000 .map(|subagent_type| WorkerBinding {
1001 subagent_type: subagent_type.clone(),
1002 tools: agent_def
1003 .profile
1004 .as_ref()
1005 .map(|p| p.tools.clone())
1006 .unwrap_or_default(),
1007 });
1008 if op.requires_worker_binding() && worker_binding.is_none() {
1009 return Err(invalid(
1010 "profile.worker_binding is required for this operator backend; \
1011 declare it in the agent .md frontmatter"
1012 .into(),
1013 ));
1014 }
1015 Ok(Arc::new(OperatorSpawner::new(
1016 op,
1017 system_prompt,
1018 worker_binding,
1019 )))
1020 }
1021}
1022
#[cfg(test)]
mod operator_spawner_factory_worker_binding_tests {
    //! Tests for `OperatorSpawnerFactory::build`: the compile-time
    //! worker-binding gate and the `operator_ref` resolution errors.

    use super::*;
    use crate::blueprint::AgentProfile;
    use crate::core::ctx::Ctx;
    use crate::types::CapToken;
    use crate::worker::adapter::{WorkerError, WorkerResult};

    /// Minimal `Operator` stub whose `requires_worker_binding` is
    /// configurable — enough to exercise the compile-time fail-loud gate
    /// without standing up a real backend (e.g. `WSOperatorSession`,
    /// which lives in a downstream crate).
    struct StubOperator {
        requires_binding: bool,
    }

    #[async_trait]
    impl Operator for StubOperator {
        async fn execute(
            &self,
            _ctx: &Ctx,
            _system: Option<String>,
            _prompt: String,
            _worker: Option<WorkerBinding>,
            _worker_token: CapToken,
        ) -> Result<WorkerResult, WorkerError> {
            Ok(WorkerResult {
                value: Value::Null,
                ok: true,
            })
        }

        fn requires_worker_binding(&self) -> bool {
            self.requires_binding
        }
    }

    /// `kind = Operator` agent named "test-agent" that references the
    /// backend id "op1" and carries the given profile.
    fn agent_def_with(profile: Option<AgentProfile>) -> AgentDef {
        AgentDef {
            name: "test-agent".to_string(),
            kind: AgentKind::Operator,
            spec: serde_json::json!({ "operator_ref": "op1" }),
            profile,
            meta: None,
        }
    }

    /// Factory with a single `StubOperator` registered under "op1".
    fn factory_with_stub(requires_binding: bool) -> OperatorSpawnerFactory {
        let factory = OperatorSpawnerFactory::new();
        factory.register_operator(
            "op1",
            Arc::new(StubOperator { requires_binding }) as Arc<dyn Operator>,
        );
        factory
    }

    /// Assert that `result` is `CompileError::InvalidSpec` for "test-agent"
    /// and hand back its message for further checks.
    fn expect_invalid_spec(result: Result<Arc<dyn SpawnerAdapter>, CompileError>) -> String {
        match result {
            Err(CompileError::InvalidSpec { name, msg }) => {
                assert_eq!(name, "test-agent");
                msg
            }
            Err(other) => panic!("expected InvalidSpec, got: {other:?}"),
            Ok(_) => panic!("expected compile-time failure, got Ok"),
        }
    }

    #[test]
    fn build_fails_loud_when_binding_required_but_absent() {
        let factory = factory_with_stub(true);
        let def = agent_def_with(Some(AgentProfile::default()));
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(
            msg.contains("worker_binding is required"),
            "unexpected message: {msg}"
        );
    }

    #[test]
    fn build_fails_loud_when_binding_required_and_profile_missing() {
        // No profile at all means no binding either; the gate must still fire.
        let factory = factory_with_stub(true);
        let def = agent_def_with(None);
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(
            msg.contains("worker_binding is required"),
            "unexpected message: {msg}"
        );
    }

    #[test]
    fn build_succeeds_when_binding_required_and_present() {
        let factory = factory_with_stub(true);
        let profile = AgentProfile {
            worker_binding: Some("mse-worker-coder".to_string()),
            tools: vec!["Read".to_string(), "Edit".to_string()],
            ..Default::default()
        };
        let def = agent_def_with(Some(profile));
        assert!(
            factory.build(&def, None).is_ok(),
            "expected Ok when worker_binding is declared"
        );
    }

    #[test]
    fn build_succeeds_when_binding_not_required_and_absent() {
        let factory = factory_with_stub(false);
        let def = agent_def_with(Some(AgentProfile::default()));
        assert!(
            factory.build(&def, None).is_ok(),
            "backends that don't require a binding must not be gated by its absence"
        );
    }

    #[test]
    fn build_rejects_missing_or_non_string_operator_ref() {
        let factory = factory_with_stub(false);
        for spec in [
            serde_json::json!({}),
            serde_json::json!({ "operator_ref": 42 }),
        ] {
            let def = AgentDef {
                spec,
                ..agent_def_with(None)
            };
            let msg = expect_invalid_spec(factory.build(&def, None));
            assert!(
                msg.contains("'operator_ref' (string) required"),
                "unexpected message: {msg}"
            );
        }
    }

    #[test]
    fn build_rejects_unregistered_operator_ref_and_lists_known_ids() {
        // Empty factory: the registered-id list is rendered as "<none>".
        let empty = OperatorSpawnerFactory::new();
        let msg = expect_invalid_spec(empty.build(&agent_def_with(None), None));
        assert!(
            msg.contains("operator_ref 'op1' not registered in factory"),
            "unexpected message: {msg}"
        );
        assert!(
            msg.contains("Registered sids: [<none>]"),
            "unexpected message: {msg}"
        );

        // Populated factory: the message lists the ids that do exist.
        let factory = factory_with_stub(false);
        let def = AgentDef {
            spec: serde_json::json!({ "operator_ref": "ghost" }),
            ..agent_def_with(None)
        };
        let msg = expect_invalid_spec(factory.build(&def, None));
        assert!(
            msg.contains("operator_ref 'ghost' not registered in factory"),
            "unexpected message: {msg}"
        );
        assert!(
            msg.contains("Registered sids: [op1]"),
            "unexpected message: {msg}"
        );
    }
}