1use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25
/// Lua block sources compiled into the binary, as `(module name, source text)` pairs.
///
/// Each source is pulled in at build time with `include_str!`. `build_isle_init` serves these
/// through an extra `package.searchers` entry, so `require("agent")` / `require("session")`
/// can resolve even when no matching file is found on `package.path`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
];
33
34fn build_blocks_path(project_root: &Path) -> String {
43 let mut out = String::new();
44
45 let project_blocks = project_root.join("blocks");
47 if project_blocks.is_dir() {
48 let pb = project_blocks.to_string_lossy();
49 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
50 }
51
52 match std::env::current_exe() {
54 Ok(exe) => {
55 if let Some(exe_dir) = exe.parent() {
56 let exe_blocks = exe_dir.join("blocks");
57 if exe_blocks.is_dir() {
58 let eb = exe_blocks.to_string_lossy();
59 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
60 }
61 }
62 }
63 Err(e) => {
64 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
65 }
66 }
67
68 out
69}
70
/// Start-up configuration consumed by [`run`].
pub struct BlockConfig {
    /// Lua script that `run` reads and evaluates; its parent directory is put on `package.path`.
    pub script_path: PathBuf,
    /// Project directory: `.env` is loaded from here and `blocks/` beneath it is searched.
    pub project_root: PathBuf,
    /// Mesh relay URL. When `Some`, `run` connects a mesh agent to it; when `None`, no mesh
    /// agent is created.
    pub relay_url: Option<String>,
    /// Mesh identity key as 64 hex characters (32 bytes). Only consulted when `relay_url` is
    /// set; when `None` a fresh keypair is generated.
    pub secret_key: Option<String>,
    /// Timeout handed to `McpManager::with_rpc_timeout`.
    pub mcp_rpc_timeout: Duration,
    /// When `Some`, exposed to Lua as the global `_PROMPT`.
    pub prompt: Option<String>,
    /// When `Some`, exposed to Lua as the global `_CONTEXT`.
    pub context: Option<String>,
}
86
/// Shared host-side state handed to the Lua bridge registration functions.
///
/// `run` builds one instance and clones it into the registration closure of each Isle.
#[derive(Clone)]
pub struct HostContext {
    /// Project root after `canonicalize()` (or joined onto the current directory when
    /// canonicalization fails).
    pub project_root: PathBuf,
    /// Connected mesh agent; `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP manager; `run` gives it handles to both Isles before the bridge is registered.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// HTTP client created once in `run`.
    pub http_client: reqwest::Client,
    /// SQLite connection opened at `bridge::config::sql_path()`.
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle taken from `sql_conn`'s connection.
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// SQLite connection opened at `bridge::config::kv_path()`.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle taken from `kv_conn`'s connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// SQLite connection opened at `bridge::config::ts_path()`.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle taken from `ts_conn`'s connection.
    // NOTE(review): marked dead_code — presumably not read by any bridge yet; confirm.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Main Isle: the one on which `run` evaluates the script.
    // NOTE(review): marked dead_code — presumably kept for future bridge use; confirm.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Second Isle, spawned on the thread named `agent-block-handler-isle`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Sending half of the event bus channel.
    // NOTE(review): marked dead_code — presumably kept for future bridge use; confirm.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Event bus wrapping the receiving half of the channel.
    // NOTE(review): the `Option` presumably lets a single consumer take the bus; confirm.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
143
144fn open_sqlite(
151 path: &Path,
152 label: &'static str,
153) -> BlockResult<(
154 Arc<Mutex<rusqlite::Connection>>,
155 Arc<rusqlite::InterruptHandle>,
156)> {
157 let is_memory = crate::bridge::config::is_memory_sql(path);
158 if !is_memory {
159 if let Some(parent) = path.parent() {
160 std::fs::create_dir_all(parent)
161 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
162 }
163 }
164 let conn = rusqlite::Connection::open(path)
165 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
166 if !is_memory {
167 let journal = crate::bridge::config::sql_journal_mode();
168 conn.pragma_update(None, "journal_mode", &journal)
169 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
170 }
171 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
172 conn.pragma_update(None, "busy_timeout", busy_ms)
173 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
174 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
175 let interrupt = Arc::new(conn.get_interrupt_handle());
176 let conn = Arc::new(Mutex::new(conn));
177 Ok((conn, interrupt))
178}
179
180fn build_isle_init(
189 script_name: String,
190 script_dir: String,
191 blocks_paths: String,
192 prompt: Option<String>,
193 context: Option<String>,
194) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
195 move |lua| {
196 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
198 if let Some(ref p) = prompt {
199 lua.globals().set("_PROMPT", p.as_str())?;
200 }
201 if let Some(ref c) = context {
202 lua.globals().set("_CONTEXT", c.as_str())?;
203 }
204
205 mlua_batteries::register_all(lua, "std")?;
206
207 let package: mlua::Table = lua.globals().get("package")?;
210 let current_path: String = package.get("path")?;
211 let new_path =
212 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
213 package.set("path", new_path)?;
214
215 let embedded: HashMap<&'static str, &'static str> =
220 EMBEDDED_BLOCKS.iter().copied().collect();
221
222 let searchers: mlua::Table = package.get("searchers")?;
223 let loader =
224 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
225 Some(source) => {
226 let chunk = lua
227 .load(*source)
228 .set_name(format!("@embedded:blocks/{name}/init.lua"));
229 let func = chunk.into_function()?;
230 Ok(mlua::Value::Function(func))
231 }
232 None => {
233 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
234 Ok(mlua::Value::String(msg))
235 }
236 })?;
237 let next_idx = searchers.raw_len() + 1;
239 searchers.raw_set(next_idx, loader)?;
240
241 Ok(())
242 }
243}
244
245async fn spawn_handler_isle(
255 script_name: String,
256 script_dir: String,
257 blocks_paths: String,
258 prompt: Option<String>,
259 context: Option<String>,
260) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
261 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
262 let (isle, driver) = AsyncIsle::builder()
263 .thread_name("agent-block-handler-isle")
264 .spawn(init)
265 .await
266 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
267 info!(
268 thread_name = "agent-block-handler-isle",
269 "handler Isle spawned"
270 );
271 Ok((Arc::new(isle), driver))
272}
273
/// Decodes exactly 64 hexadecimal characters into 32 bytes.
///
/// Surrounding whitespace is ignored; upper- and lower-case digits are both accepted.
///
/// Decoding works on bytes rather than `str` slices: the length check counts bytes, so slicing
/// the string at fixed offsets would panic on input that contains a multi-byte UTF-8 character.
/// Such input is now reported as an ordinary "invalid hex" error.
///
/// # Errors
/// - `expected 64 hex chars, got N` when the trimmed input is not 64 bytes long.
/// - `invalid hex at position P: ...` for the first byte that is not a hex digit
///   (`P` is the zero-based byte offset in the trimmed input).
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    // Value of one hex digit, or an error naming its position.
    fn nibble(digit: u8, pos: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
289
290pub async fn run(config: BlockConfig) -> BlockResult<()> {
291 let script_name = config
292 .script_path
293 .file_name()
294 .map(|n| n.to_string_lossy().to_string())
295 .unwrap_or_else(|| "unknown".to_string());
296
297 let root_span = info_span!("agent_block", script = %script_name);
298 let _root_guard = root_span.enter();
299
300 let env_path = config.project_root.join(".env");
304 match dotenvy::from_path(&env_path) {
305 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
306 Err(dotenvy::Error::Io(_)) => {} Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
308 }
309
310 let _init_guard = info_span!("init").entered();
312
313 let bus_capacity = crate::bridge::config::bus_capacity();
318 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
319 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
320
321 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
322 let keypair = match &config.secret_key {
323 Some(hex_str) => {
324 let bytes = hex_decode_32(hex_str)
325 .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
326 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
327 }
328 None => agent_mesh_core::identity::AgentKeypair::generate(),
329 };
330 info!(agent_id = %keypair.agent_id(), "mesh identity");
331 let acl = agent_mesh_core::acl::AclPolicy {
332 default_deny: false,
333 rules: vec![],
334 };
335 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
336 Arc::new(BusRelayHandler::new(bus_tx.clone()));
337 let url = relay_url.clone();
338 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
339 .await
340 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
341 info!(relay_url = %relay_url, "mesh connected");
342 Some(Arc::new(agent))
343 } else {
344 None
345 };
346
347 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
348 config.mcp_rpc_timeout,
349 )?));
350
351 let project_root = config
355 .project_root
356 .canonicalize()
357 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
358
359 let http_client = reqwest::Client::new();
360
361 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
364 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
365
366 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
367 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
368
369 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
370 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
371
372 let script_path = config.script_path.clone();
373 let script_dir = script_path
374 .parent()
375 .map(|p| p.to_string_lossy().to_string())
376 .unwrap_or_else(|| ".".to_string());
377
378 let blocks_paths = build_blocks_path(&project_root);
384 let prompt = config.prompt.clone();
385 let context = config.context.clone();
386
387 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
389 script_name.clone(),
390 script_dir.clone(),
391 blocks_paths.clone(),
392 prompt.clone(),
393 context.clone(),
394 ))
395 .await
396 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
397 let isle = Arc::new(isle);
398
399 let (handler_isle, handler_driver) = spawn_handler_isle(
401 script_name.clone(),
402 script_dir.clone(),
403 blocks_paths.clone(),
404 prompt,
405 context,
406 )
407 .await?;
408
409 {
415 let mut mgr = mcp_manager.write().await;
416 mgr.set_handler_isle(Arc::clone(&handler_isle));
417 mgr.set_main_isle(Arc::clone(&isle));
418 }
419
420 let ctx = HostContext {
425 project_root,
426 mesh_agent,
427 mcp_manager: Arc::clone(&mcp_manager),
428 http_client,
429 sql_conn,
430 sql_interrupt,
431 kv_conn,
432 kv_interrupt,
433 ts_conn,
434 ts_interrupt,
435 isle: Arc::clone(&isle),
436 handler_isle: Arc::clone(&handler_isle),
437 bus_tx: bus_tx.clone(),
438 event_bus: Arc::clone(&event_bus),
439 };
440
441 {
442 let ctx = ctx.clone();
443 isle.exec(move |lua| {
444 bridge::register_all(lua, &ctx)
445 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
446 Ok(String::new())
447 })
448 .await
449 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
450 }
451
452 {
453 let ctx = ctx.clone();
454 handler_isle
455 .exec(move |lua| {
456 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
457 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
458 })?;
459 Ok(String::new())
460 })
461 .await
462 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
463 }
464
465 drop(_init_guard);
466
467 {
469 let _exec_guard = info_span!("execute", script = %script_name).entered();
470
471 let script = std::fs::read_to_string(&script_path)
472 .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
473
474 isle.coroutine_eval(&script)
475 .await
476 .map_err(|e| BlockError::Script(format!("{e}")))?;
477 }
478
479 {
481 let _shutdown_guard = info_span!("shutdown").entered();
482
483 mcp_manager.write().await.disconnect_all().await?;
484
485 driver
486 .shutdown()
487 .await
488 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
489
490 match handler_driver.shutdown().await {
495 Ok(()) => info!(
496 thread_name = "agent-block-handler-isle",
497 "handler Isle shut down"
498 ),
499 Err(e) => tracing::error!(
500 error = %e,
501 thread_name = "agent-block-handler-isle",
502 "handler Isle shutdown failed"
503 ),
504 }
505 }
506
507 Ok(())
508}
509
/// Mesh request handler that relays each incoming request onto the event bus.
///
/// Its `RequestHandler::handle` implementation wraps the request in an `Event`, sends it
/// through `tx`, and waits for the reply on the event's ack channel.
struct BusRelayHandler {
    /// Sending half of the event bus channel.
    tx: mpsc::Sender<Event>,
}

