use crate::blueprint::{AgentDef, AgentKind, Blueprint, BlueprintMetadata};
use crate::core::ctx::Ctx;
use crate::core::engine::Engine;
use crate::operator::{Operator, OperatorSpawner};
use crate::types::{CapToken, TaskId};
use crate::worker::adapter::{InProcSpawner, SpawnError, SpawnerAdapter, WorkerFn};
use crate::worker::process_spawner::{ProcessSpawner, StreamMode};
use crate::worker::Worker;
use async_trait::async_trait;
use mlua_flow_ir::Node as FlowNode;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
/// Errors produced while compiling a [`Blueprint`] into a [`CompiledBlueprint`].
#[derive(Debug, Error)]
pub enum CompileError {
    /// No factory is registered for an agent's kind and the blueprint's
    /// `strategy.strict_kind` flag is set.
    #[error("unknown agent kind in SpawnerRegistry: {0:?}")]
    UnknownKind(AgentKind),
    /// A factory rejected an agent's `spec` (missing or malformed field, or
    /// an id that is not registered in the factory).
    #[error("agent '{name}' spec invalid: {msg}")]
    InvalidSpec {
        /// Name of the offending agent.
        name: String,
        /// Human-readable reason.
        msg: String,
    },
    /// With `strategy.strict_refs`, the flow references an agent that has no
    /// route and no default spawner is configured.
    #[error("flow references agent '{0}' but no AgentDef matches")]
    UnresolvedRef(String),
    /// Two `AgentDef`s in the blueprint share the same name.
    #[error("duplicate AgentDef name: {0}")]
    DuplicateAgent(String),
    /// An `Operator`-kind agent's `spec.operator_ref` does not match the name
    /// of any entry in `Blueprint.operators`.
    #[error("agent '{agent}' operator_ref '{op_ref}' does not match any OperatorDef.name in Blueprint.operators (defined: {defined:?})")]
    UnresolvedOperatorRef {
        /// Name of the offending agent.
        agent: String,
        /// The `operator_ref` value that failed to resolve.
        op_ref: String,
        /// Names of all operators declared in the blueprint.
        defined: Vec<String>,
    },
}
/// Builds a [`SpawnerAdapter`] for a single [`AgentDef`].
pub trait SpawnerFactory: Send + Sync {
    /// Creates the spawner for `agent_def`.
    ///
    /// `hint` is the blueprint's per-agent hint for this agent, if any (the
    /// compiler passes `bp.hints.per_agent.get(&agent.name)`). The factories
    /// defined in this file ignore it.
    ///
    /// # Errors
    /// Implementations return a [`CompileError`] (in this file always
    /// `InvalidSpec`) when the agent's spec cannot be turned into a spawner.
    fn build(
        &self,
        agent_def: &AgentDef,
        hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError>;
}
/// A [`SpawnerFactory`] statically bound to one [`AgentKind`]; `KIND` is the
/// key under which [`SpawnerRegistry::register`] stores the factory.
pub trait SpawnerFactoryKind: SpawnerFactory {
    /// The agent kind this factory serves.
    const KIND: AgentKind;
    /// The worker type produced by this factory's spawners.
    /// NOTE(review): not referenced by the registry or compiler in this file;
    /// presumably used elsewhere or purely descriptive — confirm.
    type Worker: crate::worker::Worker;
}
/// Maps each [`AgentKind`] to the [`SpawnerFactory`] that builds spawners for
/// agents of that kind.
///
/// Cloning copies the map but shares the factories themselves (they are held
/// in `Arc`).
#[derive(Clone)]
pub struct SpawnerRegistry {
    factories: HashMap<AgentKind, Arc<dyn SpawnerFactory>>,
}
impl SpawnerRegistry {
    /// Creates a registry with no factories.
    pub fn new() -> Self {
        let factories = HashMap::new();
        Self { factories }
    }

