use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, RwLock};
use mlua_isle::{AsyncIsle, AsyncIsleDriver};
use tracing::{info, info_span, warn};
use crate::bridge;
use crate::bus::{Event, EventBus, Handler};
use agent_block_mcp::McpManager;
use agent_block_types::error::{BlockError, BlockResult};
use tokio_util::sync::CancellationToken;
/// Lua blocks compiled into the binary as `(module name, Lua source)` pairs.
///
/// `build_isle_init` appends a `package.searchers` entry that resolves
/// `require("<module name>")` against this table, and `inspect_tools` reports
/// every entry as an embedded block.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
("agent", include_str!("../blocks/agent/init.lua")),
("session", include_str!("../blocks/session/init.lua")),
(
"compile_loop",
include_str!("../blocks/compile_loop/init.lua"),
),
];
/// Lua script that `run` executes for `ScriptSource::DefaultAgent`.
///
/// The script requires the `agent` module, calls `agent.run` with the
/// `_PROMPT` and `_CONTEXT` globals as `prompt` / `system`, and passes the
/// result to `bus.emit("_", r)`.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
prompt = _PROMPT,
system = _CONTEXT,
})
bus.emit("_", r)
"#;
/// Where the Lua entry script executed by `run` comes from.
#[derive(Debug, Clone)]
pub enum ScriptSource {
/// Read the script from this file. `run` uses the file name as the script
/// name and the file's parent directory as the script directory.
Path(PathBuf),
/// Use an in-memory script. `run` uses the project root as the script
/// directory.
Inline {
/// Lua source text to evaluate.
source: String,
/// Script name (exposed to Lua as `_SCRIPT_NAME`).
name: String,
},
/// Run the built-in `DEFAULT_AGENT_INVOKER` script under the name
/// `default_agent_invoker.lua`.
DefaultAgent,
}
/// Source of a text input (`BlockConfig::prompt` or `BlockConfig::context`).
#[derive(Debug, Clone)]
pub enum PromptSource {
/// The text itself.
Inline(String),
/// Path of a file whose whole content is read as the text by `run`.
File(PathBuf),
}
/// Source of the mesh identity secret key.
///
/// The resolved value is decoded by `hex_decode_32`, i.e. it must be 64 hex
/// characters (surrounding whitespace is trimmed).
#[derive(Debug, Clone)]
pub enum SecretKeySource {
/// The hex-encoded key itself.
Inline(String),
/// Name of an environment variable holding the hex-encoded key. `run` reads
/// it with `std::env::var` and treats a failed read as "no key".
Env(String),
}
/// Rust-side implementation of a host tool.
///
/// `run` wraps each handler in a Lua async function stored in the
/// `_TOOL_REGISTRY` entry of the corresponding `HostToolSpec`.
#[async_trait::async_trait]
pub trait ToolHandler: Send + Sync + 'static {
/// Invokes the tool with a JSON `input` and returns its JSON result, or a
/// `BlockError` on failure.
async fn call(&self, input: serde_json::Value) -> Result<serde_json::Value, BlockError>;
}
/// Description of a tool implemented by the host in Rust.
///
/// `run` turns each spec into a `_TOOL_REGISTRY[name]` Lua table with the
/// fields `name`, `schema.description`, `schema.input_schema`, optional
/// `group`, and `handler`.
#[derive(Clone)]
pub struct HostToolSpec {
/// Tool name; also the key used in `_TOOL_REGISTRY`.
pub name: String,
/// Human-readable description of the tool.
pub description: String,
/// JSON value describing the tool input; converted to a Lua value as-is.
pub input_schema: serde_json::Value,
/// Optional group label; only set on the registry entry when present.
pub group: Option<String>,
/// Implementation invoked when Lua calls the tool.
pub handler: Arc<dyn ToolHandler>,
}
/// Hand-written `Debug`: `ToolHandler` has no `Debug` bound, so the handler is
/// rendered as a fixed placeholder while every other field is printed normally.
impl std::fmt::Debug for HostToolSpec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            description,
            input_schema,
            group,
            handler: _,
        } = self;

        let mut out = f.debug_struct("HostToolSpec");
        out.field("name", name);
        out.field("description", description);
        out.field("input_schema", input_schema);
        out.field("group", group);
        out.field("handler", &"<dyn ToolHandler>");
        out.finish()
    }
}
/// Summary of one tool or block as reported by `inspect_tools`.
#[derive(Debug, Clone)]
pub struct ToolMeta {
/// Tool name, or the module name of an embedded block.
pub name: String,
/// Human-readable description.
pub description: String,
/// Optional group label (always `None` for embedded blocks).
pub group: Option<String>,
/// Where the entry comes from.
pub source: ToolSource,
}
/// Origin of a `ToolMeta` entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolSource {
/// Declared in `BlockConfig::host_tools`.
HostRust,
/// One of the Lua blocks listed in `EMBEDDED_BLOCKS`.
EmbeddedBlock,
}
/// Lists the tools known for `config` without starting a runtime.
///
/// The result contains one entry per `config.host_tools` item (in order,
/// tagged [`ToolSource::HostRust`]) followed by one entry per embedded block
/// (tagged [`ToolSource::EmbeddedBlock`], with no group).
pub fn inspect_tools(config: &BlockConfig) -> Vec<ToolMeta> {
    let host_entries = config.host_tools.iter().map(|spec| ToolMeta {
        name: spec.name.clone(),
        description: spec.description.clone(),
        group: spec.group.clone(),
        source: ToolSource::HostRust,
    });

    let embedded_entries = EMBEDDED_BLOCKS.iter().map(|&(block, _)| ToolMeta {
        name: block.to_string(),
        description: format!("Embedded StdPkg block (require(\"{block}\"))"),
        group: None,
        source: ToolSource::EmbeddedBlock,
    });

    host_entries.chain(embedded_entries).collect()
}
/// Builds the `package.path` fragment for on-disk block directories.
///
/// Two candidate directories are considered, in this order:
/// `<project_root>/blocks` and `<directory of the current executable>/blocks`.
/// Each one that exists as a directory contributes
/// `<dir>/?.lua;<dir>/?/init.lua;`. If the executable path cannot be
/// determined a warning is logged and only the project directory is used.
/// The result may be empty.
fn build_blocks_path(project_root: &Path) -> String {
    /// Appends both Lua search patterns for `dir` when it is an existing directory.
    fn append_if_dir(search: &mut String, dir: &Path) {
        if !dir.is_dir() {
            return;
        }
        let shown = dir.to_string_lossy();
        search.push_str(&format!("{shown}/?.lua;{shown}/?/init.lua;"));
    }

    let mut search = String::new();
    append_if_dir(&mut search, &project_root.join("blocks"));

    match std::env::current_exe() {
        Err(e) => {
            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
        }
        Ok(exe) => {
            if let Some(exe_dir) = exe.parent() {
                append_if_dir(&mut search, &exe_dir.join("blocks"));
            }
        }
    }

    search
}
/// Everything `run` needs to execute one block script.
pub struct BlockConfig {
/// Script to execute.
pub script: ScriptSource,
/// Project root. `run` loads `<project_root>/.env`, uses this directory as
/// the script directory for inline/default scripts, and adds
/// `<project_root>/blocks` to the Lua `package.path` when it exists.
pub project_root: PathBuf,
/// Mesh relay URL; a mesh agent is connected only when this is `Some`.
pub relay_url: Option<String>,
/// Secret key for the mesh identity. When it resolves to nothing, `run`
/// generates a new keypair. Only consulted when `relay_url` is set.
pub secret_key: Option<SecretKeySource>,
/// Timeout handed to `McpManager::with_rpc_timeout`.
pub mcp_rpc_timeout: Duration,
/// Text exposed to Lua as the `_PROMPT` global (unset when `None`).
pub prompt: Option<PromptSource>,
/// Text exposed to Lua as the `_CONTEXT` global (unset when `None`).
pub context: Option<PromptSource>,
/// Handlers registered on the event bus per event kind (`bus.on`).
pub host_handlers: HashMap<String, Arc<dyn Handler>>,
/// Handler registered on the event bus via `bus.on_any`.
pub host_handler: Option<Arc<dyn Handler>>,
/// Rust tools injected into the Lua `_TOOL_REGISTRY` table.
pub host_tools: Vec<HostToolSpec>,
/// HTTP client to share with the bridge; a default client is created when `None`.
pub http_client: Option<reqwest::Client>,
/// SQLite path for the `sql` store; falls back to `bridge::config::sql_path()`.
pub sql_path: Option<PathBuf>,
/// SQLite path for the `kv` store; falls back to `bridge::config::kv_path()`.
pub kv_path: Option<PathBuf>,
/// SQLite path for the `ts` store; falls back to `bridge::config::ts_path()`.
pub ts_path: Option<PathBuf>,
/// Extra Lua globals (name -> JSON value) set in both Isles at init time.
pub extra_globals: HashMap<String, serde_json::Value>,
/// When `true` and at least one host handler is configured, `run` spawns a
/// task that runs the event-bus dispatcher.
pub auto_serve_bus: bool,
/// When cancelled while the script is running, `run` cancels the script and
/// returns `BlockError::Cancelled`.
pub shutdown_token: Option<CancellationToken>,
}
/// Shared host state handed to the bridge registration functions
/// (`bridge::register_all` / `bridge::register_all_handler_side`).
///
/// Cloning is cheap: apart from `project_root` and `http_client`, the fields
/// are `Arc`s, an `Option<Arc<..>>` or a channel sender.
#[derive(Clone)]
pub struct HostContext {
/// Project root as resolved by `run` (canonicalized, or joined onto the
/// current directory when canonicalization fails).
pub project_root: PathBuf,
/// Connected mesh agent; `None` when no relay URL was configured.
pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
/// MCP manager shared between the runtime and the bridge.
pub mcp_manager: Arc<RwLock<McpManager>>,
/// HTTP client used by the bridge.
pub http_client: reqwest::Client,
/// Connection for the `sql` store.
pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
/// Interrupt handle obtained from `sql_conn`.
pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
/// Connection for the `kv` store.
pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
/// Interrupt handle obtained from `kv_conn`.
pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
/// Connection for the `ts` store.
pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
/// Interrupt handle obtained from `ts_conn`.
#[allow(dead_code)]
pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
/// Main Isle, the one that evaluates the entry script.
#[allow(dead_code)]
pub isle: Arc<AsyncIsle>,
/// Second Isle, spawned on the `agent-block-handler-isle` thread.
pub handler_isle: Arc<AsyncIsle>,
/// Sending half of the event-bus channel.
#[allow(dead_code)]
pub bus_tx: mpsc::Sender<Event>,
/// The event bus; the slot is `None` once `run` has moved the bus into the
/// auto-serve dispatcher task.
pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
/// Opens the SQLite database at `path` and applies the configured pragmas.
///
/// For on-disk databases (anything `bridge::config::is_memory_sql` does not
/// flag as in-memory) the parent directory is created first and the
/// configured `journal_mode` is applied after opening. `busy_timeout` is set
/// for every database. `label` identifies the store in logs and in the
/// directory-creation error.
///
/// Returns the connection behind `Arc<Mutex<..>>` together with its interrupt
/// handle.
///
/// # Errors
/// `BlockError::Runtime` when directory creation, opening the database, or
/// either pragma update fails.
fn open_sqlite(
    path: &Path,
    label: &'static str,
) -> BlockResult<(
    Arc<Mutex<rusqlite::Connection>>,
    Arc<rusqlite::InterruptHandle>,
)> {
    let on_disk = !crate::bridge::config::is_memory_sql(path);

    if on_disk {
        if let Some(dir) = path.parent() {
            if let Err(e) = std::fs::create_dir_all(dir) {
                return Err(BlockError::Runtime(format!("{label} dir create: {e}")));
            }
        }
    }

    let db = match rusqlite::Connection::open(path) {
        Ok(db) => db,
        Err(e) => {
            return Err(BlockError::Runtime(format!(
                "sqlite open {}: {e}",
                path.display()
            )));
        }
    };

    if on_disk {
        let journal = crate::bridge::config::sql_journal_mode();
        if let Err(e) = db.pragma_update(None, "journal_mode", &journal) {
            return Err(BlockError::Runtime(format!("journal_mode={journal}: {e}")));
        }
    }

    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
    if let Err(e) = db.pragma_update(None, "busy_timeout", busy_ms) {
        return Err(BlockError::Runtime(format!("busy_timeout pragma: {e}")));
    }

    info!(label, path = %path.display(), busy_ms, "sqlite initialized");

    // The handle must be taken before the connection is moved into the mutex.
    let interrupt = Arc::new(db.get_interrupt_handle());
    Ok((Arc::new(Mutex::new(db)), interrupt))
}
fn build_isle_init(
script_name: String,
script_dir: String,
blocks_paths: String,
prompt: Option<String>,
context: Option<String>,
extra_globals: HashMap<String, serde_json::Value>,
) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
move |lua| {
lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
if let Some(ref p) = prompt {
lua.globals().set("_PROMPT", p.as_str())?;
}
if let Some(ref c) = context {
lua.globals().set("_CONTEXT", c.as_str())?;
}
mlua_batteries::register_all(lua, "std")?;
for (name, value) in &extra_globals {
let lua_value = crate::bridge::json_to_lua(lua, value.clone())
.map_err(|e| mlua::Error::external(format!("extra_globals[{name}]: {e}")))?;
lua.globals().set(name.as_str(), lua_value)?;
}
let package: mlua::Table = lua.globals().get("package")?;
let current_path: String = package.get("path")?;
let new_path =
format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
package.set("path", new_path)?;
let embedded: HashMap<&'static str, &'static str> =
EMBEDDED_BLOCKS.iter().copied().collect();
let searchers: mlua::Table = package.get("searchers")?;
let loader =
lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
Some(source) => {
let chunk = lua
.load(*source)
.set_name(format!("@embedded:blocks/{name}/init.lua"));
let func = chunk.into_function()?;
Ok(mlua::Value::Function(func))
}
None => {
let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
Ok(mlua::Value::String(msg))
}
})?;
let next_idx = searchers.raw_len() + 1;
searchers.raw_set(next_idx, loader)?;
Ok(())
}
}
/// Spawns the handler Isle on a thread named `agent-block-handler-isle`.
///
/// The Lua state is initialized with `build_isle_init` using the given
/// arguments. Returns the Isle handle (wrapped in `Arc`) and its driver.
///
/// # Errors
/// `BlockError::Runtime` when the Isle cannot be spawned.
async fn spawn_handler_isle(
    script_name: String,
    script_dir: String,
    blocks_paths: String,
    prompt: Option<String>,
    context: Option<String>,
    extra_globals: HashMap<String, serde_json::Value>,
) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
    const THREAD_NAME: &str = "agent-block-handler-isle";

    let init = build_isle_init(
        script_name,
        script_dir,
        blocks_paths,
        prompt,
        context,
        extra_globals,
    );

    let spawned = AsyncIsle::builder()
        .thread_name(THREAD_NAME)
        .spawn(init)
        .await;

    match spawned {
        Ok((isle, driver)) => {
            info!(thread_name = THREAD_NAME, "handler Isle spawned");
            Ok((Arc::new(isle), driver))
        }
        Err(e) => Err(BlockError::Runtime(format!(
            "handler isle spawn failed: {e}"
        ))),
    }
}
/// Decodes a 64-character hex string into 32 bytes.
///
/// Leading and trailing whitespace is trimmed first. Both upper- and
/// lower-case hex digits are accepted.
///
/// Decoding works on the raw bytes of the input rather than on `str` slices:
/// the length check counts bytes, so slicing a 64-byte input that contains
/// multi-byte UTF-8 characters would otherwise panic on a non-char-boundary
/// index. Such input is reported as an invalid digit instead.
///
/// # Errors
/// Returns a message when the trimmed input is not exactly 64 bytes long or
/// when any byte is not a hex digit (the message carries the byte position).
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Converts one ASCII hex digit to its value; `pos` is only used for the error text.
    fn nibble(raw: u8, pos: usize) -> Result<u8, String> {
        char::from(raw)
            .to_digit(16)
            // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
            .map(|d| d as u8)
            .ok_or_else(|| format!("invalid hex at position {pos}: invalid digit found in string"))
    }

    let s = s.trim();
    if s.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", s.len()));
    }

    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out
        .iter_mut()
        .zip(s.as_bytes().chunks_exact(2))
        .enumerate()
    {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
/// Runs one block script end to end: resolves its inputs, builds the host
/// environment, executes the script, then tears everything down.
///
/// Steps, in order:
/// 1. Resolve the script source, prompt, context and secret key from `config`.
/// 2. Load `<project_root>/.env` (an I/O error is ignored, any other error is
///    logged as a warning).
/// 3. Create the event bus, pre-install the host handlers and, when
///    `auto_serve_bus` is set and handlers exist, spawn the dispatcher task.
/// 4. Connect the mesh agent when `relay_url` is set.
/// 5. Create the MCP manager and open the `sql`, `kv` and `ts` SQLite stores.
/// 6. Spawn the main and handler Isles, register the bridge on both, and
///    inject `host_tools` into `_TOOL_REGISTRY` on the main Isle.
/// 7. Evaluate the script as a coroutine, racing it against `shutdown_token`
///    when one is provided.
/// 8. Stop the dispatcher, disconnect MCP and shut both Isles down.
///
/// # Errors
/// - Any setup step can fail with the corresponding `BlockError`.
/// - `BlockError::Cancelled` when `shutdown_token` fires before the script ends.
/// - `BlockError::Script` when the script itself fails.
/// - A failure of `disconnect_all` or of the main Isle shutdown is returned
///   instead of the script result.
pub async fn run(config: BlockConfig) -> BlockResult<()> {
// --- Resolve script source, name and directory --------------------------
let (script_source, script_name, script_dir_pathbuf) = match &config.script {
ScriptSource::Path(p) => {
let source = std::fs::read_to_string(p)
.map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
let name = p
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let dir = p
.parent()
.map(|d| d.to_path_buf())
.unwrap_or_else(|| PathBuf::from("."));
(source, name, dir)
}
ScriptSource::Inline { source, name } => {
(source.clone(), name.clone(), config.project_root.clone())
}
ScriptSource::DefaultAgent => (
DEFAULT_AGENT_INVOKER.to_string(),
"default_agent_invoker.lua".to_string(),
config.project_root.clone(),
),
};
// --- Resolve prompt / context text ---------------------------------------
let prompt_resolved: Option<String> = match &config.prompt {
Some(PromptSource::Inline(s)) => Some(s.clone()),
Some(PromptSource::File(p)) => Some(
std::fs::read_to_string(p)
.map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
),
None => None,
};
let context_resolved: Option<String> = match &config.context {
Some(PromptSource::Inline(s)) => Some(s.clone()),
Some(PromptSource::File(p)) => Some(
std::fs::read_to_string(p)
.map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
),
None => None,
};
// NOTE(review): `.ok()` turns a missing or non-Unicode variable into `None`,
// so the mesh branch below silently generates a fresh keypair even though a
// key source was configured — confirm this fallback is intended.
let secret_key_resolved: Option<String> = match &config.secret_key {
Some(SecretKeySource::Inline(s)) => Some(s.clone()),
Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
None => None,
};
// NOTE(review): this span (like the `init`, `execute` and `shutdown` spans
// below) is created but never entered or attached to a future within this
// function — confirm whether events are expected to be recorded inside it.
let _root_span = info_span!("agent_block", script = %script_name);
// --- .env ------------------------------------------------------------------
let env_path = config.project_root.join(".env");
match dotenvy::from_path(&env_path) {
Ok(()) => info!(path = %env_path.display(), ".env loaded"),
// An I/O error (e.g. the file does not exist) is not reported; anything else is.
Err(dotenvy::Error::Io(_)) => {} Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
}
let _init_span = info_span!("init");
// --- Event bus and host handlers ------------------------------------------
let bus_capacity = crate::bridge::config::bus_capacity();
let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
// The bus lives in an `Option` so the auto-serve task below can take ownership of it.
let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
let has_kind_handlers = !config.host_handlers.is_empty();
let has_any_handler = config.host_handler.is_some();
if has_kind_handlers || has_any_handler {
let mut guard = event_bus
.lock()
.map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
let bus = guard
.as_mut()
.ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
for (kind, handler) in &config.host_handlers {
bus.on(kind.clone(), Arc::clone(handler))
.map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
}
if let Some(any_handler) = &config.host_handler {
bus.on_any(Arc::clone(any_handler))
.map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
}
info!(
kind_handlers = config.host_handlers.len(),
any_handler = has_any_handler,
"host handlers pre-installed"
);
}
// Auto-serve only makes sense when there is at least one host handler to dispatch to.
let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
// Move the bus out of the shared slot; the slot stays `None` afterwards.
let bus = {
let mut guard = event_bus
.lock()
.map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
guard
.take()
.ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
};
let token = CancellationToken::new();
let token_for_task = token.clone();
let handle = tokio::spawn(async move {
let mut bus = bus;
if let Err(e) = bus.run(token_for_task).await {
tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
}
});
info!("auto-serve: dispatcher spawned");
Some((handle, token))
} else {
None
};
// --- Mesh connection (only when a relay URL is configured) -----------------
let mesh_agent = if let Some(ref relay_url) = config.relay_url {
let keypair = match &secret_key_resolved {
Some(hex_str) => {
let bytes = hex_decode_32(hex_str)
.map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
}
None => agent_mesh_core::identity::AgentKeypair::generate(),
};
info!(agent_id = %keypair.agent_id(), "mesh identity");
// No rules and `default_deny: false` — presumably an allow-all policy; confirm
// against the `AclPolicy` documentation.
let acl = agent_mesh_core::acl::AclPolicy {
default_deny: false,
rules: vec![],
};
// Incoming mesh requests are forwarded onto the event bus.
let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
Arc::new(BusRelayHandler::new(bus_tx.clone()));
let url = relay_url.clone();
let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
.await
.map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
info!(relay_url = %relay_url, "mesh connected");
Some(Arc::new(agent))
} else {
None
};
// --- MCP manager, project root, HTTP client, SQLite stores -----------------
let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
config.mcp_rpc_timeout,
)?));
// Fall back to `<cwd>/<project_root>` when the path cannot be canonicalized.
let project_root = config
.project_root
.canonicalize()
.or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
let http_client = config.http_client.clone().unwrap_or_default();
let sql_path = match &config.sql_path {
Some(p) => p.clone(),
None => crate::bridge::config::sql_path().map_err(BlockError::Runtime)?,
};
let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
let kv_path = match &config.kv_path {
Some(p) => p.clone(),
None => crate::bridge::config::kv_path().map_err(BlockError::Runtime)?,
};
let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
let ts_path = match &config.ts_path {
Some(p) => p.clone(),
None => crate::bridge::config::ts_path().map_err(BlockError::Runtime)?,
};
let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
// --- Spawn the two Isles with identical init arguments ---------------------
let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
let blocks_paths = build_blocks_path(&project_root);
let prompt = prompt_resolved.clone();
let context = context_resolved.clone();
let (isle, driver) = AsyncIsle::spawn(build_isle_init(
script_name.clone(),
script_dir.clone(),
blocks_paths.clone(),
prompt.clone(),
context.clone(),
config.extra_globals.clone(),
))
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
let isle = Arc::new(isle);
let (handler_isle, handler_driver) = spawn_handler_isle(
script_name.clone(),
script_dir.clone(),
blocks_paths.clone(),
prompt,
context,
config.extra_globals.clone(),
)
.await?;
// Hand both Isles to the MCP manager; the write lock is released at the end of the block.
{
let mut mgr = mcp_manager.write().await;
mgr.set_handler_isle(Arc::clone(&handler_isle));
mgr.set_main_isle(Arc::clone(&isle));
}
let ctx = HostContext {
project_root,
mesh_agent,
mcp_manager: Arc::clone(&mcp_manager),
http_client,
sql_conn,
sql_interrupt,
kv_conn,
kv_interrupt,
ts_conn,
ts_interrupt,
isle: Arc::clone(&isle),
handler_isle: Arc::clone(&handler_isle),
bus_tx: bus_tx.clone(),
event_bus: Arc::clone(&event_bus),
};
// --- Bridge registration: main Isle ----------------------------------------
{
let ctx = ctx.clone();
isle.exec(move |lua| {
bridge::register_all(lua, &ctx)
.map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
Ok(String::new())
})
.await
.map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
}
// --- Bridge registration: handler Isle -------------------------------------
{
let ctx = ctx.clone();
handler_isle
.exec(move |lua| {
bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
})?;
Ok(String::new())
})
.await
.map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
}
// --- Host tools: one `_TOOL_REGISTRY[name]` table per spec (main Isle only) --
if !config.host_tools.is_empty() {
let host_tools = config.host_tools.clone();
let tool_count = host_tools.len();
isle.exec(move |lua| {
// The registry table must already exist as a global at this point;
// presumably it is created by `bridge::register_all` — verify.
let registry: mlua::Table = lua
.globals()
.get("_TOOL_REGISTRY")
.map_err(|e| mlua_isle::IsleError::Lua(format!("get _TOOL_REGISTRY: {e}")))?;
for tool in host_tools {
let entry = lua
.create_table()
.map_err(|e| mlua_isle::IsleError::Lua(format!("create entry: {e}")))?;
entry
.set("name", tool.name.as_str())
.map_err(|e| mlua_isle::IsleError::Lua(format!("set name: {e}")))?;
let schema = lua
.create_table()
.map_err(|e| mlua_isle::IsleError::Lua(format!("create schema: {e}")))?;
schema
.set("description", tool.description.as_str())
.map_err(|e| mlua_isle::IsleError::Lua(format!("set description: {e}")))?;
let input_schema_lua =
crate::bridge::json_to_lua(lua, tool.input_schema.clone())
.map_err(|e| mlua_isle::IsleError::Lua(format!("input_schema: {e}")))?;
schema
.set("input_schema", input_schema_lua)
.map_err(|e| mlua_isle::IsleError::Lua(format!("set input_schema: {e}")))?;
entry
.set("schema", schema)
.map_err(|e| mlua_isle::IsleError::Lua(format!("set schema: {e}")))?;
if let Some(group) = &tool.group {
entry
.set("group", group.as_str())
.map_err(|e| mlua_isle::IsleError::Lua(format!("set group: {e}")))?;
}
// Lua-callable wrapper: Lua value -> JSON -> Rust handler -> JSON -> Lua value.
let handler_arc = Arc::clone(&tool.handler);
let handler_fn = lua
.create_async_function(move |lua, input: mlua::Value| {
let handler = Arc::clone(&handler_arc);
async move {
let input_json = crate::bridge::lua_to_json(&lua, input)?;
let result = handler
.call(input_json)
.await
.map_err(mlua::Error::external)?;
crate::bridge::json_to_lua(&lua, result)
}
})
.map_err(|e| mlua_isle::IsleError::Lua(format!("create handler: {e}")))?;
entry
.set("handler", handler_fn)
.map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
registry
.set(tool.name.as_str(), entry)
.map_err(|e| mlua_isle::IsleError::Lua(format!("registry set: {e}")))?;
}
Ok(String::new())
})
.await
.map_err(|e| BlockError::Runtime(format!("host_tools inject: {e}")))?;
info!(count = tool_count, "host tools injected into Lua registry");
}
drop(_init_span);
// --- Execute the script ------------------------------------------------------
// The result is kept (not returned) so that teardown below always runs first.
let script_result: Result<(), BlockError> = {
let _exec_span = info_span!("execute", script = %script_name);
let mut task = isle.spawn_coroutine_eval(&script_source);
let task_cancel = task.cancel_token().clone();
match config.shutdown_token.as_ref() {
Some(token) => {
tokio::select! {
// `biased` polls the branches in order, so a pending shutdown wins over a finished task.
biased;
_ = token.cancelled() => {
task_cancel.cancel();
// Wait for the cancelled task to finish; its own result is discarded.
let _ = (&mut task).await;
info!("shutdown_token: cancelled by caller");
Err(BlockError::Cancelled)
}
res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
}
}
None => (&mut task)
.await
.map(|_| ())
.map_err(|e| BlockError::Script(format!("{e}"))),
}
};
// --- Stop the auto-serve dispatcher -----------------------------------------
if let Some((handle, token)) = auto_serve_state {
let grace_ms = crate::bridge::config::task_grace_ms();
let grace = Duration::from_millis(grace_ms);
// Let the dispatcher keep running for one grace period before cancelling it,
// then wait at most one more grace period for the task to finish.
tokio::time::sleep(grace).await;
token.cancel();
match tokio::time::timeout(grace, handle).await {
Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
Ok(Err(join_err)) => {
tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
}
Err(_) => {
tracing::warn!(
grace_ms,
"auto-serve: dispatcher join timed out after cancel; forcing exit"
);
}
}
}
// --- Teardown -----------------------------------------------------------------
{
let _shutdown_span = info_span!("shutdown");
// NOTE(review): the two `?` below return early on failure, which skips the
// remaining shutdown steps and discards `script_result` — confirm that is
// acceptable. A handler-Isle shutdown failure, by contrast, is only logged.
mcp_manager.write().await.disconnect_all().await?;
driver
.shutdown()
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
match handler_driver.shutdown().await {
Ok(()) => info!(
thread_name = "agent-block-handler-isle",
"handler Isle shut down"
),
Err(e) => tracing::error!(
error = %e,
thread_name = "agent-block-handler-isle",
"handler Isle shutdown failed"
),
}
}
script_result
}
/// Mesh request handler that forwards every incoming request onto the event
/// bus and waits for the bus to acknowledge it.
struct BusRelayHandler {
    /// Sending half of the event-bus channel.
    tx: mpsc::Sender<Event>,
}