impl BusRelayHandler {
    /// Creates a handler that publishes onto the given bus sender.
    fn new(tx: mpsc::Sender<Event>) -> Self {
        Self { tx }
    }
}
537
/// How long `BusRelayHandler::handle` waits for the ack of a relayed mesh request before it
/// answers with a `handler timeout` error.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
540
541#[async_trait::async_trait]
542impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
543 async fn handle(
544 &self,
545 from: &agent_mesh_core::identity::AgentId,
546 payload: &serde_json::Value,
547 _cancel: agent_mesh_sdk::CancelToken,
548 ) -> serde_json::Value {
549 let id = uuid::Uuid::new_v4().to_string();
550 let meta = serde_json::json!({"from": from.to_string()});
551 let (ack_tx, ack_rx) = oneshot::channel();
552 let event = Event {
553 kind: "mesh".into(),
554 id: id.clone(),
555 payload: payload.clone(),
556 meta,
557 ack_tx: Some(ack_tx),
558 };
559
560 if let Err(e) = self.tx.send(event).await {
561 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
562 return serde_json::json!({"error": "bus channel closed"});
563 }
564
565 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
566 Ok(Ok(Ok(v))) => v,
567 Ok(Ok(Err(e))) => {
568 tracing::error!(id = %id, error = %e, "mesh handler returned error");
569 serde_json::json!({"error": e.to_string()})
570 }
571 Ok(Err(e)) => {
572 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
573 serde_json::json!({"error": "ack dropped"})
574 }
575 Err(_) => {
576 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
577 serde_json::json!({"error": "handler timeout"})
578 }
579 }
580 }
581}