Skip to main content

agent_block_core/
host.rs

//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
//!
//! # Responsibilities
//!
//! 1. Spawn two mlua-isle `AsyncIsle`s (dedicated Lua VM threads with coroutine
//!    support): the main Isle for the user script and a handler Isle for
//!    EventBus handlers
//! 2. Optionally connect to agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
//! 6. Graceful shutdown (Isles + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31    ("agent", include_str!("../blocks/agent/init.lua")),
32    ("session", include_str!("../blocks/session/init.lua")),
33];
34
35/// Build the `blocks/` portion of `package.path` from filesystem locations.
36///
37/// Priority (highest first):
38/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
39/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
40///
41/// Returns a semicolon-terminated string ready to prepend to `package.path`,
42/// or an empty string when no `blocks/` directories are found.
43fn build_blocks_path(project_root: &Path) -> String {
44    let mut out = String::new();
45
46    // 1. project_root/blocks/
47    let project_blocks = project_root.join("blocks");
48    if project_blocks.is_dir() {
49        let pb = project_blocks.to_string_lossy();
50        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
51    }
52
53    // 2. exe_dir/blocks/
54    match std::env::current_exe() {
55        Ok(exe) => {
56            if let Some(exe_dir) = exe.parent() {
57                let exe_blocks = exe_dir.join("blocks");
58                if exe_blocks.is_dir() {
59                    let eb = exe_blocks.to_string_lossy();
60                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
61                }
62            }
63        }
64        Err(e) => {
65            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
66        }
67    }
68
69    out
70}
71
72pub struct BlockConfig {
73    pub script_path: PathBuf,
74    pub project_root: PathBuf,
75    pub relay_url: Option<String>,
76    /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
77    /// generated. Required to talk to registry/ACL-gated hosted meshes.
78    pub secret_key: Option<String>,
79    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
80    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
81    pub mcp_rpc_timeout: Duration,
82    /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
83    pub prompt: Option<String>,
84    /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
85    pub context: Option<String>,
86    /// Host-side Rust handlers pre-installed on the EventBus before the user
87    /// script starts. Each entry registers `handler` against `kind` via
88    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
89    /// captured by the Rust handler rather than dispatched to a Lua function.
90    ///
91    /// Intended for SDK consumers that embed `agent-block-core` and need to
92    /// receive script output programmatically (e.g. a Spawner adapter that
93    /// turns LLM script output into a typed `WorkerResult`). Lua-side
94    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
95    /// are still possible, but the EventBus dispatches a single handler per
96    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
97    /// the same `kind` collide; choose one side per routing key.
98    ///
99    /// Defaults to an empty map (no host handlers).
100    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
101    /// When `true`, the EventBus dispatcher loop is driven in the background
102    /// for the duration of the script and shut down gracefully after the
103    /// script completes. Required for SDK-embed callers that supply
104    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
105    /// emitted from the script to actually reach those handlers without
106    /// requiring the script to call `bus.serve()` (which blocks on
107    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
108    ///
109    /// After the script finishes, the dispatcher is given a grace window
110    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
111    /// and finish any in-flight handler, then is cancelled.
112    ///
113    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
114    /// takes ownership of the EventBus before the script runs, so a script
115    /// that calls `bus.on(...)` followed by `bus.serve()` will error
116    /// ("bus.serve() has already taken ownership"). Use this flag when the
117    /// script's sole purpose is to push events to host handlers.
118    ///
119    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
120    /// script calls `bus.serve()`).
121    pub auto_serve_bus: bool,
122}
123
124/// Shared context passed into Lua bridge functions.
125#[derive(Clone)]
126pub struct HostContext {
127    pub project_root: PathBuf,
128    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
129    pub mcp_manager: Arc<RwLock<McpManager>>,
130    /// Shared async HTTP client for `http.*` bridge.
131    pub http_client: reqwest::Client,
132    /// Shared SQLite connection for `sql.*` bridge (user tables).
133    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
134    /// Interrupt handle for the sql connection.
135    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
136    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
137    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
138    /// Separate from sql_conn so KV scratch state and user SQL data don't
139    /// share WAL, page cache, or backup lifecycle.
140    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
141    /// Interrupt handle for the kv connection.
142    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
143    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
144    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
145    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
146    /// Interrupt handle for the ts connection.
147    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
148    #[allow(dead_code)]
149    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
150    /// Async handle to the main Isle Lua VM that runs the user script via
151    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
152    /// handlers against this Isle; handlers live on `handler_isle` instead.
153    /// The field is retained because bridge code still keyed to the main
154    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
155    /// need it, and removing it would force another HostContext reshape.
156    #[allow(dead_code)]
157    pub isle: Arc<AsyncIsle>,
158    /// Dedicated Isle for EventBus handler execution. Lua handlers
159    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
160    /// handler code does not occupy the main Isle's LocalSet and block
161    /// grace timers / shutdown wakers on the main VM side.
162    ///
163    /// Used by `bridge::bus` to forward handler bytecode
164    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
165    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
166    /// `coroutine_call("__bus_dispatch", ...)`.
167    pub handler_isle: Arc<AsyncIsle>,
168    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
169    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
170    /// clone at `MeshAgent::connect` time, so the field itself is not read
171    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
172    #[allow(dead_code)]
173    pub bus_tx: mpsc::Sender<Event>,
174    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
175    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
176    /// ownership before entering the long-lived `run()` await (avoiding the
177    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
178    pub event_bus: Arc<Mutex<Option<EventBus>>>,
179}
180
181/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
182/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
183/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
184///
185/// `label` is used only for the init log line (`sql` / `kv`) so that the two
186/// databases are distinguishable in tracing output.
187fn open_sqlite(
188    path: &Path,
189    label: &'static str,
190) -> BlockResult<(
191    Arc<Mutex<rusqlite::Connection>>,
192    Arc<rusqlite::InterruptHandle>,
193)> {
194    let is_memory = crate::bridge::config::is_memory_sql(path);
195    if !is_memory {
196        if let Some(parent) = path.parent() {
197            std::fs::create_dir_all(parent)
198                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
199        }
200    }
201    let conn = rusqlite::Connection::open(path)
202        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
203    if !is_memory {
204        let journal = crate::bridge::config::sql_journal_mode();
205        conn.pragma_update(None, "journal_mode", &journal)
206            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
207    }
208    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
209    conn.pragma_update(None, "busy_timeout", busy_ms)
210        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
211    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
212    let interrupt = Arc::new(conn.get_interrupt_handle());
213    let conn = Arc::new(Mutex::new(conn));
214    Ok((conn, interrupt))
215}
216
217/// Build the init closure shared between the main Isle and the handler
218/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
219/// configures `package.path` / `package.searchers` so `require "agent"`
220/// (and any `blocks/` module) works inside the Lua VM.
221///
222/// Returns an `FnOnce` so each call produces a fresh closure; this lets
223/// both Isles be spawned from the same config without `Clone` bounds on
224/// the captured `HashMap`.
225fn build_isle_init(
226    script_name: String,
227    script_dir: String,
228    blocks_paths: String,
229    prompt: Option<String>,
230    context: Option<String>,
231) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
232    move |lua| {
233        // Set script name before registering bridges (used by log.* for attribution)
234        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
235        if let Some(ref p) = prompt {
236            lua.globals().set("_PROMPT", p.as_str())?;
237        }
238        if let Some(ref c) = context {
239            lua.globals().set("_CONTEXT", c.as_str())?;
240        }
241
242        mlua_batteries::register_all(lua, "std")?;
243
244        // ── package.path ──────────────────────────────────────────────
245        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
246        let package: mlua::Table = lua.globals().get("package")?;
247        let current_path: String = package.get("path")?;
248        let new_path =
249            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
250        package.set("path", new_path)?;
251
252        // ── package.searchers — embedded fallback ─────────────────────
253        // Register a custom searcher that loads blocks/ modules from the
254        // sources baked in at compile time.  This is the lowest-priority
255        // searcher so filesystem copies always win.
256        let embedded: HashMap<&'static str, &'static str> =
257            EMBEDDED_BLOCKS.iter().copied().collect();
258
259        let searchers: mlua::Table = package.get("searchers")?;
260        let loader =
261            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
262                Some(source) => {
263                    let chunk = lua
264                        .load(*source)
265                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
266                    let func = chunk.into_function()?;
267                    Ok(mlua::Value::Function(func))
268                }
269                None => {
270                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
271                    Ok(mlua::Value::String(msg))
272                }
273            })?;
274        // Append as the last searcher so filesystem paths remain preferred.
275        let next_idx = searchers.raw_len() + 1;
276        searchers.raw_set(next_idx, loader)?;
277
278        Ok(())
279    }
280}
281
282/// Spawn the dedicated handler Isle.
283///
284/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
285/// separate OS thread with its own `tokio` current-thread runtime, keeping
286/// CPU-bound handlers from starving the main Isle's grace timers.
287///
288/// Bridge registration is deferred to a follow-up `exec` in `run()` because
289/// `HostContext` is not constructible until both Isles exist (the struct
290/// itself holds `Arc<AsyncIsle>` for both).
291async fn spawn_handler_isle(
292    script_name: String,
293    script_dir: String,
294    blocks_paths: String,
295    prompt: Option<String>,
296    context: Option<String>,
297) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
298    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
299    let (isle, driver) = AsyncIsle::builder()
300        .thread_name("agent-block-handler-isle")
301        .spawn(init)
302        .await
303        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
304    info!(
305        thread_name = "agent-block-handler-isle",
306        "handler Isle spawned"
307    );
308    Ok((Arc::new(isle), driver))
309}
310
/// Decode exactly 64 hexadecimal characters into 32 bytes.
///
/// Surrounding whitespace is trimmed first. Upper- and lower-case digits are
/// both accepted.
///
/// # Errors
///
/// Returns a message when the trimmed input is not 64 bytes long or contains
/// a non-hex character (the message carries the offending byte position).
///
/// Decoding works on bytes rather than string slices: the length check is in
/// bytes, so slicing a `&str` at fixed offsets would panic on multi-byte
/// UTF-8 input that happens to be 64 bytes long.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    // Value of the hex digit at `pos`, or an error naming that position.
    let nibble = |pos: usize| -> Result<u8, String> {
        match digits[pos] {
            b @ b'0'..=b'9' => Ok(b - b'0'),
            b @ b'a'..=b'f' => Ok(b - b'a' + 10),
            b @ b'A'..=b'F' => Ok(b - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    };

    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        let hi = nibble(2 * i)?;
        let lo = nibble(2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
326
327pub async fn run(config: BlockConfig) -> BlockResult<()> {
328    let script_name = config
329        .script_path
330        .file_name()
331        .map(|n| n.to_string_lossy().to_string())
332        .unwrap_or_else(|| "unknown".to_string());
333
334    let root_span = info_span!("agent_block", script = %script_name);
335    let _root_guard = root_span.enter();
336
337    // ── .env ──────────────────────────────────────────────────────
338    // Load .env from project_root if present. Variables are merged into
339    // the process environment so Lua's `std.env.get()` picks them up.
340    let env_path = config.project_root.join(".env");
341    match dotenvy::from_path(&env_path) {
342        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
343        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
344        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
345    }
346
347    // ── Init ──────────────────────────────────────────────────────
348    let _init_guard = info_span!("init").entered();
349
350    // ── EventBus channel ─────────────────────────────────────────────
351    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
352    // handler can hold a `bus_tx` clone and forward incoming requests
353    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
354    let bus_capacity = crate::bridge::config::bus_capacity();
355    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
356    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
357
358    // ── Pre-install host-side Rust handlers ───────────────────────────
359    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
360    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
361    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
362    // Registered here (before any Lua bridge registers handlers and before
363    // `bus.serve` takes ownership of the bus) so the EventBus already
364    // carries the host handlers when the script starts.
365    if !config.host_handlers.is_empty() {
366        let mut guard = event_bus
367            .lock()
368            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
369        let bus = guard
370            .as_mut()
371            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
372        for (kind, handler) in &config.host_handlers {
373            bus.on(kind.clone(), Arc::clone(handler))
374                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
375        }
376        info!(
377            count = config.host_handlers.len(),
378            "host handlers pre-installed"
379        );
380    }
381
382    // ── auto-serve: background dispatcher for SDK-embed callers ───────
383    // When `auto_serve_bus` is on and host handlers are installed, take the
384    // EventBus out of the Mutex *before* the script runs and spawn the
385    // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
386    // from the script reach the host handler without requiring the script
387    // to call `bus.serve()` (which blocks on signals and never returns
388    // under programmatic embedding).
389    let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
390    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
391        let bus = {
392            let mut guard = event_bus
393                .lock()
394                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
395            guard
396                .take()
397                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
398        };
399        let token = CancellationToken::new();
400        let token_for_task = token.clone();
401        let handle = tokio::spawn(async move {
402            let mut bus = bus;
403            if let Err(e) = bus.run(token_for_task).await {
404                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
405            }
406        });
407        info!("auto-serve: dispatcher spawned");
408        Some((handle, token))
409    } else {
410        None
411    };
412
413    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
414        let keypair = match &config.secret_key {
415            Some(hex_str) => {
416                let bytes = hex_decode_32(hex_str)
417                    .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
418                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
419            }
420            None => agent_mesh_core::identity::AgentKeypair::generate(),
421        };
422        info!(agent_id = %keypair.agent_id(), "mesh identity");
423        let acl = agent_mesh_core::acl::AclPolicy {
424            default_deny: false,
425            rules: vec![],
426        };
427        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
428            Arc::new(BusRelayHandler::new(bus_tx.clone()));
429        let url = relay_url.clone();
430        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
431            .await
432            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
433        info!(relay_url = %relay_url, "mesh connected");
434        Some(Arc::new(agent))
435    } else {
436        None
437    };
438
439    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
440        config.mcp_rpc_timeout,
441    )?));
442
443    // Resolve project_root to absolute path.
444    // canonicalize() can fail if the path doesn't exist; fall back to
445    // joining with current_dir to guarantee an absolute path.
446    let project_root = config
447        .project_root
448        .canonicalize()
449        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
450
451    let http_client = reqwest::Client::new();
452
453    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
454    // All knobs are ENV-driven (see `bridge/config.rs`).
455    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
456    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
457
458    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
459    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
460
461    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
462    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
463
464    let script_path = config.script_path.clone();
465    let script_dir = script_path
466        .parent()
467        .map(|p| p.to_string_lossy().to_string())
468        .unwrap_or_else(|| ".".to_string());
469
470    // Precompute values captured by the init closure so we don't need to
471    // move the full `HostContext` into it (HostContext now holds
472    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
473    // returns — classic chicken-and-egg). All bridge registrations run in a
474    // second pass via `isle.exec` below.
475    let blocks_paths = build_blocks_path(&project_root);
476    let prompt = config.prompt.clone();
477    let context = config.context.clone();
478
479    // ── main Isle ─────────────────────────────────────────────────
480    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
481        script_name.clone(),
482        script_dir.clone(),
483        blocks_paths.clone(),
484        prompt.clone(),
485        context.clone(),
486    ))
487    .await
488    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
489    let isle = Arc::new(isle);
490
491    // ── handler Isle (sequential, dependencies are trivial) ────────
492    let (handler_isle, handler_driver) = spawn_handler_isle(
493        script_name.clone(),
494        script_dir.clone(),
495        blocks_paths.clone(),
496        prompt,
497        context,
498    )
499    .await?;
500
501    // Wire both Isles into McpManager so Lua notification callbacks can be
502    // dispatched from the rmcp task thread.
503    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
504    // - main_isle: progress/log notification dispatch (exec on main Isle so
505    //   user callback upvalues are preserved — no bytecode dump/reload needed)
506    {
507        let mut mgr = mcp_manager.write().await;
508        mgr.set_handler_isle(Arc::clone(&handler_isle));
509        mgr.set_main_isle(Arc::clone(&isle));
510    }
511
512    // ── HostContext + bridge registration ──────────────────────────────
513    // Wrap the isle in an Arc so `HostContext` can hand it to
514    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
515    // handlers from the EventBus dispatcher task).
516    let ctx = HostContext {
517        project_root,
518        mesh_agent,
519        mcp_manager: Arc::clone(&mcp_manager),
520        http_client,
521        sql_conn,
522        sql_interrupt,
523        kv_conn,
524        kv_interrupt,
525        ts_conn,
526        ts_interrupt,
527        isle: Arc::clone(&isle),
528        handler_isle: Arc::clone(&handler_isle),
529        bus_tx: bus_tx.clone(),
530        event_bus: Arc::clone(&event_bus),
531    };
532
533    {
534        let ctx = ctx.clone();
535        isle.exec(move |lua| {
536            bridge::register_all(lua, &ctx)
537                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
538            Ok(String::new())
539        })
540        .await
541        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
542    }
543
544    {
545        let ctx = ctx.clone();
546        handler_isle
547            .exec(move |lua| {
548                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
549                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
550                })?;
551                Ok(String::new())
552            })
553            .await
554            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
555    }
556
557    drop(_init_guard);
558
559    // ── Execute ───────────────────────────────────────────────────
560    {
561        let _exec_guard = info_span!("execute", script = %script_name).entered();
562
563        let script = std::fs::read_to_string(&script_path)
564            .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
565
566        isle.coroutine_eval(&script)
567            .await
568            .map_err(|e| BlockError::Script(format!("{e}")))?;
569    }
570
571    // ── auto-serve drain + cancel ─────────────────────────────────
572    // Let the dispatcher drain events queued by the script, then signal
573    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
574    if let Some((handle, token)) = auto_serve_state {
575        let grace_ms = crate::bridge::config::task_grace_ms();
576        let grace = Duration::from_millis(grace_ms);
577        tokio::time::sleep(grace).await;
578        token.cancel();
579        match tokio::time::timeout(grace, handle).await {
580            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
581            Ok(Err(join_err)) => {
582                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
583            }
584            Err(_) => {
585                tracing::warn!(
586                    grace_ms,
587                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
588                );
589            }
590        }
591    }
592
593    // ── Shutdown ──────────────────────────────────────────────────
594    {
595        let _shutdown_guard = info_span!("shutdown").entered();
596
597        mcp_manager.write().await.disconnect_all().await?;
598
599        driver
600            .shutdown()
601            .await
602            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
603
604        // Handler Isle shutdown is independent of main shutdown: a failure
605        // here (e.g. ThreadPanic on the handler thread) is logged but does
606        // not poison the main process exit. The main Isle has already
607        // been stopped cleanly above.
608        match handler_driver.shutdown().await {
609            Ok(()) => info!(
610                thread_name = "agent-block-handler-isle",
611                "handler Isle shut down"
612            ),
613            Err(e) => tracing::error!(
614                error = %e,
615                thread_name = "agent-block-handler-isle",
616                "handler Isle shutdown failed"
617            ),
618        }
619    }
620
621    Ok(())
622}
623
624/// mesh → bus source adapter.
625///
626/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
627/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
628/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
629/// oneshot channel carried inside the event.
630///
631/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
632///
633/// | Failure                   | Return value                           |
634/// |---------------------------|----------------------------------------|
635/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
636/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
637/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
638/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
639///
640/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
641/// (see `src/bridge/mesh.rs`).
642struct BusRelayHandler {
643    tx: mpsc::Sender<Event>,
644}
645
646impl BusRelayHandler {
647    fn new(tx: mpsc::Sender<Event>) -> Self {
648        Self { tx }
649    }
650}
651
652/// Bound used for both the mesh-adapter ack wait and other source timeouts.
653const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
654
655#[async_trait::async_trait]
656impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
657    async fn handle(
658        &self,
659        from: &agent_mesh_core::identity::AgentId,
660        payload: &serde_json::Value,
661        _cancel: agent_mesh_sdk::CancelToken,
662    ) -> serde_json::Value {
663        let id = uuid::Uuid::new_v4().to_string();
664        let meta = serde_json::json!({"from": from.to_string()});
665        let (ack_tx, ack_rx) = oneshot::channel();
666        let event = Event {
667            kind: "mesh".into(),
668            id: id.clone(),
669            payload: payload.clone(),
670            meta,
671            ack_tx: Some(ack_tx),
672        };
673
674        if let Err(e) = self.tx.send(event).await {
675            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
676            return serde_json::json!({"error": "bus channel closed"});
677        }
678
679        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
680            Ok(Ok(Ok(v))) => v,
681            Ok(Ok(Err(e))) => {
682                tracing::error!(id = %id, error = %e, "mesh handler returned error");
683                serde_json::json!({"error": e.to_string()})
684            }
685            Ok(Err(e)) => {
686                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
687                serde_json::json!({"error": "ack dropped"})
688            }
689            Err(_) => {
690                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
691                serde_json::json!({"error": "handler timeout"})
692            }
693        }
694    }
695}