impl BusRelayHandler {
    /// Creates a relay that publishes onto the channel behind `tx`.
    fn new(tx: mpsc::Sender<Event>) -> Self {
        BusRelayHandler { tx }
    }
}
/// Maximum time `BusRelayHandler::handle` waits for the bus to acknowledge a
/// relayed mesh request before answering with a timeout error.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
#[async_trait::async_trait]
impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
async fn handle(
&self,
from: &agent_mesh_core::identity::AgentId,
payload: &serde_json::Value,
_cancel: agent_mesh_sdk::CancelToken,
) -> serde_json::Value {
let id = uuid::Uuid::new_v4().to_string();
let meta = serde_json::json!({"from": from.to_string()});
let (ack_tx, ack_rx) = oneshot::channel();
let event = Event {
kind: "mesh".into(),
id: id.clone(),
payload: payload.clone(),
meta,
ack_tx: Some(ack_tx),
};
if let Err(e) = self.tx.send(event).await {
tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
return serde_json::json!({"error": "bus channel closed"});
}
match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
Ok(Ok(Ok(v))) => v,
Ok(Ok(Err(e))) => {
tracing::error!(id = %id, error = %e, "mesh handler returned error");
serde_json::json!({"error": e.to_string()})
}
Ok(Err(e)) => {
tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
serde_json::json!({"error": "ack dropped"})
}
Err(_) => {
tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
serde_json::json!({"error": "handler timeout"})
}
}
}
}