    /// Registers `factory` under its `F::KIND`, replacing any factory that was
    /// previously registered for that kind. Returns `self` for chaining.
    pub fn register<F: SpawnerFactoryKind + 'static>(&mut self, factory: Arc<F>) -> &mut Self {
        // Erase the concrete factory type so heterogeneous factories share one map.
        let erased = factory as Arc<dyn SpawnerFactory>;
        self.factories.insert(F::KIND, erased);
        self
    }
}
impl Default for SpawnerRegistry {
    /// Equivalent to [`SpawnerRegistry::new`].
    fn default() -> Self {
        SpawnerRegistry::new()
    }
}
/// Turns a [`Blueprint`] into a [`CompiledBlueprint`] by asking the
/// [`SpawnerRegistry`] for one spawner per agent.
pub struct Compiler {
    /// Factories, looked up by each agent's kind.
    registry: SpawnerRegistry,
    /// Optional fallback spawner, installed as the compiled table's default route.
    default_spawner: Option<Arc<dyn SpawnerAdapter>>,
}
/// Output of [`Compiler::compile`].
pub struct CompiledBlueprint {
    /// Agent-name → spawner table. It is itself a [`SpawnerAdapter`] that
    /// dispatches on `ctx.agent`.
    pub router: Arc<CompiledAgentTable>,
    /// The blueprint's flow tree, cloned unchanged.
    pub flow: FlowNode,
    /// The blueprint's metadata, cloned unchanged.
    pub metadata: BlueprintMetadata,
}
impl Compiler {
    /// Creates a compiler that looks up agent factories in `registry` and has
    /// no default (fallback) spawner.
    pub fn new(registry: SpawnerRegistry) -> Self {
        Self {
            registry,
            default_spawner: None,
        }
    }

    /// Sets the default spawner installed in every compiled table.
    ///
    /// The compiled table uses it for agents that have no route of their own.
    /// When it is set, `strategy.strict_refs` also accepts flow references to
    /// unrouted agents.
    pub fn with_default(mut self, sp: Arc<dyn SpawnerAdapter>) -> Self {
        self.default_spawner = Some(sp);
        self
    }

    /// Compiles `bp` into a routing table plus a copy of its flow and metadata.
    ///
    /// The steps run in this order:
    /// 1. Every `Operator` agent's `spec.operator_ref`, when present, must name
    ///    an entry of `bp.operators`.
    /// 2. Each agent gets a spawner from the factory registered for its kind.
    ///    An agent whose kind has no factory is an error under
    ///    `strategy.strict_kind`; otherwise it is logged and left unrouted.
    /// 3. Under `strategy.strict_refs`, every agent referenced by the flow must
    ///    be routed, unless a default spawner is configured.
    ///
    /// # Errors
    /// - [`CompileError::UnresolvedOperatorRef`] from step 1.
    /// - [`CompileError::DuplicateAgent`] if two agents share a name.
    /// - [`CompileError::UnknownKind`] from step 2, in strict mode only.
    /// - Whatever error the agent's factory `build` returns.
    /// - [`CompileError::UnresolvedRef`] from step 3.
    pub fn compile(&self, bp: &Blueprint) -> Result<CompiledBlueprint, CompileError> {
        Self::check_operator_refs(bp)?;

        let mut routes: HashMap<String, Arc<dyn SpawnerAdapter>> = HashMap::new();
        // Borrow names from `bp` rather than cloning each into a map key.
        let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
        for ad in &bp.agents {
            // `insert` returns false when the name was already present.
            if !seen.insert(ad.name.as_str()) {
                return Err(CompileError::DuplicateAgent(ad.name.clone()));
            }
            let factory = match self.registry.factories.get(&ad.kind) {
                Some(f) => f,
                None if bp.strategy.strict_kind => {
                    return Err(CompileError::UnknownKind(ad.kind.clone()));
                }
                None => {
                    tracing::warn!(
                        agent = %ad.name,
                        kind = ?ad.kind,
                        "no spawner factory registered for agent kind; \
                         dropping agent from routing table (strict_kind=false)"
                    );
                    continue;
                }
            };
            let hint = bp.hints.per_agent.get(&ad.name);
            let spawner = factory.build(ad, hint)?;
            routes.insert(ad.name.clone(), spawner);
        }

        if bp.strategy.strict_refs {
            verify_refs(&bp.flow, &routes, self.default_spawner.is_some())?;
        }

        Ok(CompiledBlueprint {
            router: Arc::new(CompiledAgentTable {
                routes,
                default: self.default_spawner.clone(),
            }),
            flow: bp.flow.clone(),
            metadata: bp.metadata.clone(),
        })
    }

