use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::RwLock;
use mlua_isle::AsyncIsle;
use tracing::{info, info_span, warn};
use crate::bridge;
use crate::error::{BlockError, BlockResult};
use crate::mcp_client::McpManager;
/// Lua block sources compiled into the binary, as `(module name, source text)` pairs.
///
/// `run` copies this table into a map and installs a `package.searchers` entry that
/// answers `require(name)` from it.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[("agent", include_str!("../blocks/agent/init.lua"))];
fn build_blocks_path(project_root: &Path) -> String {
let mut out = String::new();
let project_blocks = project_root.join("blocks");
if project_blocks.is_dir() {
let pb = project_blocks.to_string_lossy();
out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
}
match std::env::current_exe() {
Ok(exe) => {
if let Some(exe_dir) = exe.parent() {
let exe_blocks = exe_dir.join("blocks");
if exe_blocks.is_dir() {
let eb = exe_blocks.to_string_lossy();
out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
}
}
}
Err(e) => {
warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
}
}
out
}
/// Inputs consumed by [`run`].
pub struct BlockConfig {
/// Path of the Lua script to execute. Its file name is exposed to Lua as `_SCRIPT_NAME`
/// and its parent directory is prepended to `package.path`.
pub script_path: PathBuf,
/// Project root. `.env` is loaded from this directory, and its `blocks/` subdirectory
/// (when present) is added to `package.path`.
pub project_root: PathBuf,
/// Relay URL. When `Some`, [`run`] connects a mesh agent to it before starting the script;
/// when `None`, no mesh agent is created.
pub relay_url: Option<String>,
/// Timeout handed to `McpManager::with_rpc_timeout` when [`run`] creates the MCP manager.
pub mcp_rpc_timeout: Duration,
}
/// Host-side handles that `run` builds once and passes to `bridge::register_all`
/// while initializing the Lua state.
#[derive(Clone)]
pub struct HostContext {
/// Project root as resolved by `run`: canonicalized when possible, otherwise the
/// configured path joined onto the current directory.
pub project_root: PathBuf,
/// Connected mesh agent, or `None` when no relay URL was configured.
pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
/// Shared MCP manager; `run` keeps its own clone of this `Arc` to call `disconnect_all`
/// during shutdown.
pub mcp_manager: Arc<RwLock<McpManager>>,
/// HTTP client created with `reqwest::Client::new()`.
pub http_client: reqwest::Client,
/// SQLite connection opened at the path returned by `bridge::config::sql_path()`.
pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
/// Interrupt handle obtained from `sql_conn` when it was opened.
pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
/// SQLite connection opened at the path returned by `bridge::config::kv_path()`.
pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
/// Interrupt handle obtained from `kv_conn` when it was opened.
pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
}
/// Opens the SQLite database at `path`, applies the configured pragmas, and returns the
/// connection together with its interrupt handle.
///
/// * `path` - database location. When `bridge::config::is_memory_sql(path)` is true the
///   parent-directory creation and the `journal_mode` pragma are both skipped
///   (presumably because the path denotes an in-memory database; confirm against
///   `bridge::config`).
/// * `label` - short name used in the directory-creation error and in the log line.
///
/// # Errors
/// Returns `BlockError::Runtime` when the parent directory cannot be created, the
/// database cannot be opened, or either pragma fails.
fn open_sqlite(
    path: &Path,
    label: &'static str,
) -> BlockResult<(
    Arc<Mutex<rusqlite::Connection>>,
    Arc<rusqlite::InterruptHandle>,
)> {
    let is_memory = crate::bridge::config::is_memory_sql(path);
    if !is_memory {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
        }
    }

    let conn = rusqlite::Connection::open(path)
        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;

    if !is_memory {
        let journal = crate::bridge::config::sql_journal_mode();
        conn.pragma_update(None, "journal_mode", &journal)
            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
    }

    // SQLite's busy timeout is a C `int` number of milliseconds, and a non-positive value
    // turns the busy handler off. Clamp oversized durations to `i32::MAX` instead of
    // narrowing the `u128` with an unchecked `as` cast.
    let busy_ms = i64::from(
        i32::try_from(crate::bridge::config::sql_busy_timeout().as_millis()).unwrap_or(i32::MAX),
    );
    conn.pragma_update(None, "busy_timeout", busy_ms)
        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;

    info!(label, path = %path.display(), busy_ms, "sqlite initialized");

    // Take the interrupt handle before the connection is moved behind the mutex.
    let interrupt = Arc::new(conn.get_interrupt_handle());
    let conn = Arc::new(Mutex::new(conn));
    Ok((conn, interrupt))
}
pub async fn run(config: BlockConfig) -> BlockResult<()> {
let script_name = config
.script_path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let root_span = info_span!("agent_block", script = %script_name);
let _root_guard = root_span.enter();
let env_path = config.project_root.join(".env");
match dotenvy::from_path(&env_path) {
Ok(()) => info!(path = %env_path.display(), ".env loaded"),
Err(dotenvy::Error::Io(_)) => {} Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
}
let _init_guard = info_span!("init").entered();
let mesh_agent = if let Some(ref relay_url) = config.relay_url {
let keypair = agent_mesh_core::identity::AgentKeypair::generate();
let acl = agent_mesh_core::acl::AclPolicy {
default_deny: false,
rules: vec![],
};
let handler: Arc<dyn agent_mesh_sdk::RequestHandler> = Arc::new(NoopHandler);
let url = relay_url.clone();
let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
.await
.map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
info!(relay_url = %relay_url, "mesh connected");
Some(Arc::new(agent))
} else {
None
};
let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
config.mcp_rpc_timeout,
)?));
let project_root = config
.project_root
.canonicalize()
.or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
let http_client = reqwest::Client::new();
let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
let ctx = HostContext {
project_root,
mesh_agent,
mcp_manager: Arc::clone(&mcp_manager),
http_client,
sql_conn,
sql_interrupt,
kv_conn,
kv_interrupt,
};
let script_path = config.script_path.clone();
let script_dir = script_path
.parent()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|| ".".to_string());
let script_name_for_lua = script_name.clone();
let (isle, driver) = AsyncIsle::spawn(move |lua| {
lua.globals()
.set("_SCRIPT_NAME", script_name_for_lua.as_str())?;
mlua_batteries::register_all(lua, "std")?;
bridge::register_all(lua, &ctx)?;
let package: mlua::Table = lua.globals().get("package")?;
let current_path: String = package.get("path")?;
let blocks_paths = build_blocks_path(&ctx.project_root);
let new_path =
format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
package.set("path", new_path)?;
let embedded: HashMap<&'static str, &'static str> =
EMBEDDED_BLOCKS.iter().copied().collect();
let searchers: mlua::Table = package.get("searchers")?;
let loader =
lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
Some(source) => {
let chunk = lua
.load(*source)
.set_name(format!("@embedded:blocks/{name}/init.lua"));
let func = chunk.into_function()?;
Ok(mlua::Value::Function(func))
}
None => {
let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
Ok(mlua::Value::String(msg))
}
})?;
let next_idx = searchers.raw_len() + 1;
searchers.raw_set(next_idx, loader)?;
Ok(())
})
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
drop(_init_guard);
{
let _exec_guard = info_span!("execute", script = %script_name).entered();
let script = std::fs::read_to_string(&script_path)
.map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
isle.coroutine_eval(&script)
.await
.map_err(|e| BlockError::Script(format!("{e}")))?;
}
{
let _shutdown_guard = info_span!("shutdown").entered();
mcp_manager.write().await.disconnect_all().await?;
driver
.shutdown()
.await
.map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
}
Ok(())
}
/// Request handler that `run` hands to `MeshAgent::connect`; it answers every incoming
/// request with a fixed JSON error object.
struct NoopHandler;
#[async_trait::async_trait]
impl agent_mesh_sdk::RequestHandler for NoopHandler {
/// Ignores the sender, the payload and the cancel token, and returns
/// `{"error": "no handler registered"}`.
async fn handle(
&self,
_from: &agent_mesh_core::identity::AgentId,
_payload: &serde_json::Value,
_cancel: agent_mesh_sdk::CancelToken,
) -> serde_json::Value {
serde_json::json!({"error": "no handler registered"})
}
}