agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
//! 1. Spawn two mlua-isle `AsyncIsle`s (each a dedicated Lua VM thread): the
//!    main Isle, which runs the user script with coroutine support, and a
//!    handler Isle, which runs EventBus handlers
//! 2. Optionally connect to the agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject the Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`,
//!    `log.*`, `mcp.*`, `bus.*`, `sql.*`, `kv.*`, `ts.*`, …)
//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
//! 6. Graceful shutdown (Isles + MCP servers + mesh)
11
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use agent_block_mcp::McpManager;
use agent_block_types::error::{BlockError, BlockResult};
use mlua_isle::{AsyncIsle, AsyncIsleDriver};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{info, info_span, warn, Instrument};

use crate::bridge;
use crate::bus::{Event, EventBus, Handler};
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31 ("agent", include_str!("../blocks/agent/init.lua")),
32 ("session", include_str!("../blocks/session/init.lua")),
33];
34
35/// Build the `blocks/` portion of `package.path` from filesystem locations.
36///
37/// Priority (highest first):
38/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
39/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
40///
41/// Returns a semicolon-terminated string ready to prepend to `package.path`,
42/// or an empty string when no `blocks/` directories are found.
43fn build_blocks_path(project_root: &Path) -> String {
44 let mut out = String::new();
45
46 // 1. project_root/blocks/
47 let project_blocks = project_root.join("blocks");
48 if project_blocks.is_dir() {
49 let pb = project_blocks.to_string_lossy();
50 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
51 }
52
53 // 2. exe_dir/blocks/
54 match std::env::current_exe() {
55 Ok(exe) => {
56 if let Some(exe_dir) = exe.parent() {
57 let exe_blocks = exe_dir.join("blocks");
58 if exe_blocks.is_dir() {
59 let eb = exe_blocks.to_string_lossy();
60 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
61 }
62 }
63 }
64 Err(e) => {
65 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
66 }
67 }
68
69 out
70}
71
72pub struct BlockConfig {
73 pub script_path: PathBuf,
74 pub project_root: PathBuf,
75 pub relay_url: Option<String>,
76 /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
77 /// generated. Required to talk to registry/ACL-gated hosted meshes.
78 pub secret_key: Option<String>,
79 /// Per-RPC timeout for every MCP round-trip (connect / list / call).
80 /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
81 pub mcp_rpc_timeout: Duration,
82 /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
83 pub prompt: Option<String>,
84 /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
85 pub context: Option<String>,
86 /// Host-side Rust handlers pre-installed on the EventBus before the user
87 /// script starts. Each entry registers `handler` against `kind` via
88 /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
89 /// captured by the Rust handler rather than dispatched to a Lua function.
90 ///
91 /// Intended for SDK consumers that embed `agent-block-core` and need to
92 /// receive script output programmatically (e.g. a Spawner adapter that
93 /// turns LLM script output into a typed `WorkerResult`). Lua-side
94 /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
95 /// are still possible, but the EventBus dispatches a single handler per
96 /// `kind` (last-write-wins), so host-side and Lua-side registrations on
97 /// the same `kind` collide; choose one side per routing key.
98 ///
99 /// Defaults to an empty map (no host handlers).
100 pub host_handlers: HashMap<String, Arc<dyn Handler>>,
101 /// When `true`, the EventBus dispatcher loop is driven in the background
102 /// for the duration of the script and shut down gracefully after the
103 /// script completes. Required for SDK-embed callers that supply
104 /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
105 /// emitted from the script to actually reach those handlers without
106 /// requiring the script to call `bus.serve()` (which blocks on
107 /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
108 ///
109 /// After the script finishes, the dispatcher is given a grace window
110 /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
111 /// and finish any in-flight handler, then is cancelled.
112 ///
113 /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
114 /// takes ownership of the EventBus before the script runs, so a script
115 /// that calls `bus.on(...)` followed by `bus.serve()` will error
116 /// ("bus.serve() has already taken ownership"). Use this flag when the
117 /// script's sole purpose is to push events to host handlers.
118 ///
119 /// Defaults to `false` (legacy behavior: dispatcher only runs when the
120 /// script calls `bus.serve()`).
121 pub auto_serve_bus: bool,
122}
123
124/// Shared context passed into Lua bridge functions.
125#[derive(Clone)]
126pub struct HostContext {
127 pub project_root: PathBuf,
128 pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
129 pub mcp_manager: Arc<RwLock<McpManager>>,
130 /// Shared async HTTP client for `http.*` bridge.
131 pub http_client: reqwest::Client,
132 /// Shared SQLite connection for `sql.*` bridge (user tables).
133 pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
134 /// Interrupt handle for the sql connection.
135 /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
136 pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
137 /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
138 /// Separate from sql_conn so KV scratch state and user SQL data don't
139 /// share WAL, page cache, or backup lifecycle.
140 pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
141 /// Interrupt handle for the kv connection.
142 pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
143 /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
144 /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
145 pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
146 /// Interrupt handle for the ts connection.
147 /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
148 #[allow(dead_code)]
149 pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
150 /// Async handle to the main Isle Lua VM that runs the user script via
151 /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
152 /// handlers against this Isle; handlers live on `handler_isle` instead.
153 /// The field is retained because bridge code still keyed to the main
154 /// Isle (future `coroutine_call` back-edges, introspection APIs) may
155 /// need it, and removing it would force another HostContext reshape.
156 #[allow(dead_code)]
157 pub isle: Arc<AsyncIsle>,
158 /// Dedicated Isle for EventBus handler execution. Lua handlers
159 /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
160 /// handler code does not occupy the main Isle's LocalSet and block
161 /// grace timers / shutdown wakers on the main VM side.
162 ///
163 /// Used by `bridge::bus` to forward handler bytecode
164 /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
165 /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
166 /// `coroutine_call("__bus_dispatch", ...)`.
167 pub handler_isle: Arc<AsyncIsle>,
168 /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
169 /// clone this and push `Event`s. The ST3 mesh adapter captures its own
170 /// clone at `MeshAgent::connect` time, so the field itself is not read
171 /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
172 #[allow(dead_code)]
173 pub bus_tx: mpsc::Sender<Event>,
174 /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
175 /// briefly from sync Lua context, and `bus.serve` can `Option::take`
176 /// ownership before entering the long-lived `run()` await (avoiding the
177 /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
178 pub event_bus: Arc<Mutex<Option<EventBus>>>,
179}
180
181/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
182/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
183/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
184///
185/// `label` is used only for the init log line (`sql` / `kv`) so that the two
186/// databases are distinguishable in tracing output.
187fn open_sqlite(
188 path: &Path,
189 label: &'static str,
190) -> BlockResult<(
191 Arc<Mutex<rusqlite::Connection>>,
192 Arc<rusqlite::InterruptHandle>,
193)> {
194 let is_memory = crate::bridge::config::is_memory_sql(path);
195 if !is_memory {
196 if let Some(parent) = path.parent() {
197 std::fs::create_dir_all(parent)
198 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
199 }
200 }
201 let conn = rusqlite::Connection::open(path)
202 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
203 if !is_memory {
204 let journal = crate::bridge::config::sql_journal_mode();
205 conn.pragma_update(None, "journal_mode", &journal)
206 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
207 }
208 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
209 conn.pragma_update(None, "busy_timeout", busy_ms)
210 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
211 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
212 let interrupt = Arc::new(conn.get_interrupt_handle());
213 let conn = Arc::new(Mutex::new(conn));
214 Ok((conn, interrupt))
215}
216
217/// Build the init closure shared between the main Isle and the handler
218/// Isle. Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
219/// configures `package.path` / `package.searchers` so `require "agent"`
220/// (and any `blocks/` module) works inside the Lua VM.
221///
222/// Returns an `FnOnce` so each call produces a fresh closure; this lets
223/// both Isles be spawned from the same config without `Clone` bounds on
224/// the captured `HashMap`.
225fn build_isle_init(
226 script_name: String,
227 script_dir: String,
228 blocks_paths: String,
229 prompt: Option<String>,
230 context: Option<String>,
231) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
232 move |lua| {
233 // Set script name before registering bridges (used by log.* for attribution)
234 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
235 if let Some(ref p) = prompt {
236 lua.globals().set("_PROMPT", p.as_str())?;
237 }
238 if let Some(ref c) = context {
239 lua.globals().set("_CONTEXT", c.as_str())?;
240 }
241
242 mlua_batteries::register_all(lua, "std")?;
243
244 // ── package.path ──────────────────────────────────────────────
245 // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
246 let package: mlua::Table = lua.globals().get("package")?;
247 let current_path: String = package.get("path")?;
248 let new_path =
249 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
250 package.set("path", new_path)?;
251
252 // ── package.searchers — embedded fallback ─────────────────────
253 // Register a custom searcher that loads blocks/ modules from the
254 // sources baked in at compile time. This is the lowest-priority
255 // searcher so filesystem copies always win.
256 let embedded: HashMap<&'static str, &'static str> =
257 EMBEDDED_BLOCKS.iter().copied().collect();
258
259 let searchers: mlua::Table = package.get("searchers")?;
260 let loader =
261 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
262 Some(source) => {
263 let chunk = lua
264 .load(*source)
265 .set_name(format!("@embedded:blocks/{name}/init.lua"));
266 let func = chunk.into_function()?;
267 Ok(mlua::Value::Function(func))
268 }
269 None => {
270 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
271 Ok(mlua::Value::String(msg))
272 }
273 })?;
274 // Append as the last searcher so filesystem paths remain preferred.
275 let next_idx = searchers.raw_len() + 1;
276 searchers.raw_set(next_idx, loader)?;
277
278 Ok(())
279 }
280}
281
282/// Spawn the dedicated handler Isle.
283///
284/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
285/// separate OS thread with its own `tokio` current-thread runtime, keeping
286/// CPU-bound handlers from starving the main Isle's grace timers.
287///
288/// Bridge registration is deferred to a follow-up `exec` in `run()` because
289/// `HostContext` is not constructible until both Isles exist (the struct
290/// itself holds `Arc<AsyncIsle>` for both).
291async fn spawn_handler_isle(
292 script_name: String,
293 script_dir: String,
294 blocks_paths: String,
295 prompt: Option<String>,
296 context: Option<String>,
297) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
298 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
299 let (isle, driver) = AsyncIsle::builder()
300 .thread_name("agent-block-handler-isle")
301 .spawn(init)
302 .await
303 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
304 info!(
305 thread_name = "agent-block-handler-isle",
306 "handler Isle spawned"
307 );
308 Ok((Arc::new(isle), driver))
309}
310
/// Decode exactly 64 hexadecimal characters into a 32-byte array.
///
/// Surrounding whitespace is trimmed first. Upper- and lower-case digits are
/// both accepted.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - the trimmed input is not exactly 64 characters long, or
/// - it contains a non-hex character. Positions are 0-based character
///   indices.
///
/// Input is handled per `char`, not per byte. Slicing by byte offsets would
/// panic on a UTF-8 boundary for non-ASCII input (e.g. a mistyped
/// `--secret-key`); here such input yields an error instead.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let digits: Vec<char> = s.trim().chars().collect();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    // Parse one hex digit. `from_str_radix` is kept so error text matches
    // `ParseIntError`'s Display.
    let nibble = |pos: usize| -> Result<u8, String> {
        let mut buf = [0u8; 4];
        u8::from_str_radix(digits[pos].encode_utf8(&mut buf), 16)
            .map_err(|e| format!("invalid hex at position {pos}: {e}"))
    };

    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        // Operands evaluate left to right, so the high nibble is checked first.
        *byte = (nibble(2 * i)? << 4) | nibble(2 * i + 1)?;
    }
    Ok(out)
}
326
327pub async fn run(config: BlockConfig) -> BlockResult<()> {
328 let script_name = config
329 .script_path
330 .file_name()
331 .map(|n| n.to_string_lossy().to_string())
332 .unwrap_or_else(|| "unknown".to_string());
333
334 let root_span = info_span!("agent_block", script = %script_name);
335 let _root_guard = root_span.enter();
336
337 // ── .env ──────────────────────────────────────────────────────
338 // Load .env from project_root if present. Variables are merged into
339 // the process environment so Lua's `std.env.get()` picks them up.
340 let env_path = config.project_root.join(".env");
341 match dotenvy::from_path(&env_path) {
342 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
343 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
344 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
345 }
346
347 // ── Init ──────────────────────────────────────────────────────
348 let _init_guard = info_span!("init").entered();
349
350 // ── EventBus channel ─────────────────────────────────────────────
351 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
352 // handler can hold a `bus_tx` clone and forward incoming requests
353 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
354 let bus_capacity = crate::bridge::config::bus_capacity();
355 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
356 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
357
358 // ── Pre-install host-side Rust handlers ───────────────────────────
359 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
360 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
361 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
362 // Registered here (before any Lua bridge registers handlers and before
363 // `bus.serve` takes ownership of the bus) so the EventBus already
364 // carries the host handlers when the script starts.
365 if !config.host_handlers.is_empty() {
366 let mut guard = event_bus
367 .lock()
368 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
369 let bus = guard
370 .as_mut()
371 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
372 for (kind, handler) in &config.host_handlers {
373 bus.on(kind.clone(), Arc::clone(handler))
374 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
375 }
376 info!(
377 count = config.host_handlers.len(),
378 "host handlers pre-installed"
379 );
380 }
381
382 // ── auto-serve: background dispatcher for SDK-embed callers ───────
383 // When `auto_serve_bus` is on and host handlers are installed, take the
384 // EventBus out of the Mutex *before* the script runs and spawn the
385 // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
386 // from the script reach the host handler without requiring the script
387 // to call `bus.serve()` (which blocks on signals and never returns
388 // under programmatic embedding).
389 let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
390 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
391 let bus = {
392 let mut guard = event_bus
393 .lock()
394 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
395 guard
396 .take()
397 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
398 };
399 let token = CancellationToken::new();
400 let token_for_task = token.clone();
401 let handle = tokio::spawn(async move {
402 let mut bus = bus;
403 if let Err(e) = bus.run(token_for_task).await {
404 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
405 }
406 });
407 info!("auto-serve: dispatcher spawned");
408 Some((handle, token))
409 } else {
410 None
411 };
412
413 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
414 let keypair = match &config.secret_key {
415 Some(hex_str) => {
416 let bytes = hex_decode_32(hex_str)
417 .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
418 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
419 }
420 None => agent_mesh_core::identity::AgentKeypair::generate(),
421 };
422 info!(agent_id = %keypair.agent_id(), "mesh identity");
423 let acl = agent_mesh_core::acl::AclPolicy {
424 default_deny: false,
425 rules: vec![],
426 };
427 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
428 Arc::new(BusRelayHandler::new(bus_tx.clone()));
429 let url = relay_url.clone();
430 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
431 .await
432 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
433 info!(relay_url = %relay_url, "mesh connected");
434 Some(Arc::new(agent))
435 } else {
436 None
437 };
438
439 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
440 config.mcp_rpc_timeout,
441 )?));
442
443 // Resolve project_root to absolute path.
444 // canonicalize() can fail if the path doesn't exist; fall back to
445 // joining with current_dir to guarantee an absolute path.
446 let project_root = config
447 .project_root
448 .canonicalize()
449 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
450
451 let http_client = reqwest::Client::new();
452
453 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
454 // All knobs are ENV-driven (see `bridge/config.rs`).
455 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
456 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
457
458 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
459 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
460
461 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
462 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
463
464 let script_path = config.script_path.clone();
465 let script_dir = script_path
466 .parent()
467 .map(|p| p.to_string_lossy().to_string())
468 .unwrap_or_else(|| ".".to_string());
469
470 // Precompute values captured by the init closure so we don't need to
471 // move the full `HostContext` into it (HostContext now holds
472 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
473 // returns — classic chicken-and-egg). All bridge registrations run in a
474 // second pass via `isle.exec` below.
475 let blocks_paths = build_blocks_path(&project_root);
476 let prompt = config.prompt.clone();
477 let context = config.context.clone();
478
479 // ── main Isle ─────────────────────────────────────────────────
480 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
481 script_name.clone(),
482 script_dir.clone(),
483 blocks_paths.clone(),
484 prompt.clone(),
485 context.clone(),
486 ))
487 .await
488 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
489 let isle = Arc::new(isle);
490
491 // ── handler Isle (sequential, dependencies are trivial) ────────
492 let (handler_isle, handler_driver) = spawn_handler_isle(
493 script_name.clone(),
494 script_dir.clone(),
495 blocks_paths.clone(),
496 prompt,
497 context,
498 )
499 .await?;
500
501 // Wire both Isles into McpManager so Lua notification callbacks can be
502 // dispatched from the rmcp task thread.
503 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
504 // - main_isle: progress/log notification dispatch (exec on main Isle so
505 // user callback upvalues are preserved — no bytecode dump/reload needed)
506 {
507 let mut mgr = mcp_manager.write().await;
508 mgr.set_handler_isle(Arc::clone(&handler_isle));
509 mgr.set_main_isle(Arc::clone(&isle));
510 }
511
512 // ── HostContext + bridge registration ──────────────────────────────
513 // Wrap the isle in an Arc so `HostContext` can hand it to
514 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
515 // handlers from the EventBus dispatcher task).
516 let ctx = HostContext {
517 project_root,
518 mesh_agent,
519 mcp_manager: Arc::clone(&mcp_manager),
520 http_client,
521 sql_conn,
522 sql_interrupt,
523 kv_conn,
524 kv_interrupt,
525 ts_conn,
526 ts_interrupt,
527 isle: Arc::clone(&isle),
528 handler_isle: Arc::clone(&handler_isle),
529 bus_tx: bus_tx.clone(),
530 event_bus: Arc::clone(&event_bus),
531 };
532
533 {
534 let ctx = ctx.clone();
535 isle.exec(move |lua| {
536 bridge::register_all(lua, &ctx)
537 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
538 Ok(String::new())
539 })
540 .await
541 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
542 }
543
544 {
545 let ctx = ctx.clone();
546 handler_isle
547 .exec(move |lua| {
548 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
549 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
550 })?;
551 Ok(String::new())
552 })
553 .await
554 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
555 }
556
557 drop(_init_guard);
558
559 // ── Execute ───────────────────────────────────────────────────
560 {
561 let _exec_guard = info_span!("execute", script = %script_name).entered();
562
563 let script = std::fs::read_to_string(&script_path)
564 .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
565
566 isle.coroutine_eval(&script)
567 .await
568 .map_err(|e| BlockError::Script(format!("{e}")))?;
569 }
570
571 // ── auto-serve drain + cancel ─────────────────────────────────
572 // Let the dispatcher drain events queued by the script, then signal
573 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
574 if let Some((handle, token)) = auto_serve_state {
575 let grace_ms = crate::bridge::config::task_grace_ms();
576 let grace = Duration::from_millis(grace_ms);
577 tokio::time::sleep(grace).await;
578 token.cancel();
579 match tokio::time::timeout(grace, handle).await {
580 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
581 Ok(Err(join_err)) => {
582 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
583 }
584 Err(_) => {
585 tracing::warn!(
586 grace_ms,
587 "auto-serve: dispatcher join timed out after cancel; forcing exit"
588 );
589 }
590 }
591 }
592
593 // ── Shutdown ──────────────────────────────────────────────────
594 {
595 let _shutdown_guard = info_span!("shutdown").entered();
596
597 mcp_manager.write().await.disconnect_all().await?;
598
599 driver
600 .shutdown()
601 .await
602 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
603
604 // Handler Isle shutdown is independent of main shutdown: a failure
605 // here (e.g. ThreadPanic on the handler thread) is logged but does
606 // not poison the main process exit. The main Isle has already
607 // been stopped cleanly above.
608 match handler_driver.shutdown().await {
609 Ok(()) => info!(
610 thread_name = "agent-block-handler-isle",
611 "handler Isle shut down"
612 ),
613 Err(e) => tracing::error!(
614 error = %e,
615 thread_name = "agent-block-handler-isle",
616 "handler Isle shutdown failed"
617 ),
618 }
619 }
620
621 Ok(())
622}
623
624/// mesh → bus source adapter.
625///
626/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
627/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
628/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
629/// oneshot channel carried inside the event.
630///
631/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
632///
633/// | Failure | Return value |
634/// |---------------------------|----------------------------------------|
635/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}` |
636/// | ack receiver dropped | `{"error": "ack dropped"}` |
637/// | Lua handler `BlockError` | `{"error": "<handler error>"}` |
638/// | Handler exceeded 30s | `{"error": "handler timeout"}` |
639///
640/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
641/// (see `src/bridge/mesh.rs`).
642struct BusRelayHandler {
643 tx: mpsc::Sender<Event>,
644}
645
646impl BusRelayHandler {
647 fn new(tx: mpsc::Sender<Event>) -> Self {
648 Self { tx }
649 }
650}
651
652/// Bound used for both the mesh-adapter ack wait and other source timeouts.
653const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
654
655#[async_trait::async_trait]
656impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
657 async fn handle(
658 &self,
659 from: &agent_mesh_core::identity::AgentId,
660 payload: &serde_json::Value,
661 _cancel: agent_mesh_sdk::CancelToken,
662 ) -> serde_json::Value {
663 let id = uuid::Uuid::new_v4().to_string();
664 let meta = serde_json::json!({"from": from.to_string()});
665 let (ack_tx, ack_rx) = oneshot::channel();
666 let event = Event {
667 kind: "mesh".into(),
668 id: id.clone(),
669 payload: payload.clone(),
670 meta,
671 ack_tx: Some(ack_tx),
672 };
673
674 if let Err(e) = self.tx.send(event).await {
675 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
676 return serde_json::json!({"error": "bus channel closed"});
677 }
678
679 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
680 Ok(Ok(Ok(v))) => v,
681 Ok(Ok(Err(e))) => {
682 tracing::error!(id = %id, error = %e, "mesh handler returned error");
683 serde_json::json!({"error": e.to_string()})
684 }
685 Ok(Err(e)) => {
686 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
687 serde_json::json!({"error": "ack dropped"})
688 }
689 Err(_) => {
690 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
691 serde_json::json!({"error": "handler timeout"})
692 }
693 }
694 }
695}