1use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25
/// Lua block sources compiled into the binary as `(module name, source text)` pairs.
///
/// `build_isle_init` indexes this table by module name and serves matching
/// `require` calls through an extra entry appended to `package.searchers`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
];
33
34fn build_blocks_path(project_root: &Path) -> String {
43 let mut out = String::new();
44
45 let project_blocks = project_root.join("blocks");
47 if project_blocks.is_dir() {
48 let pb = project_blocks.to_string_lossy();
49 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
50 }
51
52 match std::env::current_exe() {
54 Ok(exe) => {
55 if let Some(exe_dir) = exe.parent() {
56 let exe_blocks = exe_dir.join("blocks");
57 if exe_blocks.is_dir() {
58 let eb = exe_blocks.to_string_lossy();
59 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
60 }
61 }
62 }
63 Err(e) => {
64 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
65 }
66 }
67
68 out
69}
70
/// Everything `run` needs to boot the runtime and execute one script.
pub struct BlockConfig {
    /// Path of the Lua script to execute. Its file name becomes the
    /// `_SCRIPT_NAME` Lua global and its parent directory is prepended to
    /// `package.path`.
    pub script_path: PathBuf,
    /// Project root. `<project_root>/.env` is loaded from here, and
    /// `<project_root>/blocks` is added to `package.path` when it exists.
    pub project_root: PathBuf,
    /// Mesh relay URL. When `Some`, `run` connects a mesh agent to it;
    /// when `None`, no mesh agent is created.
    pub relay_url: Option<String>,
    /// Mesh identity secret as 64 hex characters (32 bytes). Only consulted
    /// when `relay_url` is set; when `None`, a fresh keypair is generated.
    pub secret_key: Option<String>,
    /// RPC timeout handed to `McpManager::with_rpc_timeout`.
    pub mcp_rpc_timeout: Duration,
    /// Optional text exposed to Lua as the `_PROMPT` global.
    pub prompt: Option<String>,
    /// Optional text exposed to Lua as the `_CONTEXT` global.
    pub context: Option<String>,
    /// Host-side handlers, keyed by event kind, registered on the event bus
    /// before the script starts.
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
}
101
/// Shared handles passed to the bridge registration functions for both Lua isles.
///
/// Cloning is cheap: apart from `project_root`, every field is an `Arc`, a
/// channel sender, or a `reqwest::Client` handle.
#[derive(Clone)]
pub struct HostContext {
    /// Project root as resolved by `run` (canonicalized when possible).
    pub project_root: PathBuf,
    /// Connected mesh agent, or `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// Shared MCP manager.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// HTTP client created once in `run`.
    pub http_client: reqwest::Client,
    /// Connection to the "sql" database opened by `open_sqlite`.
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle obtained from `sql_conn`'s connection.
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Connection to the "kv" database opened by `open_sqlite`.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle obtained from `kv_conn`'s connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Connection to the "ts" database opened by `open_sqlite`.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    // NOTE(review): marked dead_code, so presumably not read anywhere yet — confirm.
    #[allow(dead_code)]
    /// Interrupt handle obtained from `ts_conn`'s connection.
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    // NOTE(review): marked dead_code, so presumably not read through the context yet — confirm.
    #[allow(dead_code)]
    /// Main Lua isle: the one `run` evaluates the script on.
    pub isle: Arc<AsyncIsle>,
    /// Second Lua isle, spawned on the `agent-block-handler-isle` thread.
    pub handler_isle: Arc<AsyncIsle>,
    // NOTE(review): marked dead_code, so presumably not read through the context yet — confirm.
    #[allow(dead_code)]
    /// Sending half of the event bus channel.
    pub bus_tx: mpsc::Sender<Event>,
    /// Event bus wrapping the receiving half of the channel.
    /// NOTE(review): the `Option` suggests the bus is taken out by whoever
    /// drives it — confirm against the bridge code.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
158
159fn open_sqlite(
166 path: &Path,
167 label: &'static str,
168) -> BlockResult<(
169 Arc<Mutex<rusqlite::Connection>>,
170 Arc<rusqlite::InterruptHandle>,
171)> {
172 let is_memory = crate::bridge::config::is_memory_sql(path);
173 if !is_memory {
174 if let Some(parent) = path.parent() {
175 std::fs::create_dir_all(parent)
176 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
177 }
178 }
179 let conn = rusqlite::Connection::open(path)
180 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
181 if !is_memory {
182 let journal = crate::bridge::config::sql_journal_mode();
183 conn.pragma_update(None, "journal_mode", &journal)
184 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
185 }
186 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
187 conn.pragma_update(None, "busy_timeout", busy_ms)
188 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
189 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
190 let interrupt = Arc::new(conn.get_interrupt_handle());
191 let conn = Arc::new(Mutex::new(conn));
192 Ok((conn, interrupt))
193}
194
195fn build_isle_init(
204 script_name: String,
205 script_dir: String,
206 blocks_paths: String,
207 prompt: Option<String>,
208 context: Option<String>,
209) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
210 move |lua| {
211 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
213 if let Some(ref p) = prompt {
214 lua.globals().set("_PROMPT", p.as_str())?;
215 }
216 if let Some(ref c) = context {
217 lua.globals().set("_CONTEXT", c.as_str())?;
218 }
219
220 mlua_batteries::register_all(lua, "std")?;
221
222 let package: mlua::Table = lua.globals().get("package")?;
225 let current_path: String = package.get("path")?;
226 let new_path =
227 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
228 package.set("path", new_path)?;
229
230 let embedded: HashMap<&'static str, &'static str> =
235 EMBEDDED_BLOCKS.iter().copied().collect();
236
237 let searchers: mlua::Table = package.get("searchers")?;
238 let loader =
239 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
240 Some(source) => {
241 let chunk = lua
242 .load(*source)
243 .set_name(format!("@embedded:blocks/{name}/init.lua"));
244 let func = chunk.into_function()?;
245 Ok(mlua::Value::Function(func))
246 }
247 None => {
248 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
249 Ok(mlua::Value::String(msg))
250 }
251 })?;
252 let next_idx = searchers.raw_len() + 1;
254 searchers.raw_set(next_idx, loader)?;
255
256 Ok(())
257 }
258}
259
260async fn spawn_handler_isle(
270 script_name: String,
271 script_dir: String,
272 blocks_paths: String,
273 prompt: Option<String>,
274 context: Option<String>,
275) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
276 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
277 let (isle, driver) = AsyncIsle::builder()
278 .thread_name("agent-block-handler-isle")
279 .spawn(init)
280 .await
281 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
282 info!(
283 thread_name = "agent-block-handler-isle",
284 "handler Isle spawned"
285 );
286 Ok((Arc::new(isle), driver))
287}
288
/// Decodes exactly 64 hexadecimal characters into 32 bytes.
///
/// Leading and trailing whitespace is ignored. Upper- and lower-case digits
/// are both accepted.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed input is not 64 bytes
/// long or when any byte is not a hexadecimal digit. The reported position is
/// the zero-based byte offset within the trimmed input.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    // Converts one ASCII hex digit into its 4-bit value.
    fn nibble(byte: u8, pos: usize) -> Result<u8, String> {
        match byte {
            b'0'..=b'9' => Ok(byte - b'0'),
            b'a'..=b'f' => Ok(byte - b'a' + 10),
            b'A'..=b'F' => Ok(byte - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    // Decode from bytes rather than `&str` slices: the length check counts
    // bytes, so slicing the string at byte offsets would panic on input that
    // contains a multi-byte UTF-8 character.
    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    for (i, (slot, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *slot = (hi << 4) | lo;
    }
    Ok(out)
}
304
305pub async fn run(config: BlockConfig) -> BlockResult<()> {
306 let script_name = config
307 .script_path
308 .file_name()
309 .map(|n| n.to_string_lossy().to_string())
310 .unwrap_or_else(|| "unknown".to_string());
311
312 let root_span = info_span!("agent_block", script = %script_name);
313 let _root_guard = root_span.enter();
314
315 let env_path = config.project_root.join(".env");
319 match dotenvy::from_path(&env_path) {
320 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
321 Err(dotenvy::Error::Io(_)) => {} Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
323 }
324
325 let _init_guard = info_span!("init").entered();
327
328 let bus_capacity = crate::bridge::config::bus_capacity();
333 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
334 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
335
336 if !config.host_handlers.is_empty() {
344 let mut guard = event_bus
345 .lock()
346 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
347 let bus = guard
348 .as_mut()
349 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
350 for (kind, handler) in &config.host_handlers {
351 bus.on(kind.clone(), Arc::clone(handler))
352 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
353 }
354 info!(count = config.host_handlers.len(), "host handlers pre-installed");
355 }
356
357 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
358 let keypair = match &config.secret_key {
359 Some(hex_str) => {
360 let bytes = hex_decode_32(hex_str)
361 .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
362 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
363 }
364 None => agent_mesh_core::identity::AgentKeypair::generate(),
365 };
366 info!(agent_id = %keypair.agent_id(), "mesh identity");
367 let acl = agent_mesh_core::acl::AclPolicy {
368 default_deny: false,
369 rules: vec![],
370 };
371 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
372 Arc::new(BusRelayHandler::new(bus_tx.clone()));
373 let url = relay_url.clone();
374 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
375 .await
376 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
377 info!(relay_url = %relay_url, "mesh connected");
378 Some(Arc::new(agent))
379 } else {
380 None
381 };
382
383 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
384 config.mcp_rpc_timeout,
385 )?));
386
387 let project_root = config
391 .project_root
392 .canonicalize()
393 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
394
395 let http_client = reqwest::Client::new();
396
397 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
400 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
401
402 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
403 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
404
405 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
406 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
407
408 let script_path = config.script_path.clone();
409 let script_dir = script_path
410 .parent()
411 .map(|p| p.to_string_lossy().to_string())
412 .unwrap_or_else(|| ".".to_string());
413
414 let blocks_paths = build_blocks_path(&project_root);
420 let prompt = config.prompt.clone();
421 let context = config.context.clone();
422
423 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
425 script_name.clone(),
426 script_dir.clone(),
427 blocks_paths.clone(),
428 prompt.clone(),
429 context.clone(),
430 ))
431 .await
432 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
433 let isle = Arc::new(isle);
434
435 let (handler_isle, handler_driver) = spawn_handler_isle(
437 script_name.clone(),
438 script_dir.clone(),
439 blocks_paths.clone(),
440 prompt,
441 context,
442 )
443 .await?;
444
445 {
451 let mut mgr = mcp_manager.write().await;
452 mgr.set_handler_isle(Arc::clone(&handler_isle));
453 mgr.set_main_isle(Arc::clone(&isle));
454 }
455
456 let ctx = HostContext {
461 project_root,
462 mesh_agent,
463 mcp_manager: Arc::clone(&mcp_manager),
464 http_client,
465 sql_conn,
466 sql_interrupt,
467 kv_conn,
468 kv_interrupt,
469 ts_conn,
470 ts_interrupt,
471 isle: Arc::clone(&isle),
472 handler_isle: Arc::clone(&handler_isle),
473 bus_tx: bus_tx.clone(),
474 event_bus: Arc::clone(&event_bus),
475 };
476
477 {
478 let ctx = ctx.clone();
479 isle.exec(move |lua| {
480 bridge::register_all(lua, &ctx)
481 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
482 Ok(String::new())
483 })
484 .await
485 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
486 }
487
488 {
489 let ctx = ctx.clone();
490 handler_isle
491 .exec(move |lua| {
492 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
493 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
494 })?;
495 Ok(String::new())
496 })
497 .await
498 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
499 }
500
501 drop(_init_guard);
502
503 {
505 let _exec_guard = info_span!("execute", script = %script_name).entered();
506
507 let script = std::fs::read_to_string(&script_path)
508 .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
509
510 isle.coroutine_eval(&script)
511 .await
512 .map_err(|e| BlockError::Script(format!("{e}")))?;
513 }
514
515 {
517 let _shutdown_guard = info_span!("shutdown").entered();
518
519 mcp_manager.write().await.disconnect_all().await?;
520
521 driver
522 .shutdown()
523 .await
524 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
525
526 match handler_driver.shutdown().await {
531 Ok(()) => info!(
532 thread_name = "agent-block-handler-isle",
533 "handler Isle shut down"
534 ),
535 Err(e) => tracing::error!(
536 error = %e,
537 thread_name = "agent-block-handler-isle",
538 "handler Isle shutdown failed"
539 ),
540 }
541 }
542
543 Ok(())
544}
545
546struct BusRelayHandler {
565 tx: mpsc::Sender<Event>,
566}
567
568impl BusRelayHandler {
569 fn new(tx: mpsc::Sender<Event>) -> Self {
570 Self { tx }
571 }
572}
573
/// Longest time `BusRelayHandler::handle` waits for the bus to acknowledge a
/// forwarded mesh request before replying with a `handler timeout` error.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
576
577#[async_trait::async_trait]
578impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
579 async fn handle(
580 &self,
581 from: &agent_mesh_core::identity::AgentId,
582 payload: &serde_json::Value,
583 _cancel: agent_mesh_sdk::CancelToken,
584 ) -> serde_json::Value {
585 let id = uuid::Uuid::new_v4().to_string();
586 let meta = serde_json::json!({"from": from.to_string()});
587 let (ack_tx, ack_rx) = oneshot::channel();
588 let event = Event {
589 kind: "mesh".into(),
590 id: id.clone(),
591 payload: payload.clone(),
592 meta,
593 ack_tx: Some(ack_tx),
594 };
595
596 if let Err(e) = self.tx.send(event).await {
597 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
598 return serde_json::json!({"error": "bus channel closed"});
599 }
600
601 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
602 Ok(Ok(Ok(v))) => v,
603 Ok(Ok(Err(e))) => {
604 tracing::error!(id = %id, error = %e, "mesh handler returned error");
605 serde_json::json!({"error": e.to_string()})
606 }
607 Ok(Err(e)) => {
608 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
609 serde_json::json!({"error": "ack dropped"})
610 }
611 Err(_) => {
612 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
613 serde_json::json!({"error": "handler timeout"})
614 }
615 }
616 }
617}