    /// Checks that every `Operator`-kind agent whose spec carries an
    /// `operator_ref` string names an operator declared in `bp.operators`.
    ///
    /// Agents without an `operator_ref` are skipped here. Validating that
    /// field is left to the factory registered for the `Operator` kind.
    fn check_operator_refs(bp: &Blueprint) -> Result<(), CompileError> {
        let defined: Vec<String> = bp.operators.iter().map(|o| o.name.clone()).collect();
        for ad in &bp.agents {
            if !matches!(ad.kind, AgentKind::Operator) {
                continue;
            }
            if let Some(op_ref) = ad.spec.get("operator_ref").and_then(|v| v.as_str()) {
                if !defined.iter().any(|n| n == op_ref) {
                    return Err(CompileError::UnresolvedOperatorRef {
                        agent: ad.name.clone(),
                        op_ref: op_ref.to_string(),
                        defined,
                    });
                }
            }
        }
        Ok(())
    }
}
fn verify_refs(
node: &FlowNode,
routes: &HashMap<String, Arc<dyn SpawnerAdapter>>,
has_default: bool,
) -> Result<(), CompileError> {
let mut refs: Vec<String> = Vec::new();
collect_refs(node, &mut refs);
for r in refs {
if !routes.contains_key(&r) && !has_default {
return Err(CompileError::UnresolvedRef(r));
}
}
Ok(())
}
/// Appends the `ref_` of every `Step` node under `node` to `out`, depth-first.
///
/// `Branch` visits `then_` before `else_`, and `Try` visits `body` before
/// `catch`. `Assign` nodes contribute nothing. Duplicates are kept.
fn collect_refs(node: &FlowNode, out: &mut Vec<String>) {
    match node {
        FlowNode::Assign { .. } => {}
        FlowNode::Step { ref_, .. } => {
            out.push(ref_.clone());
        }
        FlowNode::Seq { children } => {
            for child in children.iter() {
                collect_refs(child, out);
            }
        }
        FlowNode::Branch { then_, else_, .. } => {
            collect_refs(then_, out);
            collect_refs(else_, out);
        }
        FlowNode::Try { body, catch, .. } => {
            collect_refs(body, out);
            collect_refs(catch, out);
        }
        FlowNode::Fanout { body, .. } => {
            collect_refs(body, out);
        }
        FlowNode::Loop { body, .. } => {
            collect_refs(body, out);
        }
    }
}
/// Routing table produced by [`Compiler::compile`].
///
/// It maps agent names to spawners and holds an optional fallback for agents
/// that have no entry.
pub struct CompiledAgentTable {
    /// Per-agent spawners, keyed by `AgentDef` name.
    pub(crate) routes: HashMap<String, Arc<dyn SpawnerAdapter>>,
    /// Spawner used for agents that have no entry in `routes`.
    pub(crate) default: Option<Arc<dyn SpawnerAdapter>>,
}
impl CompiledAgentTable {
    /// Returns `true` if `agent` has its own route. The default spawner is
    /// not considered.
    pub fn has_route(&self, agent: &str) -> bool {
        self.routes.contains_key(agent)
    }

