Skip to main content

agent_block_core/
host.rs

//! Host — the thin Rust shell that wires up the Lua VM, Mesh, HTTP, and MCP.
//!
//! # Responsibilities
//!
//! 1. Spawn two mlua-isle `AsyncIsle`s (dedicated Lua VM threads with coroutine support):
//!    a main Isle for the user script and a handler Isle for EventBus handlers
//! 2. Optionally connect to the agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`,
//!    `sql.*`, `kv.*`, `ts.*`, `bus.*`)
//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
//! 6. Graceful shutdown (Isles + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31    ("agent", include_str!("../blocks/agent/init.lua")),
32    ("session", include_str!("../blocks/session/init.lua")),
33    (
34        "compile_loop",
35        include_str!("../blocks/compile_loop/init.lua"),
36    ),
37];
38
/// Lua source executed for [`ScriptSource::DefaultAgent`].
///
/// The script `require`s the StdPkg `agent` module and calls `agent.run` with
/// `_PROMPT` as `prompt` and `_CONTEXT` as `system`. It then publishes the
/// return value on the EventBus under kind `"agent_result"`. SDK consumers can
/// therefore attach a host handler without bundling a Lua file of their own.
///
/// `_PROMPT` / `_CONTEXT` are only defined when the corresponding
/// `BlockConfig` field is `Some`; otherwise those arguments reach
/// `agent.run` as `nil`.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
    prompt = _PROMPT,
    system = _CONTEXT,
})
bus.emit("agent_result", r)
"#;
52
/// Where `run()` obtains the Lua script.
///
/// * `Path` — the CLI form (`agent-block -s <path>`). The file is read once
///   when `run()` starts, and its parent directory becomes the first
///   `package.path` entry.
/// * `Inline` — source the SDK caller already holds in memory (a
///   compile-time `include_str!`, a dynamically built string, …), so no
///   tempfile is needed.
/// * `DefaultAgent` — an embedded invoker that runs the StdPkg `agent` module
///   with the caller-supplied prompt/context and publishes the result with
///   `bus.emit("agent_result", ...)`.
#[derive(Clone, Debug)]
pub enum ScriptSource {
    /// Filesystem path, read once when `run()` starts.
    Path(PathBuf),
    /// Source code held in memory.
    Inline {
        /// The Lua source text.
        source: String,
        /// Display name used in tracing, in error messages, and as the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// The embedded default agent invoker. `prompt` / `context` are exposed
    /// as the `_PROMPT` / `_CONTEXT` Lua globals, and the result is emitted
    /// on the EventBus as kind `"agent_result"`. This is usually paired with
    /// a `host_handlers` entry for `"agent_result"` plus
    /// `auto_serve_bus = true`.
    DefaultAgent,
}
81
/// Where a string payload (prompt or system context) comes from.
///
/// `Inline` is the literal-string form (CLI `--prompt` / `--context`).
/// `File` names a file whose contents are read once when `run()` starts
/// (CLI `--prompt-file` / `--context-file`).
#[derive(Clone, Debug)]
pub enum PromptSource {
    /// The payload itself.
    Inline(String),
    /// Path to a file holding the payload; read when `run()` starts.
    File(PathBuf),
}
94
/// How the Ed25519 mesh identity secret key is supplied.
///
/// `Inline` carries the 64-hex key itself. `Env` names an environment
/// variable that is read when `run()` starts (the CLI default is
/// `AGENT_BLOCK_MESH_SECRET_KEY`). If `BlockConfig.secret_key` is `None`,
/// a random keypair is generated, matching the prior behavior.
///
/// `Debug` is implemented by hand rather than derived so the inline key is
/// redacted. A stray `{:?}` in a log line or panic message must never leak
/// key material.
#[derive(Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal.
    Inline(String),
    /// Environment variable name to read at start.
    Env(String),
}

