agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for blocks/ StdPkg modules.
/// These are baked into the binary at compile time so `cargo install` works
/// without any extra file distribution.
///
/// Each entry is `(module name, Lua source)`. The module name is the exact
/// string the embedded `package.searchers` fallback installed by
/// `build_isle_init` compares against the argument of `require`, so
/// `require "agent"` resolves to the baked-in `blocks/agent/init.lua`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
];
34
35/// Build the `blocks/` portion of `package.path` from filesystem locations.
36///
37/// Priority (highest first):
38/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
39/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
40///
41/// Returns a semicolon-terminated string ready to prepend to `package.path`,
42/// or an empty string when no `blocks/` directories are found.
43fn build_blocks_path(project_root: &Path) -> String {
44 let mut out = String::new();
45
46 // 1. project_root/blocks/
47 let project_blocks = project_root.join("blocks");
48 if project_blocks.is_dir() {
49 let pb = project_blocks.to_string_lossy();
50 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
51 }
52
53 // 2. exe_dir/blocks/
54 match std::env::current_exe() {
55 Ok(exe) => {
56 if let Some(exe_dir) = exe.parent() {
57 let exe_blocks = exe_dir.join("blocks");
58 if exe_blocks.is_dir() {
59 let eb = exe_blocks.to_string_lossy();
60 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
61 }
62 }
63 }
64 Err(e) => {
65 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
66 }
67 }
68
69 out
70}
71
/// Everything [`run`] needs to execute one Lua script: where the script and
/// project live, optional mesh/MCP settings, values injected as Lua globals,
/// and the SDK-embed hooks (host handlers, auto-serve, cancellation).
pub struct BlockConfig {
    /// Path of the Lua script to execute. Its file name becomes the
    /// `_SCRIPT_NAME` Lua global and its parent directory is placed first on
    /// `package.path`.
    pub script_path: PathBuf,
    /// Project root directory. `run` loads `<project_root>/.env` when it can
    /// be read and adds `<project_root>/blocks/` to `package.path` when that
    /// directory exists.
    pub project_root: PathBuf,
    /// agent-mesh relay URL. `None` skips the mesh connection entirely.
    pub relay_url: Option<String>,
    /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
    /// generated. Required to talk to registry/ACL-gated hosted meshes.
    pub secret_key: Option<String>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
    pub prompt: Option<String>,
    /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
    pub context: Option<String>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes. Required for SDK-embed callers that supply
    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
    /// emitted from the script to actually reach those handlers without
    /// requiring the script to call `bus.serve()` (which blocks on
    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Note that `run` only starts the background dispatcher when this flag
    /// is set AND [`Self::host_handlers`] is non-empty.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled, the
    /// in-flight script is interrupted via the Isle's debug-hook cancel
    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
    /// returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop). The token is observed across the
    /// `coroutine_eval` await; once cancellation propagates, the shutdown
    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
    /// still runs so file descriptors and remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
138
/// Shared context passed into Lua bridge functions.
///
/// Built once by `run` after both Isles exist and cloned into each bridge
/// registration pass; every field is either cheaply clonable or behind an `Arc`.
#[derive(Clone)]
pub struct HostContext {
    /// Project root as resolved by `run`: the canonicalized
    /// `BlockConfig::project_root`, or `current_dir().join(project_root)` when
    /// canonicalization fails.
    pub project_root: PathBuf,
    /// Connected mesh agent; `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// Shared MCP manager. `run` wires both Isles into it before bridge
    /// registration and calls `disconnect_all` on it during shutdown.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection.
    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
    /// handlers against this Isle; handlers live on `handler_isle` instead.
    /// The field is retained because bridge code still keyed to the main
    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
    /// need it, and removing it would force another HostContext reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
    ///
    /// Already `None` by the time the script runs when `run` started the
    /// auto-serve dispatcher, because that path takes the bus first.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
195
196/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
197/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
198/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
199///
200/// `label` is used only for the init log line (`sql` / `kv`) so that the two
201/// databases are distinguishable in tracing output.
202fn open_sqlite(
203 path: &Path,
204 label: &'static str,
205) -> BlockResult<(
206 Arc<Mutex<rusqlite::Connection>>,
207 Arc<rusqlite::InterruptHandle>,
208)> {
209 let is_memory = crate::bridge::config::is_memory_sql(path);
210 if !is_memory {
211 if let Some(parent) = path.parent() {
212 std::fs::create_dir_all(parent)
213 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
214 }
215 }
216 let conn = rusqlite::Connection::open(path)
217 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
218 if !is_memory {
219 let journal = crate::bridge::config::sql_journal_mode();
220 conn.pragma_update(None, "journal_mode", &journal)
221 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
222 }
223 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
224 conn.pragma_update(None, "busy_timeout", busy_ms)
225 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
226 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
227 let interrupt = Arc::new(conn.get_interrupt_handle());
228 let conn = Arc::new(Mutex::new(conn));
229 Ok((conn, interrupt))
230}
231
232/// Build the init closure shared between the main Isle and the handler
233/// Isle. Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
234/// configures `package.path` / `package.searchers` so `require "agent"`
235/// (and any `blocks/` module) works inside the Lua VM.
236///
237/// Returns an `FnOnce` so each call produces a fresh closure; this lets
238/// both Isles be spawned from the same config without `Clone` bounds on
239/// the captured `HashMap`.
240fn build_isle_init(
241 script_name: String,
242 script_dir: String,
243 blocks_paths: String,
244 prompt: Option<String>,
245 context: Option<String>,
246) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
247 move |lua| {
248 // Set script name before registering bridges (used by log.* for attribution)
249 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
250 if let Some(ref p) = prompt {
251 lua.globals().set("_PROMPT", p.as_str())?;
252 }
253 if let Some(ref c) = context {
254 lua.globals().set("_CONTEXT", c.as_str())?;
255 }
256
257 mlua_batteries::register_all(lua, "std")?;
258
259 // ── package.path ──────────────────────────────────────────────
260 // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
261 let package: mlua::Table = lua.globals().get("package")?;
262 let current_path: String = package.get("path")?;
263 let new_path =
264 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
265 package.set("path", new_path)?;
266
267 // ── package.searchers — embedded fallback ─────────────────────
268 // Register a custom searcher that loads blocks/ modules from the
269 // sources baked in at compile time. This is the lowest-priority
270 // searcher so filesystem copies always win.
271 let embedded: HashMap<&'static str, &'static str> =
272 EMBEDDED_BLOCKS.iter().copied().collect();
273
274 let searchers: mlua::Table = package.get("searchers")?;
275 let loader =
276 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
277 Some(source) => {
278 let chunk = lua
279 .load(*source)
280 .set_name(format!("@embedded:blocks/{name}/init.lua"));
281 let func = chunk.into_function()?;
282 Ok(mlua::Value::Function(func))
283 }
284 None => {
285 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
286 Ok(mlua::Value::String(msg))
287 }
288 })?;
289 // Append as the last searcher so filesystem paths remain preferred.
290 let next_idx = searchers.raw_len() + 1;
291 searchers.raw_set(next_idx, loader)?;
292
293 Ok(())
294 }
295}
296
297/// Spawn the dedicated handler Isle.
298///
299/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
300/// separate OS thread with its own `tokio` current-thread runtime, keeping
301/// CPU-bound handlers from starving the main Isle's grace timers.
302///
303/// Bridge registration is deferred to a follow-up `exec` in `run()` because
304/// `HostContext` is not constructible until both Isles exist (the struct
305/// itself holds `Arc<AsyncIsle>` for both).
306async fn spawn_handler_isle(
307 script_name: String,
308 script_dir: String,
309 blocks_paths: String,
310 prompt: Option<String>,
311 context: Option<String>,
312) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
313 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
314 let (isle, driver) = AsyncIsle::builder()
315 .thread_name("agent-block-handler-isle")
316 .spawn(init)
317 .await
318 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
319 info!(
320 thread_name = "agent-block-handler-isle",
321 "handler Isle spawned"
322 );
323 Ok((Arc::new(isle), driver))
324}
325
/// Decode a 64-character hexadecimal string into 32 bytes.
///
/// Leading and trailing whitespace is ignored. Both upper- and lower-case
/// digits are accepted.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed input is not exactly 64
/// bytes long, or when any byte is not an ASCII hex digit (the message carries
/// the zero-based position of the first offender). Non-ASCII input is reported
/// as an error; it never panics.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    // Map one ASCII hex digit to its value. The offending character is
    // deliberately not echoed back: the input is a secret key.
    fn nibble(raw: u8, pos: usize) -> Result<u8, String> {
        match raw {
            b'0'..=b'9' => Ok(raw - b'0'),
            b'a'..=b'f' => Ok(raw - b'a' + 10),
            b'A'..=b'F' => Ok(raw - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    // Work on bytes, not on `str` slices: slicing a `str` at a fixed byte
    // offset panics when the offset falls inside a multi-byte character.
    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
341
342pub async fn run(config: BlockConfig) -> BlockResult<()> {
343 let script_name = config
344 .script_path
345 .file_name()
346 .map(|n| n.to_string_lossy().to_string())
347 .unwrap_or_else(|| "unknown".to_string());
348
349 // NOTE: We previously held entered span guards across awaits for nested
350 // span context. That made the `run()` future `!Send`, which prevents
351 // SDK consumers from `tokio::spawn(run(config))`. Span context is
352 // attached to events via fields on the `info_span!` calls below; the
353 // missing nesting is an acceptable trade-off for `Send` correctness.
354 let _root_span = info_span!("agent_block", script = %script_name);
355
356 // ── .env ──────────────────────────────────────────────────────
357 // Load .env from project_root if present. Variables are merged into
358 // the process environment so Lua's `std.env.get()` picks them up.
359 let env_path = config.project_root.join(".env");
360 match dotenvy::from_path(&env_path) {
361 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
362 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
363 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
364 }
365
366 // ── Init ──────────────────────────────────────────────────────
367 let _init_span = info_span!("init");
368
369 // ── EventBus channel ─────────────────────────────────────────────
370 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
371 // handler can hold a `bus_tx` clone and forward incoming requests
372 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
373 let bus_capacity = crate::bridge::config::bus_capacity();
374 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
375 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
376
377 // ── Pre-install host-side Rust handlers ───────────────────────────
378 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
379 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
380 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
381 // Registered here (before any Lua bridge registers handlers and before
382 // `bus.serve` takes ownership of the bus) so the EventBus already
383 // carries the host handlers when the script starts.
384 if !config.host_handlers.is_empty() {
385 let mut guard = event_bus
386 .lock()
387 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
388 let bus = guard
389 .as_mut()
390 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
391 for (kind, handler) in &config.host_handlers {
392 bus.on(kind.clone(), Arc::clone(handler))
393 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
394 }
395 info!(
396 count = config.host_handlers.len(),
397 "host handlers pre-installed"
398 );
399 }
400
401 // ── auto-serve: background dispatcher for SDK-embed callers ───────
402 // When `auto_serve_bus` is on and host handlers are installed, take the
403 // EventBus out of the Mutex *before* the script runs and spawn the
404 // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
405 // from the script reach the host handler without requiring the script
406 // to call `bus.serve()` (which blocks on signals and never returns
407 // under programmatic embedding).
408 let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
409 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
410 let bus = {
411 let mut guard = event_bus
412 .lock()
413 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
414 guard
415 .take()
416 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
417 };
418 let token = CancellationToken::new();
419 let token_for_task = token.clone();
420 let handle = tokio::spawn(async move {
421 let mut bus = bus;
422 if let Err(e) = bus.run(token_for_task).await {
423 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
424 }
425 });
426 info!("auto-serve: dispatcher spawned");
427 Some((handle, token))
428 } else {
429 None
430 };
431
432 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
433 let keypair = match &config.secret_key {
434 Some(hex_str) => {
435 let bytes = hex_decode_32(hex_str)
436 .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
437 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
438 }
439 None => agent_mesh_core::identity::AgentKeypair::generate(),
440 };
441 info!(agent_id = %keypair.agent_id(), "mesh identity");
442 let acl = agent_mesh_core::acl::AclPolicy {
443 default_deny: false,
444 rules: vec![],
445 };
446 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
447 Arc::new(BusRelayHandler::new(bus_tx.clone()));
448 let url = relay_url.clone();
449 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
450 .await
451 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
452 info!(relay_url = %relay_url, "mesh connected");
453 Some(Arc::new(agent))
454 } else {
455 None
456 };
457
458 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
459 config.mcp_rpc_timeout,
460 )?));
461
462 // Resolve project_root to absolute path.
463 // canonicalize() can fail if the path doesn't exist; fall back to
464 // joining with current_dir to guarantee an absolute path.
465 let project_root = config
466 .project_root
467 .canonicalize()
468 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
469
470 let http_client = reqwest::Client::new();
471
472 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
473 // All knobs are ENV-driven (see `bridge/config.rs`).
474 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
475 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
476
477 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
478 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
479
480 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
481 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
482
483 let script_path = config.script_path.clone();
484 let script_dir = script_path
485 .parent()
486 .map(|p| p.to_string_lossy().to_string())
487 .unwrap_or_else(|| ".".to_string());
488
489 // Precompute values captured by the init closure so we don't need to
490 // move the full `HostContext` into it (HostContext now holds
491 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
492 // returns — classic chicken-and-egg). All bridge registrations run in a
493 // second pass via `isle.exec` below.
494 let blocks_paths = build_blocks_path(&project_root);
495 let prompt = config.prompt.clone();
496 let context = config.context.clone();
497
498 // ── main Isle ─────────────────────────────────────────────────
499 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
500 script_name.clone(),
501 script_dir.clone(),
502 blocks_paths.clone(),
503 prompt.clone(),
504 context.clone(),
505 ))
506 .await
507 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
508 let isle = Arc::new(isle);
509
510 // ── handler Isle (sequential, dependencies are trivial) ────────
511 let (handler_isle, handler_driver) = spawn_handler_isle(
512 script_name.clone(),
513 script_dir.clone(),
514 blocks_paths.clone(),
515 prompt,
516 context,
517 )
518 .await?;
519
520 // Wire both Isles into McpManager so Lua notification callbacks can be
521 // dispatched from the rmcp task thread.
522 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
523 // - main_isle: progress/log notification dispatch (exec on main Isle so
524 // user callback upvalues are preserved — no bytecode dump/reload needed)
525 {
526 let mut mgr = mcp_manager.write().await;
527 mgr.set_handler_isle(Arc::clone(&handler_isle));
528 mgr.set_main_isle(Arc::clone(&isle));
529 }
530
531 // ── HostContext + bridge registration ──────────────────────────────
532 // Wrap the isle in an Arc so `HostContext` can hand it to
533 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
534 // handlers from the EventBus dispatcher task).
535 let ctx = HostContext {
536 project_root,
537 mesh_agent,
538 mcp_manager: Arc::clone(&mcp_manager),
539 http_client,
540 sql_conn,
541 sql_interrupt,
542 kv_conn,
543 kv_interrupt,
544 ts_conn,
545 ts_interrupt,
546 isle: Arc::clone(&isle),
547 handler_isle: Arc::clone(&handler_isle),
548 bus_tx: bus_tx.clone(),
549 event_bus: Arc::clone(&event_bus),
550 };
551
552 {
553 let ctx = ctx.clone();
554 isle.exec(move |lua| {
555 bridge::register_all(lua, &ctx)
556 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
557 Ok(String::new())
558 })
559 .await
560 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
561 }
562
563 {
564 let ctx = ctx.clone();
565 handler_isle
566 .exec(move |lua| {
567 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
568 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
569 })?;
570 Ok(String::new())
571 })
572 .await
573 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
574 }
575
576 drop(_init_span);
577
578 // ── Execute ───────────────────────────────────────────────────
579 // When `shutdown_token` is supplied, race the script future against
580 // the caller's cancellation signal. On cancel, propagate to the Isle
581 // via the AsyncTask's cancel token so the debug hook unwinds the Lua
582 // VM, then continue into the shutdown sequence below (we still want
583 // to release MCP/mesh handles and join the auto-serve dispatcher
584 // before returning).
585 let script_result: Result<(), BlockError> = {
586 let _exec_span = info_span!("execute", script = %script_name);
587
588 let script = std::fs::read_to_string(&script_path)
589 .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
590
591 let mut task = isle.spawn_coroutine_eval(&script);
592 let task_cancel = task.cancel_token().clone();
593 match config.shutdown_token.as_ref() {
594 Some(token) => {
595 tokio::select! {
596 biased;
597 _ = token.cancelled() => {
598 task_cancel.cancel();
599 // Wait for the Isle to unwind so the VM is in a
600 // consistent state before driver shutdown. The
601 // debug hook fires at the next HOOK_INTERVAL.
602 let _ = (&mut task).await;
603 info!("shutdown_token: cancelled by caller");
604 Err(BlockError::Cancelled)
605 }
606 res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
607 }
608 }
609 None => (&mut task)
610 .await
611 .map(|_| ())
612 .map_err(|e| BlockError::Script(format!("{e}"))),
613 }
614 };
615
616 // ── auto-serve drain + cancel ─────────────────────────────────
617 // Let the dispatcher drain events queued by the script, then signal
618 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
619 if let Some((handle, token)) = auto_serve_state {
620 let grace_ms = crate::bridge::config::task_grace_ms();
621 let grace = Duration::from_millis(grace_ms);
622 tokio::time::sleep(grace).await;
623 token.cancel();
624 match tokio::time::timeout(grace, handle).await {
625 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
626 Ok(Err(join_err)) => {
627 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
628 }
629 Err(_) => {
630 tracing::warn!(
631 grace_ms,
632 "auto-serve: dispatcher join timed out after cancel; forcing exit"
633 );
634 }
635 }
636 }
637
638 // ── Shutdown ──────────────────────────────────────────────────
639 {
640 let _shutdown_span = info_span!("shutdown");
641
642 mcp_manager.write().await.disconnect_all().await?;
643
644 driver
645 .shutdown()
646 .await
647 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
648
649 // Handler Isle shutdown is independent of main shutdown: a failure
650 // here (e.g. ThreadPanic on the handler thread) is logged but does
651 // not poison the main process exit. The main Isle has already
652 // been stopped cleanly above.
653 match handler_driver.shutdown().await {
654 Ok(()) => info!(
655 thread_name = "agent-block-handler-isle",
656 "handler Isle shut down"
657 ),
658 Err(e) => tracing::error!(
659 error = %e,
660 thread_name = "agent-block-handler-isle",
661 "handler Isle shutdown failed"
662 ),
663 }
664 }
665
666 script_result
667}
668
/// mesh → bus source adapter.
///
/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
/// oneshot channel carried inside the event.
///
/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
///
/// | Failure                            | Return value                           |
/// |------------------------------------|----------------------------------------|
/// | `bus_tx.send` fails (closed)       | `{"error": "bus channel closed"}`      |
/// | ack sender dropped without a reply | `{"error": "ack dropped"}`             |
/// | Lua handler `BlockError`           | `{"error": "<handler error>"}`         |
/// | Handler exceeded 30s               | `{"error": "handler timeout"}`         |
///
/// A *full* channel is not an error path: `send` is awaited, so the request
/// waits for capacity. That wait happens before the ack timeout starts and
/// is not itself bounded here.
///
/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
/// (see `src/bridge/mesh.rs`).
struct BusRelayHandler {
    /// Ingress side of the EventBus channel that mesh requests are pushed onto.
    tx: mpsc::Sender<Event>,
}
690
691impl BusRelayHandler {
692 fn new(tx: mpsc::Sender<Event>) -> Self {
693 Self { tx }
694 }
695}
696
/// Upper bound on how long the mesh adapter waits for a handler's ack before
/// answering `{"error": "handler timeout"}`.
///
/// NOTE(review): originally described as also covering "other source
/// timeouts"; the only use visible in this file is the mesh-adapter ack
/// wait — confirm before relying on it elsewhere.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
699
700#[async_trait::async_trait]
701impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
702 async fn handle(
703 &self,
704 from: &agent_mesh_core::identity::AgentId,
705 payload: &serde_json::Value,
706 _cancel: agent_mesh_sdk::CancelToken,
707 ) -> serde_json::Value {
708 let id = uuid::Uuid::new_v4().to_string();
709 let meta = serde_json::json!({"from": from.to_string()});
710 let (ack_tx, ack_rx) = oneshot::channel();
711 let event = Event {
712 kind: "mesh".into(),
713 id: id.clone(),
714 payload: payload.clone(),
715 meta,
716 ack_tx: Some(ack_tx),
717 };
718
719 if let Err(e) = self.tx.send(event).await {
720 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
721 return serde_json::json!({"error": "bus channel closed"});
722 }
723
724 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
725 Ok(Ok(Ok(v))) => v,
726 Ok(Ok(Err(e))) => {
727 tracing::error!(id = %id, error = %e, "mesh handler returned error");
728 serde_json::json!({"error": e.to_string()})
729 }
730 Ok(Err(e)) => {
731 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
732 serde_json::json!({"error": "ack dropped"})
733 }
734 Err(_) => {
735 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
736 serde_json::json!({"error": "handler timeout"})
737 }
738 }
739 }
740}