    /// Returns the names of all agents that have their own route, in no
    /// particular order.
    pub fn routed_agents(&self) -> Vec<String> {
        let mut names = Vec::with_capacity(self.routes.len());
        names.extend(self.routes.keys().cloned());
        names
    }
}
#[async_trait]
impl SpawnerAdapter for CompiledAgentTable {
async fn spawn(
&self,
engine: &Engine,
ctx: &Ctx,
task_id: TaskId,
attempt: u32,
token: CapToken,
) -> Result<Box<dyn Worker>, SpawnError> {
let sp = self
.routes
.get(&ctx.agent)
.cloned()
.or_else(|| self.default.clone())
.ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
sp.spawn(engine, ctx, task_id, attempt, token).await
}
}
/// Factory for [`AgentKind::Subprocess`] agents.
///
/// It builds a [`ProcessSpawner`] configured from the `program`, `args`,
/// `use_stdin` and `stream_mode` fields of the agent's `spec`.
pub struct SubprocessProcessSpawnerFactory;
impl SpawnerFactoryKind for SubprocessProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::Subprocess;
    type Worker = crate::worker::process_spawner::ProcessWorker;
}
impl SpawnerFactory for SubprocessProcessSpawnerFactory {
fn build(
&self,
agent_def: &AgentDef,
_hint: Option<&Value>,
) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
let agent_name = &agent_def.name;
let spec = &agent_def.spec;
let invalid = |msg: String| CompileError::InvalidSpec {
name: agent_name.to_string(),
msg,
};
let program = spec
.get("program")
.and_then(|v| v.as_str())
.ok_or_else(|| invalid("shell spec: 'program' (string) required".into()))?
.to_string();
let args: Vec<String> = spec
.get("args")
.and_then(|v| v.as_array())
.map(|a| {
a.iter()
.filter_map(|x| x.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let use_stdin = spec
.get("use_stdin")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let stream_mode = match spec.get("stream_mode").and_then(|v| v.as_str()) {
Some("ndjson_lines") => Some(StreamMode::NdjsonLines),
Some("sse_events") => Some(StreamMode::SseEvents),
Some("length_prefixed") => Some(StreamMode::LengthPrefixed),
Some(other) => return Err(invalid(format!("unknown stream_mode: {other}"))),
None => None,
};
let mut sp = ProcessSpawner {
program,
args,
use_stdin,
stream_mode,
};
if let Some(mode) = sp.stream_mode.clone() {
sp = sp.stream_mode(mode);
}
Ok(Arc::new(sp))
}
}
/// Factory for [`AgentKind::Lua`] agents.
///
/// Scripts are registered under an id with `register_lua`, and an agent's
/// `spec.fn_id` selects which script its spawner runs. Host bridges added
/// with `with_bridge` are exposed to scripts as `host.<name>`.
pub struct LuaInProcessSpawnerFactory {
    /// Registered scripts, already wrapped as worker functions, keyed by id.
    registry: HashMap<String, WorkerFn>,
    /// Bridges added so far. `register_lua` snapshots this map, so a script
    /// only sees bridges that were added before it was registered.
    bridges: HashMap<String, HostBridge>,
}
/// A host-side function that Lua scripts can call.
///
/// When added with `LuaInProcessSpawnerFactory::with_bridge`, it appears in
/// Lua as `host.<name>(arg)`. The argument and the result cross the Lua
/// boundary as JSON, and an `Err` string is raised as a Lua error. Cloning
/// shares the wrapped closure.
#[derive(Clone)]
pub struct HostBridge(
    Arc<dyn Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync>,
);
impl HostBridge {
    /// Wraps `f` as a bridge.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync + 'static,
    {
        let shared = Arc::new(f);
        HostBridge(shared)
    }

    /// Invokes the wrapped function with `arg` and returns its result unchanged.
    pub fn call(&self, arg: serde_json::Value) -> Result<serde_json::Value, String> {
        let HostBridge(inner) = self;
        inner(arg)
    }
}
/// A Lua chunk to be run by a Lua worker.
#[derive(Clone)]
pub struct LuaScriptSource {
    /// Lua source text that the worker evaluates.
    pub source: String,
    /// Chunk name given to Lua and included in error messages.
    pub label: String,
}
impl LuaScriptSource {
    /// Builds a script from its source text and a label.
    pub fn new(source: impl Into<String>, label: impl Into<String>) -> Self {
        let (source, label) = (source.into(), label.into());
        LuaScriptSource { source, label }
    }
}
impl LuaInProcessSpawnerFactory {
    /// Creates a factory with no scripts and no host bridges.
    pub fn new() -> Self {
        Self {
            bridges: HashMap::new(),
            registry: HashMap::new(),
        }
    }

    /// Adds (or replaces) the host bridge exposed to Lua as `host.<name>`.
    ///
    /// Bridges are snapshotted by `register_lua`, so only scripts registered
    /// after this call can see the bridge.
    pub fn with_bridge(mut self, name: impl Into<String>, bridge: HostBridge) -> Self {
        let key: String = name.into();
        self.bridges.insert(key, bridge);
        self
    }

    /// Registers a Lua script under `fn_id`, replacing any script already
    /// registered under that id.
    ///
    /// The bridges added so far are captured at this point. Each invocation
    /// runs the script through `run_lua_worker` with that snapshot.
    pub fn register_lua(mut self, fn_id: impl Into<String>, source: LuaScriptSource) -> Self {
        let script = Arc::new(source);
        let bridge_snapshot = Arc::new(self.bridges.clone());
        let worker: WorkerFn = Arc::new(move |invocation| {
            Box::pin(run_lua_worker(
                Arc::clone(&script),
                Arc::clone(&bridge_snapshot),
                invocation,
            ))
        });
        self.registry.insert(fn_id.into(), worker);
        self
    }
}
async fn run_lua_worker(
source: Arc<LuaScriptSource>,
bridges: Arc<HashMap<String, HostBridge>>,
inv: crate::worker::adapter::WorkerInvocation,
) -> Result<crate::worker::adapter::WorkerResult, crate::worker::adapter::WorkerError> {
use crate::worker::adapter::WorkerError;
use mlua::LuaSerdeExt;
let label = source.label.clone();
let outcome =
tokio::task::spawn_blocking(move || -> Result<(serde_json::Value, bool), String> {
let lua = mlua::Lua::new();
let g = lua.globals();
g.set("_PROMPT", inv.prompt.clone())
.map_err(|e| format!("set _PROMPT: {e}"))?;
g.set("_AGENT", inv.agent.clone())
.map_err(|e| format!("set _AGENT: {e}"))?;
g.set("_TASK_ID", inv.task_id.to_string())
.map_err(|e| format!("set _TASK_ID: {e}"))?;
g.set("_ATTEMPT", inv.attempt as i64)
.map_err(|e| format!("set _ATTEMPT: {e}"))?;
if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&inv.prompt) {
let lua_val = lua
.to_value(&json_val)
.map_err(|e| format!("_CTX to_value: {e}"))?;
g.set("_CTX", lua_val)
.map_err(|e| format!("set _CTX: {e}"))?;
}
if !bridges.is_empty() {
let host = lua
.create_table()
.map_err(|e| format!("create host table: {e}"))?;
for (name, bridge) in bridges.iter() {
let bridge = bridge.clone();
let bname = name.clone();
let f = lua
.create_function(move |lua, arg: mlua::Value| {
let json_arg: serde_json::Value = lua.from_value(arg).map_err(|e| {
mlua::Error::external(format!("bridge {bname} arg → json: {e}"))
})?;
let result_json =
bridge.call(json_arg).map_err(mlua::Error::external)?;
lua.to_value(&result_json).map_err(|e| {
mlua::Error::external(format!("bridge {bname} ret → lua: {e}"))
})
})
.map_err(|e| format!("create_function {name}: {e}"))?;
host.set(name.as_str(), f)
.map_err(|e| format!("host.{name} set: {e}"))?;
}
g.set("host", host).map_err(|e| format!("set host: {e}"))?;
}
let result: mlua::Value = lua
.load(&source.source)
.set_name(&source.label)
.eval()
.map_err(|e| format!("lua eval [{}]: {e}", source.label))?;
let json_result: serde_json::Value = lua
.from_value(result)
.map_err(|e| format!("lua → json [{}]: {e}", source.label))?;
let (value, ok) = match &json_result {
serde_json::Value::Object(map)
if map.contains_key("value") || map.contains_key("ok") =>
{
let ok = map.get("ok").and_then(|v| v.as_bool()).unwrap_or(true);
let value = map.get("value").cloned().unwrap_or(json_result.clone());
(value, ok)
}
_ => (json_result, true),
};
Ok((value, ok))
})
.await
.map_err(|e| WorkerError::Failed(format!("spawn_blocking join [{label}]: {e}")))?
.map_err(WorkerError::Failed)?;
Ok(crate::worker::adapter::WorkerResult {
value: outcome.0,
ok: outcome.1,
})
}
impl Default for LuaInProcessSpawnerFactory {
fn default() -> Self {
Self::new()
}
}
impl SpawnerFactoryKind for LuaInProcessSpawnerFactory {
const KIND: AgentKind = AgentKind::Lua;
type Worker = LuaWorker;
}
impl SpawnerFactory for LuaInProcessSpawnerFactory {
fn build(
&self,
agent_def: &AgentDef,
_hint: Option<&Value>,
) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
build_inproc_from_registry::<LuaWorker>(&self.registry, agent_def, "lua")
}
}
/// Factory for [`AgentKind::RustFn`] agents.
///
/// An agent's `spec.fn_id` selects an async Rust function registered with
/// `register_fn`.
pub struct RustFnInProcessSpawnerFactory {
    /// Registered functions, wrapped as worker functions and keyed by id.
    registry: HashMap<String, WorkerFn>,
}
impl RustFnInProcessSpawnerFactory {
    /// Creates a factory with no registered functions.
    pub fn new() -> Self {
        let registry = HashMap::new();
        RustFnInProcessSpawnerFactory { registry }
    }

