use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, RwLock};
use mlua_isle::{AsyncIsle, AsyncIsleDriver};
use tracing::{info, info_span, warn};
use crate::bridge;
use crate::bus::{Event, EventBus};
use crate::error::{BlockError, BlockResult};
use crate::mcp_client::McpManager;
/// Lua block sources compiled into the binary, as `(module name, source)` pairs.
///
/// `build_isle_init` appends a `package.searchers` entry that serves these by
/// module name, so `require("agent")` can resolve even when no matching file is
/// found through `package.path`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[("agent", include_str!("../blocks/agent/init.lua"))];
/// Builds the `blocks/` portion of Lua's `package.path`.
///
/// Two candidate directories are considered, in this order:
/// `<project_root>/blocks` and `<directory of the running executable>/blocks`.
/// Each one that exists as a directory contributes the templates
/// `<dir>/?.lua;<dir>/?/init.lua;`. The result may be empty, and when non-empty
/// it always ends with `;` so it can be concatenated in front of another path.
///
/// A failure to determine the executable path is logged and otherwise ignored.
fn build_blocks_path(project_root: &Path) -> String {
    /// Appends both search templates for `dir`, but only if `dir` is a directory.
    fn push_templates(buf: &mut String, dir: &Path) {
        if !dir.is_dir() {
            return;
        }
        let shown = dir.to_string_lossy();
        buf.push_str(&format!("{shown}/?.lua;{shown}/?/init.lua;"));
    }

    let mut search_path = String::new();
    push_templates(&mut search_path, &project_root.join("blocks"));

    match std::env::current_exe() {
        Err(e) => {
            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
        }
        Ok(exe) => {
            if let Some(exe_dir) = exe.parent() {
                push_templates(&mut search_path, &exe_dir.join("blocks"));
            }
        }
    }

    search_path
}
/// Start-up parameters consumed by [`run`].
pub struct BlockConfig {
    /// Path of the Lua script to execute. Its file name is used as the script
    /// name in tracing spans and its parent directory is added to `package.path`.
    pub script_path: PathBuf,
    /// Project root. `<project_root>/.env` is loaded from here and
    /// `<project_root>/blocks` is searched for Lua blocks.
    pub project_root: PathBuf,
    /// Mesh relay URL. When `None`, no mesh connection is made.
    pub relay_url: Option<String>,
    /// Mesh identity secret as 64 hex characters (32 bytes). Only consulted when
    /// `relay_url` is set; when `None`, a fresh keypair is generated.
    pub secret_key: Option<String>,
    /// RPC timeout handed to `McpManager::with_rpc_timeout`.
    pub mcp_rpc_timeout: Duration,
}
/// Shared host-side state handed to the bridge registration functions.
///
/// Built once by [`run`] and cloned into each Isle's registration closure;
/// every non-trivially-copyable member is behind an `Arc`, so clones share the
/// same underlying resources.
#[derive(Clone)]
pub struct HostContext {
    /// Project root as resolved by `run` (canonicalized when possible,
    /// otherwise joined onto the current working directory).
    pub project_root: PathBuf,
    /// Connected mesh agent, or `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// Shared MCP manager; `run` gives it handles to both Isles.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// HTTP client created with `reqwest::Client::new()`.
    pub http_client: reqwest::Client,
    /// SQLite connection opened from `bridge::config::sql_path()`.
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle obtained from `sql_conn`'s connection.
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// SQLite connection opened from `bridge::config::kv_path()`.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle obtained from `kv_conn`'s connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// SQLite connection opened from `bridge::config::ts_path()`.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle obtained from `ts_conn`'s connection.
    // NOTE(review): presumably not read by any bridge yet, hence the allow — confirm.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Main Isle: the one on which `run` evaluates the script.
    // NOTE(review): presumably not read through the context yet, hence the allow — confirm.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Second Isle, spawned on the `agent-block-handler-isle` thread and set up
    /// with `bridge::register_all_handler_side`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Sender half of the event bus channel.
    // NOTE(review): presumably not read through the context yet, hence the allow — confirm.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Event bus wrapping the receiver half of the channel. Starts as `Some`;
    /// the `Option` presumably lets a consumer take ownership later — TODO confirm.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
fn open_sqlite(
path: &Path,
label: &'static str,
) -> BlockResult<(
Arc<Mutex<rusqlite::Connection>>,
Arc<rusqlite::InterruptHandle>,
)> {
let is_memory = crate::bridge::config::is_memory_sql(path);
if !is_memory {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
}
}
let conn = rusqlite::Connection::open(path)
.map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
if !is_memory {
let journal = crate::bridge::config::sql_journal_mode();
conn.pragma_update(None, "journal_mode", &journal)
.map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
}
let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
conn.pragma_update(None, "busy_timeout", busy_ms)
.map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
info!(label, path = %path.display(), busy_ms, "sqlite initialized");
let interrupt = Arc::new(conn.get_interrupt_handle());
let conn = Arc::new(Mutex::new(conn));
Ok((conn, interrupt))
}
/// Returns the initializer closure run inside a freshly created Lua state.
///
/// The closure, in order:
/// 1. sets the global `_SCRIPT_NAME` to `script_name`;
/// 2. registers the `mlua_batteries` modules under the `std` namespace;
/// 3. prepends `<script_dir>/?.lua;<script_dir>/?/init.lua;` and `blocks_paths`
///    to the existing `package.path`;
/// 4. appends a searcher to `package.searchers` that resolves module names
///    found in `EMBEDDED_BLOCKS`.
///
/// Because the embedded searcher is appended at the end, it is only consulted
/// after the searchers already present in the table.
fn build_isle_init(
    script_name: String,
    script_dir: String,
    blocks_paths: String,
) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
    move |lua| {
        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
        mlua_batteries::register_all(lua, "std")?;

        let package: mlua::Table = lua.globals().get("package")?;
        let inherited: String = package.get("path")?;
        let search_path = [
            script_dir.as_str(),
            "/?.lua;",
            script_dir.as_str(),
            "/?/init.lua;",
            blocks_paths.as_str(),
            inherited.as_str(),
        ]
        .concat();
        package.set("path", search_path)?;

        let sources: HashMap<&'static str, &'static str> =
            EMBEDDED_BLOCKS.iter().copied().collect();
        let embedded_searcher = lua.create_function(move |lua, module: String| {
            if let Some(source) = sources.get(module.as_str()) {
                // Hit: hand back the compiled chunk as the module loader.
                let loader = lua
                    .load(*source)
                    .set_name(format!("@embedded:blocks/{module}/init.lua"))
                    .into_function()?;
                Ok(mlua::Value::Function(loader))
            } else {
                // Miss: a searcher reports failure by returning an explanatory string.
                let miss = lua.create_string(format!("\n\tno embedded block '{module}'"))?;
                Ok(mlua::Value::String(miss))
            }
        })?;

        let searchers: mlua::Table = package.get("searchers")?;
        let slot = searchers.raw_len() + 1;
        searchers.raw_set(slot, embedded_searcher)?;
        Ok(())
    }
}
/// Spawns the handler Isle on a thread named `agent-block-handler-isle`.
///
/// The Lua state is initialized with the closure from `build_isle_init`, using
/// the given script name, script directory and blocks search path.
///
/// # Errors
/// `BlockError::Runtime` if the Isle cannot be spawned.
async fn spawn_handler_isle(
    script_name: String,
    script_dir: String,
    blocks_paths: String,
) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
    const THREAD_NAME: &str = "agent-block-handler-isle";

    let spawned = AsyncIsle::builder()
        .thread_name(THREAD_NAME)
        .spawn(build_isle_init(script_name, script_dir, blocks_paths))
        .await;

    match spawned {
        Ok((handler_isle, handler_driver)) => {
            info!(thread_name = THREAD_NAME, "handler Isle spawned");
            Ok((Arc::new(handler_isle), handler_driver))
        }
        Err(e) => Err(BlockError::Runtime(format!(
            "handler isle spawn failed: {e}"
        ))),
    }
}
/// Decodes exactly 64 hexadecimal characters into 32 bytes.
///
/// Leading and trailing whitespace is ignored. Both lowercase and uppercase
/// digits are accepted.
///
/// # Errors
/// Returns a message when the trimmed input is not 64 bytes long, or when any
/// byte is not an ASCII hex digit. The reported position is the byte offset in
/// the trimmed input.
///
/// Decoding works on bytes rather than on `&str` slices: the length check
/// counts bytes, so a 64-byte input containing multi-byte UTF-8 characters
/// must be rejected with an error instead of being sliced off a character
/// boundary (which panics).
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Converts one ASCII hex digit to its value.
    fn nibble(digit: u8, pos: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    let bytes = s.trim().as_bytes();
    if bytes.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", bytes.len()));
    }

    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out.iter_mut().zip(bytes.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
pub async fn run(config: BlockConfig) -> BlockResult<()> {
let script_name = config
.script_path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let root_span = info_span!("agent_block", script = %script_name);
let _root_guard = root_span.enter();
let env_path = config.project_root.join(".env");
match dotenvy::from_path(&env_path) {
Ok(()) => info!(path = %env_path.display(), ".env loaded"),
Err(dotenvy::Error::Io(_)) => {} Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
}
let _init_guard = info_span!("init").entered();
let bus_capacity = crate::bridge::config::bus_capacity();
let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
let mesh_agent = if let Some(ref relay_url) = config.relay_url {
let keypair = match &config.secret_key {
Some(hex_str) => {
let bytes = hex_decode_32(hex_str)
.map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
}
None => agent_mesh_core::identity::AgentKeypair::generate(),
};
info!(agent_id = %keypair.agent_id(), "mesh identity");
let acl = agent_mesh_core::acl::AclPolicy {
default_deny: false,
rules: vec![],
};
let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
Arc::new(BusRelayHandler::new(bus_tx.clone()));
let url = relay_url.clone();
let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
.await
.map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
info!(relay_url = %relay_url, "mesh connected");
Some(Arc::new(agent))
} else {
None
};
let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
config.mcp_rpc_timeout,
)?));
let project_root = config
.project_root
.canonicalize()
.or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
let http_client = reqwest::Client::new();
let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
let script_path = config.script_path.clone();
let script_dir = script_path
.parent()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|| ".".to_string());
let blocks_paths = build_blocks_path(&project_root);
let (isle, driver) = AsyncIsle::spawn(build_isle_init(
script_name.clone(),
script_dir.clone(),
blocks_paths.clone(),
))
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
let isle = Arc::new(isle);
let (handler_isle, handler_driver) = spawn_handler_isle(
script_name.clone(),
script_dir.clone(),
blocks_paths.clone(),
)
.await?;
{
let mut mgr = mcp_manager.write().await;
mgr.set_handler_isle(Arc::clone(&handler_isle));
mgr.set_main_isle(Arc::clone(&isle));
}
let ctx = HostContext {
project_root,
mesh_agent,
mcp_manager: Arc::clone(&mcp_manager),
http_client,
sql_conn,
sql_interrupt,
kv_conn,
kv_interrupt,
ts_conn,
ts_interrupt,
isle: Arc::clone(&isle),
handler_isle: Arc::clone(&handler_isle),
bus_tx: bus_tx.clone(),
event_bus: Arc::clone(&event_bus),
};
{
let ctx = ctx.clone();
isle.exec(move |lua| {
bridge::register_all(lua, &ctx)
.map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
Ok(String::new())
})
.await
.map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
}
{
let ctx = ctx.clone();
handler_isle
.exec(move |lua| {
bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
})?;
Ok(String::new())
})
.await
.map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
}
drop(_init_guard);
{
let _exec_guard = info_span!("execute", script = %script_name).entered();
let script = std::fs::read_to_string(&script_path)
.map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
isle.coroutine_eval(&script)
.await
.map_err(|e| BlockError::Script(format!("{e}")))?;
}
{
let _shutdown_guard = info_span!("shutdown").entered();
mcp_manager.write().await.disconnect_all().await?;
driver
.shutdown()
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
match handler_driver.shutdown().await {
Ok(()) => info!(
thread_name = "agent-block-handler-isle",
"handler Isle shut down"
),
Err(e) => tracing::error!(
error = %e,
thread_name = "agent-block-handler-isle",
"handler Isle shutdown failed"
),
}
}
Ok(())
}
/// Mesh request handler that forwards each incoming request onto the event bus
/// as an `Event` and replies with whatever acknowledgement comes back.
struct BusRelayHandler {
    /// Sender half of the event bus channel.
    tx: mpsc::Sender<Event>,
}
impl BusRelayHandler {
    /// Creates a handler that publishes onto `tx`.
    fn new(tx: mpsc::Sender<Event>) -> Self {
        Self { tx }
    }
}
/// Maximum time `BusRelayHandler::handle` waits for the acknowledgement of a
/// forwarded event before replying with a `handler timeout` error.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
#[async_trait::async_trait]
impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
async fn handle(
&self,
from: &agent_mesh_core::identity::AgentId,
payload: &serde_json::Value,
_cancel: agent_mesh_sdk::CancelToken,
) -> serde_json::Value {
let id = uuid::Uuid::new_v4().to_string();
let meta = serde_json::json!({"from": from.to_string()});
let (ack_tx, ack_rx) = oneshot::channel();
let event = Event {
kind: "mesh".into(),
id: id.clone(),
payload: payload.clone(),
meta,
ack_tx: Some(ack_tx),
};
if let Err(e) = self.tx.send(event).await {
tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
return serde_json::json!({"error": "bus channel closed"});
}
match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
Ok(Ok(Ok(v))) => v,
Ok(Ok(Err(e))) => {
tracing::error!(id = %id, error = %e, "mesh handler returned error");
serde_json::json!({"error": e.to_string()})
}
Ok(Err(e)) => {
tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
serde_json::json!({"error": "ack dropped"})
}
Err(_) => {
tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
serde_json::json!({"error": "handler timeout"})
}
}
}
}