Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31    ("agent", include_str!("../blocks/agent/init.lua")),
32    ("session", include_str!("../blocks/session/init.lua")),
33];
34
35/// Build the `blocks/` portion of `package.path` from filesystem locations.
36///
37/// Priority (highest first):
38/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
39/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
40///
41/// Returns a semicolon-terminated string ready to prepend to `package.path`,
42/// or an empty string when no `blocks/` directories are found.
43fn build_blocks_path(project_root: &Path) -> String {
44    let mut out = String::new();
45
46    // 1. project_root/blocks/
47    let project_blocks = project_root.join("blocks");
48    if project_blocks.is_dir() {
49        let pb = project_blocks.to_string_lossy();
50        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
51    }
52
53    // 2. exe_dir/blocks/
54    match std::env::current_exe() {
55        Ok(exe) => {
56            if let Some(exe_dir) = exe.parent() {
57                let exe_blocks = exe_dir.join("blocks");
58                if exe_blocks.is_dir() {
59                    let eb = exe_blocks.to_string_lossy();
60                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
61                }
62            }
63        }
64        Err(e) => {
65            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
66        }
67    }
68
69    out
70}
71
72pub struct BlockConfig {
73    pub script_path: PathBuf,
74    pub project_root: PathBuf,
75    pub relay_url: Option<String>,
76    /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
77    /// generated. Required to talk to registry/ACL-gated hosted meshes.
78    pub secret_key: Option<String>,
79    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
80    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
81    pub mcp_rpc_timeout: Duration,
82    /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
83    pub prompt: Option<String>,
84    /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
85    pub context: Option<String>,
86    /// Host-side Rust handlers pre-installed on the EventBus before the user
87    /// script starts. Each entry registers `handler` against `kind` via
88    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
89    /// captured by the Rust handler rather than dispatched to a Lua function.
90    ///
91    /// Intended for SDK consumers that embed `agent-block-core` and need to
92    /// receive script output programmatically (e.g. a Spawner adapter that
93    /// turns LLM script output into a typed `WorkerResult`). Lua-side
94    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
95    /// are still possible, but the EventBus dispatches a single handler per
96    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
97    /// the same `kind` collide; choose one side per routing key.
98    ///
99    /// Defaults to an empty map (no host handlers).
100    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
101    /// When `true`, the EventBus dispatcher loop is driven in the background
102    /// for the duration of the script and shut down gracefully after the
103    /// script completes. Required for SDK-embed callers that supply
104    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
105    /// emitted from the script to actually reach those handlers without
106    /// requiring the script to call `bus.serve()` (which blocks on
107    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
108    ///
109    /// After the script finishes, the dispatcher is given a grace window
110    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
111    /// and finish any in-flight handler, then is cancelled.
112    ///
113    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
114    /// takes ownership of the EventBus before the script runs, so a script
115    /// that calls `bus.on(...)` followed by `bus.serve()` will error
116    /// ("bus.serve() has already taken ownership"). Use this flag when the
117    /// script's sole purpose is to push events to host handlers.
118    ///
119    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
120    /// script calls `bus.serve()`).
121    pub auto_serve_bus: bool,
122    /// Optional caller-supplied cancellation token. When cancelled, the
123    /// in-flight script is interrupted via the Isle's debug-hook cancel
124    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
125    /// returns `Err(BlockError::Cancelled)`.
126    ///
127    /// Intended for SDK consumers that spawn `run()` as a tokio task and
128    /// need an out-of-band abort signal (timeouts, parent-task cancellation
129    /// propagation, user-driven stop). The token is observed across the
130    /// `coroutine_eval` await; once cancellation propagates, the shutdown
131    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
132    /// still runs so file descriptors and remote handles are released.
133    ///
134    /// Defaults to `None` (legacy behavior: `run()` only completes when
135    /// the script returns naturally).
136    pub shutdown_token: Option<CancellationToken>,
137}
138
139/// Shared context passed into Lua bridge functions.
140#[derive(Clone)]
141pub struct HostContext {
142    pub project_root: PathBuf,
143    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
144    pub mcp_manager: Arc<RwLock<McpManager>>,
145    /// Shared async HTTP client for `http.*` bridge.
146    pub http_client: reqwest::Client,
147    /// Shared SQLite connection for `sql.*` bridge (user tables).
148    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
149    /// Interrupt handle for the sql connection.
150    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
151    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
152    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
153    /// Separate from sql_conn so KV scratch state and user SQL data don't
154    /// share WAL, page cache, or backup lifecycle.
155    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
156    /// Interrupt handle for the kv connection.
157    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
158    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
159    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
160    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
161    /// Interrupt handle for the ts connection.
162    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
163    #[allow(dead_code)]
164    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
165    /// Async handle to the main Isle Lua VM that runs the user script via
166    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
167    /// handlers against this Isle; handlers live on `handler_isle` instead.
168    /// The field is retained because bridge code still keyed to the main
169    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
170    /// need it, and removing it would force another HostContext reshape.
171    #[allow(dead_code)]
172    pub isle: Arc<AsyncIsle>,
173    /// Dedicated Isle for EventBus handler execution. Lua handlers
174    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
175    /// handler code does not occupy the main Isle's LocalSet and block
176    /// grace timers / shutdown wakers on the main VM side.
177    ///
178    /// Used by `bridge::bus` to forward handler bytecode
179    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
180    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
181    /// `coroutine_call("__bus_dispatch", ...)`.
182    pub handler_isle: Arc<AsyncIsle>,
183    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
184    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
185    /// clone at `MeshAgent::connect` time, so the field itself is not read
186    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
187    #[allow(dead_code)]
188    pub bus_tx: mpsc::Sender<Event>,
189    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
190    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
191    /// ownership before entering the long-lived `run()` await (avoiding the
192    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
193    pub event_bus: Arc<Mutex<Option<EventBus>>>,
194}
195
196/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
197/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
198/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
199///
200/// `label` is used only for the init log line (`sql` / `kv`) so that the two
201/// databases are distinguishable in tracing output.
202fn open_sqlite(
203    path: &Path,
204    label: &'static str,
205) -> BlockResult<(
206    Arc<Mutex<rusqlite::Connection>>,
207    Arc<rusqlite::InterruptHandle>,
208)> {
209    let is_memory = crate::bridge::config::is_memory_sql(path);
210    if !is_memory {
211        if let Some(parent) = path.parent() {
212            std::fs::create_dir_all(parent)
213                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
214        }
215    }
216    let conn = rusqlite::Connection::open(path)
217        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
218    if !is_memory {
219        let journal = crate::bridge::config::sql_journal_mode();
220        conn.pragma_update(None, "journal_mode", &journal)
221            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
222    }
223    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
224    conn.pragma_update(None, "busy_timeout", busy_ms)
225        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
226    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
227    let interrupt = Arc::new(conn.get_interrupt_handle());
228    let conn = Arc::new(Mutex::new(conn));
229    Ok((conn, interrupt))
230}
231
232/// Build the init closure shared between the main Isle and the handler
233/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
234/// configures `package.path` / `package.searchers` so `require "agent"`
235/// (and any `blocks/` module) works inside the Lua VM.
236///
237/// Returns an `FnOnce` so each call produces a fresh closure; this lets
238/// both Isles be spawned from the same config without `Clone` bounds on
239/// the captured `HashMap`.
240fn build_isle_init(
241    script_name: String,
242    script_dir: String,
243    blocks_paths: String,
244    prompt: Option<String>,
245    context: Option<String>,
246) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
247    move |lua| {
248        // Set script name before registering bridges (used by log.* for attribution)
249        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
250        if let Some(ref p) = prompt {
251            lua.globals().set("_PROMPT", p.as_str())?;
252        }
253        if let Some(ref c) = context {
254            lua.globals().set("_CONTEXT", c.as_str())?;
255        }
256
257        mlua_batteries::register_all(lua, "std")?;
258
259        // ── package.path ──────────────────────────────────────────────
260        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
261        let package: mlua::Table = lua.globals().get("package")?;
262        let current_path: String = package.get("path")?;
263        let new_path =
264            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
265        package.set("path", new_path)?;
266
267        // ── package.searchers — embedded fallback ─────────────────────
268        // Register a custom searcher that loads blocks/ modules from the
269        // sources baked in at compile time.  This is the lowest-priority
270        // searcher so filesystem copies always win.
271        let embedded: HashMap<&'static str, &'static str> =
272            EMBEDDED_BLOCKS.iter().copied().collect();
273
274        let searchers: mlua::Table = package.get("searchers")?;
275        let loader =
276            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
277                Some(source) => {
278                    let chunk = lua
279                        .load(*source)
280                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
281                    let func = chunk.into_function()?;
282                    Ok(mlua::Value::Function(func))
283                }
284                None => {
285                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
286                    Ok(mlua::Value::String(msg))
287                }
288            })?;
289        // Append as the last searcher so filesystem paths remain preferred.
290        let next_idx = searchers.raw_len() + 1;
291        searchers.raw_set(next_idx, loader)?;
292
293        Ok(())
294    }
295}
296
297/// Spawn the dedicated handler Isle.
298///
299/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
300/// separate OS thread with its own `tokio` current-thread runtime, keeping
301/// CPU-bound handlers from starving the main Isle's grace timers.
302///
303/// Bridge registration is deferred to a follow-up `exec` in `run()` because
304/// `HostContext` is not constructible until both Isles exist (the struct
305/// itself holds `Arc<AsyncIsle>` for both).
306async fn spawn_handler_isle(
307    script_name: String,
308    script_dir: String,
309    blocks_paths: String,
310    prompt: Option<String>,
311    context: Option<String>,
312) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
313    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
314    let (isle, driver) = AsyncIsle::builder()
315        .thread_name("agent-block-handler-isle")
316        .spawn(init)
317        .await
318        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
319    info!(
320        thread_name = "agent-block-handler-isle",
321        "handler Isle spawned"
322    );
323    Ok((Arc::new(isle), driver))
324}
325
/// Decode a 64-character hex string into 32 bytes.
///
/// Surrounding whitespace is ignored and both upper- and lower-case digits
/// are accepted.
///
/// # Errors
///
/// Returns a message when the trimmed input is not exactly 64 bytes long or
/// when it contains a byte that is not a hex digit. The message carries the
/// zero-based position of the offending byte. Non-ASCII input is rejected
/// this way as well: decoding works on bytes, so it never slices a `str` in
/// the middle of a multi-byte character (which would panic).
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Value of one ASCII hex digit; `pos` is only used for the error text.
    fn nibble(digit: u8, pos: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
341
342pub async fn run(config: BlockConfig) -> BlockResult<()> {
343    let script_name = config
344        .script_path
345        .file_name()
346        .map(|n| n.to_string_lossy().to_string())
347        .unwrap_or_else(|| "unknown".to_string());
348
349    // NOTE: We previously held entered span guards across awaits for nested
350    // span context. That made the `run()` future `!Send`, which prevents
351    // SDK consumers from `tokio::spawn(run(config))`. Span context is
352    // attached to events via fields on the `info_span!` calls below; the
353    // missing nesting is an acceptable trade-off for `Send` correctness.
354    let _root_span = info_span!("agent_block", script = %script_name);
355
356    // ── .env ──────────────────────────────────────────────────────
357    // Load .env from project_root if present. Variables are merged into
358    // the process environment so Lua's `std.env.get()` picks them up.
359    let env_path = config.project_root.join(".env");
360    match dotenvy::from_path(&env_path) {
361        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
362        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
363        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
364    }
365
366    // ── Init ──────────────────────────────────────────────────────
367    let _init_span = info_span!("init");
368
369    // ── EventBus channel ─────────────────────────────────────────────
370    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
371    // handler can hold a `bus_tx` clone and forward incoming requests
372    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
373    let bus_capacity = crate::bridge::config::bus_capacity();
374    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
375    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
376
377    // ── Pre-install host-side Rust handlers ───────────────────────────
378    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
379    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
380    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
381    // Registered here (before any Lua bridge registers handlers and before
382    // `bus.serve` takes ownership of the bus) so the EventBus already
383    // carries the host handlers when the script starts.
384    if !config.host_handlers.is_empty() {
385        let mut guard = event_bus
386            .lock()
387            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
388        let bus = guard
389            .as_mut()
390            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
391        for (kind, handler) in &config.host_handlers {
392            bus.on(kind.clone(), Arc::clone(handler))
393                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
394        }
395        info!(
396            count = config.host_handlers.len(),
397            "host handlers pre-installed"
398        );
399    }
400
401    // ── auto-serve: background dispatcher for SDK-embed callers ───────
402    // When `auto_serve_bus` is on and host handlers are installed, take the
403    // EventBus out of the Mutex *before* the script runs and spawn the
404    // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
405    // from the script reach the host handler without requiring the script
406    // to call `bus.serve()` (which blocks on signals and never returns
407    // under programmatic embedding).
408    let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
409    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
410        let bus = {
411            let mut guard = event_bus
412                .lock()
413                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
414            guard
415                .take()
416                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
417        };
418        let token = CancellationToken::new();
419        let token_for_task = token.clone();
420        let handle = tokio::spawn(async move {
421            let mut bus = bus;
422            if let Err(e) = bus.run(token_for_task).await {
423                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
424            }
425        });
426        info!("auto-serve: dispatcher spawned");
427        Some((handle, token))
428    } else {
429        None
430    };
431
432    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
433        let keypair = match &config.secret_key {
434            Some(hex_str) => {
435                let bytes = hex_decode_32(hex_str)
436                    .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
437                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
438            }
439            None => agent_mesh_core::identity::AgentKeypair::generate(),
440        };
441        info!(agent_id = %keypair.agent_id(), "mesh identity");
442        let acl = agent_mesh_core::acl::AclPolicy {
443            default_deny: false,
444            rules: vec![],
445        };
446        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
447            Arc::new(BusRelayHandler::new(bus_tx.clone()));
448        let url = relay_url.clone();
449        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
450            .await
451            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
452        info!(relay_url = %relay_url, "mesh connected");
453        Some(Arc::new(agent))
454    } else {
455        None
456    };
457
458    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
459        config.mcp_rpc_timeout,
460    )?));
461
462    // Resolve project_root to absolute path.
463    // canonicalize() can fail if the path doesn't exist; fall back to
464    // joining with current_dir to guarantee an absolute path.
465    let project_root = config
466        .project_root
467        .canonicalize()
468        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
469
470    let http_client = reqwest::Client::new();
471
472    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
473    // All knobs are ENV-driven (see `bridge/config.rs`).
474    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
475    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
476
477    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
478    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
479
480    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
481    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
482
483    let script_path = config.script_path.clone();
484    let script_dir = script_path
485        .parent()
486        .map(|p| p.to_string_lossy().to_string())
487        .unwrap_or_else(|| ".".to_string());
488
489    // Precompute values captured by the init closure so we don't need to
490    // move the full `HostContext` into it (HostContext now holds
491    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
492    // returns — classic chicken-and-egg). All bridge registrations run in a
493    // second pass via `isle.exec` below.
494    let blocks_paths = build_blocks_path(&project_root);
495    let prompt = config.prompt.clone();
496    let context = config.context.clone();
497
498    // ── main Isle ─────────────────────────────────────────────────
499    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
500        script_name.clone(),
501        script_dir.clone(),
502        blocks_paths.clone(),
503        prompt.clone(),
504        context.clone(),
505    ))
506    .await
507    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
508    let isle = Arc::new(isle);
509
510    // ── handler Isle (sequential, dependencies are trivial) ────────
511    let (handler_isle, handler_driver) = spawn_handler_isle(
512        script_name.clone(),
513        script_dir.clone(),
514        blocks_paths.clone(),
515        prompt,
516        context,
517    )
518    .await?;
519
520    // Wire both Isles into McpManager so Lua notification callbacks can be
521    // dispatched from the rmcp task thread.
522    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
523    // - main_isle: progress/log notification dispatch (exec on main Isle so
524    //   user callback upvalues are preserved — no bytecode dump/reload needed)
525    {
526        let mut mgr = mcp_manager.write().await;
527        mgr.set_handler_isle(Arc::clone(&handler_isle));
528        mgr.set_main_isle(Arc::clone(&isle));
529    }
530
531    // ── HostContext + bridge registration ──────────────────────────────
532    // Wrap the isle in an Arc so `HostContext` can hand it to
533    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
534    // handlers from the EventBus dispatcher task).
535    let ctx = HostContext {
536        project_root,
537        mesh_agent,
538        mcp_manager: Arc::clone(&mcp_manager),
539        http_client,
540        sql_conn,
541        sql_interrupt,
542        kv_conn,
543        kv_interrupt,
544        ts_conn,
545        ts_interrupt,
546        isle: Arc::clone(&isle),
547        handler_isle: Arc::clone(&handler_isle),
548        bus_tx: bus_tx.clone(),
549        event_bus: Arc::clone(&event_bus),
550    };
551
552    {
553        let ctx = ctx.clone();
554        isle.exec(move |lua| {
555            bridge::register_all(lua, &ctx)
556                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
557            Ok(String::new())
558        })
559        .await
560        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
561    }
562
563    {
564        let ctx = ctx.clone();
565        handler_isle
566            .exec(move |lua| {
567                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
568                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
569                })?;
570                Ok(String::new())
571            })
572            .await
573            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
574    }
575
576    drop(_init_span);
577
578    // ── Execute ───────────────────────────────────────────────────
579    // When `shutdown_token` is supplied, race the script future against
580    // the caller's cancellation signal. On cancel, propagate to the Isle
581    // via the AsyncTask's cancel token so the debug hook unwinds the Lua
582    // VM, then continue into the shutdown sequence below (we still want
583    // to release MCP/mesh handles and join the auto-serve dispatcher
584    // before returning).
585    let script_result: Result<(), BlockError> = {
586        let _exec_span = info_span!("execute", script = %script_name);
587
588        let script = std::fs::read_to_string(&script_path)
589            .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
590
591        let mut task = isle.spawn_coroutine_eval(&script);
592        let task_cancel = task.cancel_token().clone();
593        match config.shutdown_token.as_ref() {
594            Some(token) => {
595                tokio::select! {
596                    biased;
597                    _ = token.cancelled() => {
598                        task_cancel.cancel();
599                        // Wait for the Isle to unwind so the VM is in a
600                        // consistent state before driver shutdown. The
601                        // debug hook fires at the next HOOK_INTERVAL.
602                        let _ = (&mut task).await;
603                        info!("shutdown_token: cancelled by caller");
604                        Err(BlockError::Cancelled)
605                    }
606                    res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
607                }
608            }
609            None => (&mut task)
610                .await
611                .map(|_| ())
612                .map_err(|e| BlockError::Script(format!("{e}"))),
613        }
614    };
615
616    // ── auto-serve drain + cancel ─────────────────────────────────
617    // Let the dispatcher drain events queued by the script, then signal
618    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
619    if let Some((handle, token)) = auto_serve_state {
620        let grace_ms = crate::bridge::config::task_grace_ms();
621        let grace = Duration::from_millis(grace_ms);
622        tokio::time::sleep(grace).await;
623        token.cancel();
624        match tokio::time::timeout(grace, handle).await {
625            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
626            Ok(Err(join_err)) => {
627                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
628            }
629            Err(_) => {
630                tracing::warn!(
631                    grace_ms,
632                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
633                );
634            }
635        }
636    }
637
638    // ── Shutdown ──────────────────────────────────────────────────
639    {
640        let _shutdown_span = info_span!("shutdown");
641
642        mcp_manager.write().await.disconnect_all().await?;
643
644        driver
645            .shutdown()
646            .await
647            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
648
649        // Handler Isle shutdown is independent of main shutdown: a failure
650        // here (e.g. ThreadPanic on the handler thread) is logged but does
651        // not poison the main process exit. The main Isle has already
652        // been stopped cleanly above.
653        match handler_driver.shutdown().await {
654            Ok(()) => info!(
655                thread_name = "agent-block-handler-isle",
656                "handler Isle shut down"
657            ),
658            Err(e) => tracing::error!(
659                error = %e,
660                thread_name = "agent-block-handler-isle",
661                "handler Isle shutdown failed"
662            ),
663        }
664    }
665
666    script_result
667}
668
669/// mesh → bus source adapter.
670///
671/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
672/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
673/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
674/// oneshot channel carried inside the event.
675///
676/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
677///
678/// | Failure                   | Return value                           |
679/// |---------------------------|----------------------------------------|
680/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
681/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
682/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
683/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
684///
685/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
686/// (see `src/bridge/mesh.rs`).
687struct BusRelayHandler {
688    tx: mpsc::Sender<Event>,
689}
690
691impl BusRelayHandler {
692    fn new(tx: mpsc::Sender<Event>) -> Self {
693        Self { tx }
694    }
695}
696
/// Upper bound on how long a bus source adapter waits for a handler reply.
/// `BusRelayHandler::handle` uses it for the mesh ack wait; other source
/// timeouts are meant to share the same bound.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
699
700#[async_trait::async_trait]
701impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
702    async fn handle(
703        &self,
704        from: &agent_mesh_core::identity::AgentId,
705        payload: &serde_json::Value,
706        _cancel: agent_mesh_sdk::CancelToken,
707    ) -> serde_json::Value {
708        let id = uuid::Uuid::new_v4().to_string();
709        let meta = serde_json::json!({"from": from.to_string()});
710        let (ack_tx, ack_rx) = oneshot::channel();
711        let event = Event {
712            kind: "mesh".into(),
713            id: id.clone(),
714            payload: payload.clone(),
715            meta,
716            ack_tx: Some(ack_tx),
717        };
718
719        if let Err(e) = self.tx.send(event).await {
720            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
721            return serde_json::json!({"error": "bus channel closed"});
722        }
723
724        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
725            Ok(Ok(Ok(v))) => v,
726            Ok(Ok(Err(e))) => {
727                tracing::error!(id = %id, error = %e, "mesh handler returned error");
728                serde_json::json!({"error": e.to_string()})
729            }
730            Ok(Err(e)) => {
731                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
732                serde_json::json!({"error": "ack dropped"})
733            }
734            Err(_) => {
735                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
736                serde_json::json!({"error": "handler timeout"})
737            }
738        }
739    }
740}