impl std::fmt::Debug for SecretKeySource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Never print the key itself.
            Self::Inline(_) => f
                .debug_tuple("Inline")
                .field(&format_args!("<redacted>"))
                .finish(),
            // The variable *name* is not secret and is useful for diagnostics.
            Self::Env(var) => f.debug_tuple("Env").field(var).finish(),
        }
    }
}
109
110/// Build the `blocks/` portion of `package.path` from filesystem locations.
111///
112/// Priority (highest first):
113/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
114/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
115///
116/// Returns a semicolon-terminated string ready to prepend to `package.path`,
117/// or an empty string when no `blocks/` directories are found.
118fn build_blocks_path(project_root: &Path) -> String {
119    let mut out = String::new();
120
121    // 1. project_root/blocks/
122    let project_blocks = project_root.join("blocks");
123    if project_blocks.is_dir() {
124        let pb = project_blocks.to_string_lossy();
125        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
126    }
127
128    // 2. exe_dir/blocks/
129    match std::env::current_exe() {
130        Ok(exe) => {
131            if let Some(exe_dir) = exe.parent() {
132                let exe_blocks = exe_dir.join("blocks");
133                if exe_blocks.is_dir() {
134                    let eb = exe_blocks.to_string_lossy();
135                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
136                }
137            }
138        }
139        Err(e) => {
140            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
141        }
142    }
143
144    out
145}
146
147pub struct BlockConfig {
148    /// Lua script to execute. See [`ScriptSource`] for the supported
149    /// shapes (filesystem path / inline source / embedded default
150    /// agent invoker).
151    pub script: ScriptSource,
152    pub project_root: PathBuf,
153    pub relay_url: Option<String>,
154    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
155    /// for the supported shapes (inline 64-hex / environment variable).
156    /// `None` generates a random keypair. Required to talk to
157    /// registry/ACL-gated hosted meshes.
158    pub secret_key: Option<SecretKeySource>,
159    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
160    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
161    pub mcp_rpc_timeout: Duration,
162    /// Prompt payload injected as `_PROMPT` Lua global. See
163    /// [`PromptSource`] for the supported shapes. `None` leaves the
164    /// global unset.
165    pub prompt: Option<PromptSource>,
166    /// Context payload injected as `_CONTEXT` Lua global (typically
167    /// the system prompt). Same shape rules as [`Self::prompt`].
168    pub context: Option<PromptSource>,
169    /// Host-side Rust handlers pre-installed on the EventBus before the user
170    /// script starts. Each entry registers `handler` against `kind` via
171    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
172    /// captured by the Rust handler rather than dispatched to a Lua function.
173    ///
174    /// Intended for SDK consumers that embed `agent-block-core` and need to
175    /// receive script output programmatically (e.g. a Spawner adapter that
176    /// turns LLM script output into a typed `WorkerResult`). Lua-side
177    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
178    /// are still possible, but the EventBus dispatches a single handler per
179    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
180    /// the same `kind` collide; choose one side per routing key.
181    ///
182    /// Defaults to an empty map (no host handlers).
183    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
184    /// When `true`, the EventBus dispatcher loop is driven in the background
185    /// for the duration of the script and shut down gracefully after the
186    /// script completes. Required for SDK-embed callers that supply
187    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
188    /// emitted from the script to actually reach those handlers without
189    /// requiring the script to call `bus.serve()` (which blocks on
190    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
191    ///
192    /// After the script finishes, the dispatcher is given a grace window
193    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
194    /// and finish any in-flight handler, then is cancelled.
195    ///
196    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
197    /// takes ownership of the EventBus before the script runs, so a script
198    /// that calls `bus.on(...)` followed by `bus.serve()` will error
199    /// ("bus.serve() has already taken ownership"). Use this flag when the
200    /// script's sole purpose is to push events to host handlers.
201    ///
202    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
203    /// script calls `bus.serve()`).
204    pub auto_serve_bus: bool,
205    /// Optional caller-supplied cancellation token. When cancelled, the
206    /// in-flight script is interrupted via the Isle's debug-hook cancel
207    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
208    /// returns `Err(BlockError::Cancelled)`.
209    ///
210    /// Intended for SDK consumers that spawn `run()` as a tokio task and
211    /// need an out-of-band abort signal (timeouts, parent-task cancellation
212    /// propagation, user-driven stop). The token is observed across the
213    /// `coroutine_eval` await; once cancellation propagates, the shutdown
214    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
215    /// still runs so file descriptors and remote handles are released.
216    ///
217    /// Defaults to `None` (legacy behavior: `run()` only completes when
218    /// the script returns naturally).
219    pub shutdown_token: Option<CancellationToken>,
220}
221
222/// Shared context passed into Lua bridge functions.
223#[derive(Clone)]
224pub struct HostContext {
225    pub project_root: PathBuf,
226    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
227    pub mcp_manager: Arc<RwLock<McpManager>>,
228    /// Shared async HTTP client for `http.*` bridge.
229    pub http_client: reqwest::Client,
230    /// Shared SQLite connection for `sql.*` bridge (user tables).
231    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
232    /// Interrupt handle for the sql connection.
233    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
234    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
235    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
236    /// Separate from sql_conn so KV scratch state and user SQL data don't
237    /// share WAL, page cache, or backup lifecycle.
238    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
239    /// Interrupt handle for the kv connection.
240    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
241    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
242    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
243    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
244    /// Interrupt handle for the ts connection.
245    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
246    #[allow(dead_code)]
247    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
248    /// Async handle to the main Isle Lua VM that runs the user script via
249    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
250    /// handlers against this Isle; handlers live on `handler_isle` instead.
251    /// The field is retained because bridge code still keyed to the main
252    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
253    /// need it, and removing it would force another HostContext reshape.
254    #[allow(dead_code)]
255    pub isle: Arc<AsyncIsle>,
256    /// Dedicated Isle for EventBus handler execution. Lua handlers
257    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
258    /// handler code does not occupy the main Isle's LocalSet and block
259    /// grace timers / shutdown wakers on the main VM side.
260    ///
261    /// Used by `bridge::bus` to forward handler bytecode
262    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
263    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
264    /// `coroutine_call("__bus_dispatch", ...)`.
265    pub handler_isle: Arc<AsyncIsle>,
266    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
267    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
268    /// clone at `MeshAgent::connect` time, so the field itself is not read
269    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
270    #[allow(dead_code)]
271    pub bus_tx: mpsc::Sender<Event>,
272    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
273    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
274    /// ownership before entering the long-lived `run()` await (avoiding the
275    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
276    pub event_bus: Arc<Mutex<Option<EventBus>>>,
277}
278
279/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
280/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
281/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
282///
283/// `label` is used only for the init log line (`sql` / `kv`) so that the two
284/// databases are distinguishable in tracing output.
285fn open_sqlite(
286    path: &Path,
287    label: &'static str,
288) -> BlockResult<(
289    Arc<Mutex<rusqlite::Connection>>,
290    Arc<rusqlite::InterruptHandle>,
291)> {
292    let is_memory = crate::bridge::config::is_memory_sql(path);
293    if !is_memory {
294        if let Some(parent) = path.parent() {
295            std::fs::create_dir_all(parent)
296                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
297        }
298    }
299    let conn = rusqlite::Connection::open(path)
300        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
301    if !is_memory {
302        let journal = crate::bridge::config::sql_journal_mode();
303        conn.pragma_update(None, "journal_mode", &journal)
304            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
305    }
306    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
307    conn.pragma_update(None, "busy_timeout", busy_ms)
308        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
309    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
310    let interrupt = Arc::new(conn.get_interrupt_handle());
311    let conn = Arc::new(Mutex::new(conn));
312    Ok((conn, interrupt))
313}
314
315/// Build the init closure shared between the main Isle and the handler
316/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
317/// configures `package.path` / `package.searchers` so `require "agent"`
318/// (and any `blocks/` module) works inside the Lua VM.
319///
320/// Returns an `FnOnce` so each call produces a fresh closure; this lets
321/// both Isles be spawned from the same config without `Clone` bounds on
322/// the captured `HashMap`.
323fn build_isle_init(
324    script_name: String,
325    script_dir: String,
326    blocks_paths: String,
327    prompt: Option<String>,
328    context: Option<String>,
329) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
330    move |lua| {
331        // Set script name before registering bridges (used by log.* for attribution)
332        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
333        if let Some(ref p) = prompt {
334            lua.globals().set("_PROMPT", p.as_str())?;
335        }
336        if let Some(ref c) = context {
337            lua.globals().set("_CONTEXT", c.as_str())?;
338        }
339
340        mlua_batteries::register_all(lua, "std")?;
341
342        // ── package.path ──────────────────────────────────────────────
343        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
344        let package: mlua::Table = lua.globals().get("package")?;
345        let current_path: String = package.get("path")?;
346        let new_path =
347            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
348        package.set("path", new_path)?;
349
350        // ── package.searchers — embedded fallback ─────────────────────
351        // Register a custom searcher that loads blocks/ modules from the
352        // sources baked in at compile time.  This is the lowest-priority
353        // searcher so filesystem copies always win.
354        let embedded: HashMap<&'static str, &'static str> =
355            EMBEDDED_BLOCKS.iter().copied().collect();
356
357        let searchers: mlua::Table = package.get("searchers")?;
358        let loader =
359            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
360                Some(source) => {
361                    let chunk = lua
362                        .load(*source)
363                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
364                    let func = chunk.into_function()?;
365                    Ok(mlua::Value::Function(func))
366                }
367                None => {
368                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
369                    Ok(mlua::Value::String(msg))
370                }
371            })?;
372        // Append as the last searcher so filesystem paths remain preferred.
373        let next_idx = searchers.raw_len() + 1;
374        searchers.raw_set(next_idx, loader)?;
375
376        Ok(())
377    }
378}
379
380/// Spawn the dedicated handler Isle.
381///
382/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
383/// separate OS thread with its own `tokio` current-thread runtime, keeping
384/// CPU-bound handlers from starving the main Isle's grace timers.
385///
386/// Bridge registration is deferred to a follow-up `exec` in `run()` because
387/// `HostContext` is not constructible until both Isles exist (the struct
388/// itself holds `Arc<AsyncIsle>` for both).
389async fn spawn_handler_isle(
390    script_name: String,
391    script_dir: String,
392    blocks_paths: String,
393    prompt: Option<String>,
394    context: Option<String>,
395) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
396    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
397    let (isle, driver) = AsyncIsle::builder()
398        .thread_name("agent-block-handler-isle")
399        .spawn(init)
400        .await
401        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
402    info!(
403        thread_name = "agent-block-handler-isle",
404        "handler Isle spawned"
405    );
406    Ok((Arc::new(isle), driver))
407}
408
/// Decode a 64-character hex string into 32 raw bytes (e.g. an Ed25519
/// secret key). Surrounding whitespace is ignored.
///
/// Upper- and lower-case digits are both accepted. The input is examined
/// byte by byte, so non-ASCII text produces an error instead of panicking on
/// a `str` slice that would split a UTF-8 character.
///
/// # Errors
///
/// Returns a human-readable message if the trimmed input is not exactly
/// 64 bytes long, or if it contains a byte that is not a hex digit. The
/// message reports only the offending position, never the input itself,
/// because the input is typically secret.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let s = s.trim();
    if s.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", s.len()));
    }
    let digits = s.as_bytes();
    // Value of the hex digit at byte offset `pos` (< 64, checked above).
    // `to_digit(16)` rejects every byte that is not an ASCII hex digit,
    // including the lead/continuation bytes of multi-byte UTF-8 characters.
    let nibble = |pos: usize| -> Result<u8, String> {
        char::from(digits[pos])
            .to_digit(16)
            .and_then(|d| u8::try_from(d).ok())
            .ok_or_else(|| {
                format!("invalid hex at position {pos}: invalid digit found in string")
            })
    };
    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = (nibble(2 * i)? << 4) | nibble(2 * i + 1)?;
    }
    Ok(out)
}
424
425pub async fn run(config: BlockConfig) -> BlockResult<()> {
426    // ── Resolve sources ───────────────────────────────────────────
427    // Convert the `Source` enums on `BlockConfig` to their concrete
428    // payloads before any Isle setup. `File`/`Path`/`Env` variants
429    // read from disk / environment exactly once, here at the start.
430    let (script_source, script_name, script_dir_pathbuf) = match &config.script {
431        ScriptSource::Path(p) => {
432            let source = std::fs::read_to_string(p)
433                .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
434            let name = p
435                .file_name()
436                .map(|n| n.to_string_lossy().to_string())
437                .unwrap_or_else(|| "unknown".to_string());
438            let dir = p
439                .parent()
440                .map(|d| d.to_path_buf())
441                .unwrap_or_else(|| PathBuf::from("."));
442            (source, name, dir)
443        }
444        ScriptSource::Inline { source, name } => {
445            (source.clone(), name.clone(), config.project_root.clone())
446        }
447        ScriptSource::DefaultAgent => (
448            DEFAULT_AGENT_INVOKER.to_string(),
449            "default_agent_invoker.lua".to_string(),
450            config.project_root.clone(),
451        ),
452    };
453
454    let prompt_resolved: Option<String> = match &config.prompt {
455        Some(PromptSource::Inline(s)) => Some(s.clone()),
456        Some(PromptSource::File(p)) => Some(
457            std::fs::read_to_string(p)
458                .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
459        ),
460        None => None,
461    };
462    let context_resolved: Option<String> = match &config.context {
463        Some(PromptSource::Inline(s)) => Some(s.clone()),
464        Some(PromptSource::File(p)) => Some(
465            std::fs::read_to_string(p)
466                .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
467        ),
468        None => None,
469    };
470    let secret_key_resolved: Option<String> = match &config.secret_key {
471        Some(SecretKeySource::Inline(s)) => Some(s.clone()),
472        Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
473        None => None,
474    };
475
476    // NOTE: We previously held entered span guards across awaits for nested
477    // span context. That made the `run()` future `!Send`, which prevents
478    // SDK consumers from `tokio::spawn(run(config))`. Span context is
479    // attached to events via fields on the `info_span!` calls below; the
480    // missing nesting is an acceptable trade-off for `Send` correctness.
481    let _root_span = info_span!("agent_block", script = %script_name);
482
483    // ── .env ──────────────────────────────────────────────────────
484    // Load .env from project_root if present. Variables are merged into
485    // the process environment so Lua's `std.env.get()` picks them up.
486    let env_path = config.project_root.join(".env");
487    match dotenvy::from_path(&env_path) {
488        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
489        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
490        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
491    }
492
493    // ── Init ──────────────────────────────────────────────────────
494    let _init_span = info_span!("init");
495
496    // ── EventBus channel ─────────────────────────────────────────────
497    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
498    // handler can hold a `bus_tx` clone and forward incoming requests
499    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
500    let bus_capacity = crate::bridge::config::bus_capacity();
501    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
502    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
503
504    // ── Pre-install host-side Rust handlers ───────────────────────────
505    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
506    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
507    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
508    // Registered here (before any Lua bridge registers handlers and before
509    // `bus.serve` takes ownership of the bus) so the EventBus already
510    // carries the host handlers when the script starts.
511    if !config.host_handlers.is_empty() {
512        let mut guard = event_bus
513            .lock()
514            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
515        let bus = guard
516            .as_mut()
517            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
518        for (kind, handler) in &config.host_handlers {
519            bus.on(kind.clone(), Arc::clone(handler))
520                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
521        }
522        info!(
523            count = config.host_handlers.len(),
524            "host handlers pre-installed"
525        );
526    }
527
528    // ── auto-serve: background dispatcher for SDK-embed callers ───────
529    // When `auto_serve_bus` is on and host handlers are installed, take the
530    // EventBus out of the Mutex *before* the script runs and spawn the
531    // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
532    // from the script reach the host handler without requiring the script
533    // to call `bus.serve()` (which blocks on signals and never returns
534    // under programmatic embedding).
535    let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
536    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
537        let bus = {
538            let mut guard = event_bus
539                .lock()
540                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
541            guard
542                .take()
543                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
544        };
545        let token = CancellationToken::new();
546        let token_for_task = token.clone();
547        let handle = tokio::spawn(async move {
548            let mut bus = bus;
549            if let Err(e) = bus.run(token_for_task).await {
550                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
551            }
552        });
553        info!("auto-serve: dispatcher spawned");
554        Some((handle, token))
555    } else {
556        None
557    };
558
559    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
560        let keypair = match &secret_key_resolved {
561            Some(hex_str) => {
562                let bytes = hex_decode_32(hex_str)
563                    .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
564                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
565            }
566            None => agent_mesh_core::identity::AgentKeypair::generate(),
567        };
568        info!(agent_id = %keypair.agent_id(), "mesh identity");
569        let acl = agent_mesh_core::acl::AclPolicy {
570            default_deny: false,
571            rules: vec![],
572        };
573        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
574            Arc::new(BusRelayHandler::new(bus_tx.clone()));
575        let url = relay_url.clone();
576        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
577            .await
578            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
579        info!(relay_url = %relay_url, "mesh connected");
580        Some(Arc::new(agent))
581    } else {
582        None
583    };
584
585    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
586        config.mcp_rpc_timeout,
587    )?));
588
589    // Resolve project_root to absolute path.
590    // canonicalize() can fail if the path doesn't exist; fall back to
591    // joining with current_dir to guarantee an absolute path.
592    let project_root = config
593        .project_root
594        .canonicalize()
595        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
596
597    let http_client = reqwest::Client::new();
598
599    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
600    // All knobs are ENV-driven (see `bridge/config.rs`).
601    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
602    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
603
604    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
605    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
606
607    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
608    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
609
610    // Use the script dir derived from the resolved `ScriptSource` for
611    // `package.path` lookups. For inline / default-agent variants the dir
612    // falls back to `project_root` (set during source resolution above).
613    let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
614
615    // Precompute values captured by the init closure so we don't need to
616    // move the full `HostContext` into it (HostContext now holds
617    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
618    // returns — classic chicken-and-egg). All bridge registrations run in a
619    // second pass via `isle.exec` below.
620    let blocks_paths = build_blocks_path(&project_root);
621    let prompt = prompt_resolved.clone();
622    let context = context_resolved.clone();
623
624    // ── main Isle ─────────────────────────────────────────────────
625    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
626        script_name.clone(),
627        script_dir.clone(),
628        blocks_paths.clone(),
629        prompt.clone(),
630        context.clone(),
631    ))
632    .await
633    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
634    let isle = Arc::new(isle);
635
636    // ── handler Isle (sequential, dependencies are trivial) ────────
637    let (handler_isle, handler_driver) = spawn_handler_isle(
638        script_name.clone(),
639        script_dir.clone(),
640        blocks_paths.clone(),
641        prompt,
642        context,
643    )
644    .await?;
645
646    // Wire both Isles into McpManager so Lua notification callbacks can be
647    // dispatched from the rmcp task thread.
648    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
649    // - main_isle: progress/log notification dispatch (exec on main Isle so
650    //   user callback upvalues are preserved — no bytecode dump/reload needed)
651    {
652        let mut mgr = mcp_manager.write().await;
653        mgr.set_handler_isle(Arc::clone(&handler_isle));
654        mgr.set_main_isle(Arc::clone(&isle));
655    }
656
657    // ── HostContext + bridge registration ──────────────────────────────
658    // Wrap the isle in an Arc so `HostContext` can hand it to
659    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
660    // handlers from the EventBus dispatcher task).
661    let ctx = HostContext {
662        project_root,
663        mesh_agent,
664        mcp_manager: Arc::clone(&mcp_manager),
665        http_client,
666        sql_conn,
667        sql_interrupt,
668        kv_conn,
669        kv_interrupt,
670        ts_conn,
671        ts_interrupt,
672        isle: Arc::clone(&isle),
673        handler_isle: Arc::clone(&handler_isle),
674        bus_tx: bus_tx.clone(),
675        event_bus: Arc::clone(&event_bus),
676    };
677
678    {
679        let ctx = ctx.clone();
680        isle.exec(move |lua| {
681            bridge::register_all(lua, &ctx)
682                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
683            Ok(String::new())
684        })
685        .await
686        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
687    }
688
689    {
690        let ctx = ctx.clone();
691        handler_isle
692            .exec(move |lua| {
693                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
694                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
695                })?;
696                Ok(String::new())
697            })
698            .await
699            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
700    }
701
702    drop(_init_span);
703
704    // ── Execute ───────────────────────────────────────────────────
705    // When `shutdown_token` is supplied, race the script future against
706    // the caller's cancellation signal. On cancel, propagate to the Isle
707    // via the AsyncTask's cancel token so the debug hook unwinds the Lua
708    // VM, then continue into the shutdown sequence below (we still want
709    // to release MCP/mesh handles and join the auto-serve dispatcher
710    // before returning).
711    let script_result: Result<(), BlockError> = {
712        let _exec_span = info_span!("execute", script = %script_name);
713
714        let mut task = isle.spawn_coroutine_eval(&script_source);
715        let task_cancel = task.cancel_token().clone();
716        match config.shutdown_token.as_ref() {
717            Some(token) => {
718                tokio::select! {
719                    biased;
720                    _ = token.cancelled() => {
721                        task_cancel.cancel();
722                        // Wait for the Isle to unwind so the VM is in a
723                        // consistent state before driver shutdown. The
724                        // debug hook fires at the next HOOK_INTERVAL.
725                        let _ = (&mut task).await;
726                        info!("shutdown_token: cancelled by caller");
727                        Err(BlockError::Cancelled)
728                    }
729                    res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
730                }
731            }
732            None => (&mut task)
733                .await
734                .map(|_| ())
735                .map_err(|e| BlockError::Script(format!("{e}"))),
736        }
737    };
738
739    // ── auto-serve drain + cancel ─────────────────────────────────
740    // Let the dispatcher drain events queued by the script, then signal
741    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
742    if let Some((handle, token)) = auto_serve_state {
743        let grace_ms = crate::bridge::config::task_grace_ms();
744        let grace = Duration::from_millis(grace_ms);
745        tokio::time::sleep(grace).await;
746        token.cancel();
747        match tokio::time::timeout(grace, handle).await {
748            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
749            Ok(Err(join_err)) => {
750                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
751            }
752            Err(_) => {
753                tracing::warn!(
754                    grace_ms,
755                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
756                );
757            }
758        }
759    }
760
761    // ── Shutdown ──────────────────────────────────────────────────
762    {
763        let _shutdown_span = info_span!("shutdown");
764
765        mcp_manager.write().await.disconnect_all().await?;
766
767        driver
768            .shutdown()
769            .await
770            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
771
772        // Handler Isle shutdown is independent of main shutdown: a failure
773        // here (e.g. ThreadPanic on the handler thread) is logged but does
774        // not poison the main process exit. The main Isle has already
775        // been stopped cleanly above.
776        match handler_driver.shutdown().await {
777            Ok(()) => info!(
778                thread_name = "agent-block-handler-isle",
779                "handler Isle shut down"
780            ),
781            Err(e) => tracing::error!(
782                error = %e,
783                thread_name = "agent-block-handler-isle",
784                "handler Isle shutdown failed"
785            ),
786        }
787    }
788
789    script_result
790}
791
792/// mesh → bus source adapter.
793///
794/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
795/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
796/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
797/// oneshot channel carried inside the event.
798///
799/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
800///
801/// | Failure                   | Return value                           |
802/// |---------------------------|----------------------------------------|
803/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
804/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
805/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
806/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
807///
808/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
809/// (see `src/bridge/mesh.rs`).
810struct BusRelayHandler {
811    tx: mpsc::Sender<Event>,
812}
813
814impl BusRelayHandler {
815    fn new(tx: mpsc::Sender<Event>) -> Self {
816        Self { tx }
817    }
818}
819
820/// Bound used for both the mesh-adapter ack wait and other source timeouts.
821const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
822
823#[async_trait::async_trait]
824impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
825    async fn handle(
826        &self,
827        from: &agent_mesh_core::identity::AgentId,
828        payload: &serde_json::Value,
829        _cancel: agent_mesh_sdk::CancelToken,
830    ) -> serde_json::Value {
831        let id = uuid::Uuid::new_v4().to_string();
832        let meta = serde_json::json!({"from": from.to_string()});
833        let (ack_tx, ack_rx) = oneshot::channel();
834        let event = Event {
835            kind: "mesh".into(),
836            id: id.clone(),
837            payload: payload.clone(),
838            meta,
839            ack_tx: Some(ack_tx),
840        };
841
842        if let Err(e) = self.tx.send(event).await {
843            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
844            return serde_json::json!({"error": "bus channel closed"});
845        }
846
847        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
848            Ok(Ok(Ok(v))) => v,
849            Ok(Ok(Err(e))) => {
850                tracing::error!(id = %id, error = %e, "mesh handler returned error");
851                serde_json::json!({"error": e.to_string()})
852            }
853            Ok(Err(e)) => {
854                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
855                serde_json::json!({"error": "ack dropped"})
856            }
857            Err(_) => {
858                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
859                serde_json::json!({"error": "handler timeout"})
860            }
861        }
862    }
863}