    /// Registers `f` under `fn_id`, replacing any function already registered
    /// under that id.
    ///
    /// Each invocation calls `f` with the worker invocation and boxes the
    /// returned future.
    pub fn register_fn<F, Fut>(mut self, fn_id: impl Into<String>, f: F) -> Self
    where
        F: Fn(crate::worker::adapter::WorkerInvocation) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<
                Output = Result<
                    crate::worker::adapter::WorkerResult,
                    crate::worker::adapter::WorkerError,
                >,
            > + Send
            + 'static,
    {
        let shared = Arc::new(f);
        let worker: WorkerFn = Arc::new(move |invocation| {
            let handler = Arc::clone(&shared);
            Box::pin(handler(invocation))
        });
        self.registry.insert(fn_id.into(), worker);
        self
    }
}
impl Default for RustFnInProcessSpawnerFactory {
    /// Equivalent to [`RustFnInProcessSpawnerFactory::new`].
    fn default() -> Self {
        RustFnInProcessSpawnerFactory::new()
    }
}
impl SpawnerFactoryKind for RustFnInProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::RustFn;
    type Worker = RustFnWorker;
}
impl SpawnerFactory for RustFnInProcessSpawnerFactory {
    /// Builds an in-process spawner for a Rust-fn agent.
    ///
    /// The agent's `spec.fn_id` must name a function previously passed to
    /// `register_fn`. The `hint` is ignored.
    ///
    /// # Errors
    /// Returns `CompileError::InvalidSpec` if `fn_id` is missing, not a string,
    /// or not registered.
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        let functions = &self.registry;
        build_inproc_from_registry::<RustFnWorker>(functions, agent_def, "rust_fn")
    }
}
/// Shared `build` logic for the in-process (Lua and Rust-fn) factories.
///
/// Looks up `spec.fn_id` in `registry` and returns an `InProcSpawner<W>`
/// whose own registry maps the agent's name to that worker function.
/// `kind_label` is only used as the prefix of the "missing fn_id" error
/// message.
///
/// # Errors
/// Returns `CompileError::InvalidSpec` if `fn_id` is missing, not a string,
/// or not a key of `registry`.
fn build_inproc_from_registry<W>(
    registry: &HashMap<String, WorkerFn>,
    agent_def: &AgentDef,
    kind_label: &str,
) -> Result<Arc<dyn SpawnerAdapter>, CompileError>
where
    W: crate::worker::Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
{
    let spec_error = |msg: String| CompileError::InvalidSpec {
        name: agent_def.name.clone(),
        msg,
    };
    let fn_id = match agent_def.spec.get("fn_id").and_then(|v| v.as_str()) {
        Some(id) => id,
        None => {
            return Err(spec_error(format!(
                "{kind_label} spec: 'fn_id' (string) required"
            )))
        }
    };
    let worker_fn = match registry.get(fn_id) {
        Some(found) => found.clone(),
        None => {
            return Err(spec_error(format!(
                "fn_id '{fn_id}' not registered in factory"
            )))
        }
    };
    let mut spawner = InProcSpawner::<W>::typed();
    spawner.registry.insert(agent_def.name.clone(), worker_fn);
    Ok(Arc::new(spawner))
}
/// Worker handle for the in-process spawner built by
/// `LuaInProcessSpawnerFactory` (`InProcSpawner<LuaWorker>`).
///
/// It is a thin wrapper over a [`crate::worker::WorkerJoinHandler`].
pub struct LuaWorker {
    pub handler: crate::worker::WorkerJoinHandler,
}
impl From<crate::worker::WorkerJoinHandler> for LuaWorker {
    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
        LuaWorker { handler }
    }
}
#[async_trait::async_trait]
impl crate::worker::Worker for LuaWorker {
    fn id(&self) -> &crate::types::WorkerId {
        let LuaWorker { handler } = self;
        &handler.worker_id
    }
    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        let LuaWorker { handler } = self;
        handler.cancel.clone()
    }
    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        let LuaWorker { handler } = *self;
        handler.await_completion().await
    }
}
/// Worker handle for the in-process spawner built by
/// `RustFnInProcessSpawnerFactory` (`InProcSpawner<RustFnWorker>`).
///
/// It is a thin wrapper over a [`crate::worker::WorkerJoinHandler`].
pub struct RustFnWorker {
    pub handler: crate::worker::WorkerJoinHandler,
}
impl From<crate::worker::WorkerJoinHandler> for RustFnWorker {
    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
        RustFnWorker { handler }
    }
}
#[async_trait::async_trait]
impl crate::worker::Worker for RustFnWorker {
    fn id(&self) -> &crate::types::WorkerId {
        let RustFnWorker { handler } = self;
        &handler.worker_id
    }
    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        let RustFnWorker { handler } = self;
        handler.cancel.clone()
    }
    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        let RustFnWorker { handler } = *self;
        handler.await_completion().await
    }
}
/// Factory for [`AgentKind::Operator`] agents.
///
/// It resolves `spec.operator_ref` against a table of [`Operator`]s that is
/// guarded by a `RwLock`. Operators can be registered or removed through
/// `&self` at any time. A spawner keeps the operator it was built with, so
/// changes only affect spawners built afterwards.
pub struct OperatorSpawnerFactory {
    /// Operator table, keyed by the id matched against `operator_ref`.
    operators: Arc<std::sync::RwLock<HashMap<String, Arc<dyn Operator>>>>,
}
impl OperatorSpawnerFactory {
    /// Creates a factory with an empty operator table.
    pub fn new() -> Self {
        let table: HashMap<String, Arc<dyn Operator>> = HashMap::new();
        OperatorSpawnerFactory {
            operators: Arc::new(std::sync::RwLock::new(table)),
        }
    }

    /// Takes the write lock on the operator table.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    fn write_operators(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, HashMap<String, Arc<dyn Operator>>> {
        self.operators
            .write()
            .expect("OperatorSpawnerFactory.operators RwLock poisoned")
    }

    /// Registers `op` under `id`, replacing any operator already registered
    /// under that id.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn register_operator(&self, id: impl Into<String>, op: Arc<dyn Operator>) -> &Self {
        self.write_operators().insert(id.into(), op);
        self
    }

    /// Removes the operator registered under `id`, if there is one.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn unregister_operator(&self, id: &str) -> &Self {
        self.write_operators().remove(id);
        self
    }
}
impl Default for OperatorSpawnerFactory {
    /// Equivalent to [`OperatorSpawnerFactory::new`].
    fn default() -> Self {
        OperatorSpawnerFactory::new()
    }
}
impl SpawnerFactoryKind for OperatorSpawnerFactory {
    const KIND: AgentKind = AgentKind::Operator;
    type Worker = crate::operator::OperatorWorker;
}
impl SpawnerFactory for OperatorSpawnerFactory {
    /// Builds an [`OperatorSpawner`] for the operator named by
    /// `spec.operator_ref`.
    ///
    /// If the agent has a profile, its `system_prompt` is passed to the
    /// spawner. The `hint` is ignored.
    ///
    /// # Errors
    /// Returns `CompileError::InvalidSpec` if `operator_ref` is missing, not a
    /// string, or not registered. The "not registered" message lists the
    /// registered ids in sorted order.
    ///
    /// # Panics
    /// Panics if the operator table's lock is poisoned.
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        let invalid = |msg: String| CompileError::InvalidSpec {
            name: agent_def.name.clone(),
            msg,
        };
        let op_ref = match agent_def.spec.get("operator_ref").and_then(|v| v.as_str()) {
            Some(r) => r,
            None => {
                return Err(invalid(
                    "operator spec: 'operator_ref' (string) required".into(),
                ))
            }
        };
        let system_prompt = agent_def.profile.as_ref().map(|p| p.system_prompt.clone());
        // Scope the read guard so the lock is released before the spawner is built.
        let op = {
            let table = self
                .operators
                .read()
                .expect("OperatorSpawnerFactory.operators RwLock poisoned");
            match table.get(op_ref) {
                Some(found) => Arc::clone(found),
                None => {
                    let mut registered: Vec<&str> = table.keys().map(String::as_str).collect();
                    registered.sort_unstable();
                    let listing = if registered.is_empty() {
                        String::from("<none>")
                    } else {
                        registered.join(", ")
                    };
                    return Err(invalid(format!(
                        "operator_ref '{op_ref}' not registered in factory. \
                         Registered sids: [{listing}]. \
                         Hint: call mse_operator_join(roles=[...]) to mint the sid first."
                    )));
                }
            }
        };
        Ok(Arc::new(OperatorSpawner::new(op, system_prompt)))
    }
}