Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`,
//!    `sql.*`, `kv.*`, `ts.*`, `bus.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for blocks/ StdPkg modules.
/// These are baked into the binary at compile time so `cargo install` works
/// without any extra file distribution.
///
/// Each entry is a `(module_name, lua_source)` pair. `build_isle_init`
/// collects the table into a map keyed by module name and consults it from
/// the embedded `package.searchers` fallback, so the first element is the
/// exact string a script passes to `require`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
    (
        "compile_loop",
        include_str!("../blocks/compile_loop/init.lua"),
    ),
];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Runs the StdPkg `agent` module with `_PROMPT` / `_CONTEXT` injected and
/// emits the result on the EventBus. The emit kind is `"_"` — a neutral
/// label with no SDK-side meaning. The result is intended to be received
/// via [`BlockConfig::host_handler`] (the kind-agnostic single sink); the
/// literal label is irrelevant to SDK consumers.
///
/// `_PROMPT` / `_CONTEXT` are only set as Lua globals when the corresponding
/// [`BlockConfig::prompt`] / [`BlockConfig::context`] is supplied (see
/// `build_isle_init`); otherwise the script reads them as unset globals.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
    prompt = _PROMPT,
    system = _CONTEXT,
})
bus.emit("_", r)
"#;
54
/// How the Lua script source for `run()` is supplied.
///
/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
/// the filesystem at start. `Inline` lets SDK consumers pass a script
/// they hold in memory (compile-time `include_str!`, dynamically built
/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
/// an embedded invoker that runs the StdPkg `agent` module with the
/// caller-supplied prompt/context and emits the result via
/// `bus.emit("_", ...)` (a neutral, intentionally meaningless kind).
///
/// For `Inline` and `DefaultAgent` there is no script file, so `run()`
/// uses [`BlockConfig::project_root`] as the script directory when
/// building `package.path`.
#[derive(Debug, Clone)]
pub enum ScriptSource {
    /// Read the script from a filesystem path at start.
    Path(PathBuf),
    /// Use the supplied source code directly.
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker. `prompt` / `context`
    /// are forwarded as `_PROMPT` / `_CONTEXT` Lua globals and the
    /// agent result is emitted on the EventBus under a neutral label
    /// (`"_"`). SDK consumers should pair this with
    /// [`BlockConfig::host_handler`] (the kind-agnostic single sink)
    /// and `auto_serve_bus = true`. The emit-kind is intentionally
    /// meaningless; consumers that need string-keyed routing should
    /// supply [`ScriptSource::Inline`] with their own invoker.
    DefaultAgent,
}
86
/// How a string payload (prompt / system context) is supplied.
///
/// `Inline` is the literal string variant (CLI `--prompt` / `--context`).
/// `File` reads the contents from disk at `run()` start (CLI
/// `--prompt-file` / `--context-file`).
///
/// A `File` that cannot be read makes `run()` fail with
/// `BlockError::Script` before any Lua VM is spawned.
#[derive(Debug, Clone)]
pub enum PromptSource {
    /// Literal string.
    Inline(String),
    /// Filesystem path; contents are read at `run()` start.
    File(PathBuf),
}
99
/// How the Ed25519 mesh identity secret key is supplied.
///
/// `Inline` is a 64-hex literal. `Env` reads the named environment
/// variable at `run()` start (CLI default uses
/// `AGENT_BLOCK_MESH_SECRET_KEY`). Absence of any `SecretKeySource`
/// (i.e. `BlockConfig.secret_key = None`) causes a random keypair to
/// be generated, matching the prior behavior.
///
/// Notes on how `run()` consumes the value:
/// - The key is only decoded when a relay URL is configured.
/// - The resolved string is trimmed and must be exactly 64 hex
///   characters; otherwise `run()` fails with `BlockError::Runtime`.
/// - An `Env` variable that cannot be read (`std::env::var` returns an
///   error) is treated the same as `None`: a random keypair is generated
///   rather than an error being reported.
#[derive(Debug, Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal.
    Inline(String),
    /// Environment variable name to read at start.
    Env(String),
}
114
115/// Build the `blocks/` portion of `package.path` from filesystem locations.
116///
117/// Priority (highest first):
118/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
119/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
120///
121/// Returns a semicolon-terminated string ready to prepend to `package.path`,
122/// or an empty string when no `blocks/` directories are found.
123fn build_blocks_path(project_root: &Path) -> String {
124    let mut out = String::new();
125
126    // 1. project_root/blocks/
127    let project_blocks = project_root.join("blocks");
128    if project_blocks.is_dir() {
129        let pb = project_blocks.to_string_lossy();
130        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
131    }
132
133    // 2. exe_dir/blocks/
134    match std::env::current_exe() {
135        Ok(exe) => {
136            if let Some(exe_dir) = exe.parent() {
137                let exe_blocks = exe_dir.join("blocks");
138                if exe_blocks.is_dir() {
139                    let eb = exe_blocks.to_string_lossy();
140                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
141                }
142            }
143        }
144        Err(e) => {
145            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
146        }
147    }
148
149    out
150}
151
/// Configuration consumed by [`run`]: which script to execute, where the
/// project lives, optional mesh connectivity, and how script output is
/// routed to host-side Rust handlers.
pub struct BlockConfig {
    /// Lua script to execute. See [`ScriptSource`] for the supported
    /// shapes (filesystem path / inline source / embedded default
    /// agent invoker).
    pub script: ScriptSource,
    /// Project root directory. `run()` loads `<project_root>/.env` if
    /// present, probes `<project_root>/blocks/` when building
    /// `package.path`, and uses this directory as the script directory
    /// for `Inline` / `DefaultAgent` scripts. It need not be absolute:
    /// `run()` canonicalizes it, falling back to joining it onto the
    /// current directory when canonicalization fails.
    pub project_root: PathBuf,
    /// agent-mesh relay URL. When `Some`, `run()` connects a mesh agent
    /// to this relay; when `None`, no mesh connection is made.
    pub relay_url: Option<String>,
    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
    /// for the supported shapes (inline 64-hex / environment variable).
    /// `None` generates a random keypair. Required to talk to
    /// registry/ACL-gated hosted meshes.
    pub secret_key: Option<SecretKeySource>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt payload injected as `_PROMPT` Lua global. See
    /// [`PromptSource`] for the supported shapes. `None` leaves the
    /// global unset.
    pub prompt: Option<PromptSource>,
    /// Context payload injected as `_CONTEXT` Lua global (typically
    /// the system prompt). Same shape rules as [`Self::prompt`].
    pub context: Option<PromptSource>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// Single host-side Rust handler that catches every event regardless
    /// of `kind`. Internally registered via [`EventBus::on_any`], so it
    /// acts as a fallback when no entry in [`Self::host_handlers`]
    /// matches the incoming `kind`.
    ///
    /// This is the SDK-embed 1-shot sink: SDK consumers do not need to
    /// invent or coordinate a string `kind` between the Lua script and
    /// their Rust code. The agent invoker's emit-kind is irrelevant —
    /// the handler receives every event.
    ///
    /// Use this when you want a single Rust handler to receive results
    /// (typical embedded use). Use [`Self::host_handlers`] instead when
    /// you actually need string-keyed routing (multi-source / multi-
    /// handler dispatch). The two may coexist: kind-specific handlers
    /// in `host_handlers` take precedence, and this single handler is
    /// the fallback for unmatched kinds.
    ///
    /// Defaults to `None`.
    pub host_handler: Option<Arc<dyn Handler>>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes. Required for SDK-embed callers that supply
    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
    /// emitted from the script to actually reach those handlers without
    /// requiring the script to call `bus.serve()` (which blocks on
    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
    ///
    /// The flag only takes effect when at least one host-side handler is
    /// configured (a non-empty [`Self::host_handlers`] or a
    /// [`Self::host_handler`]); with no host handlers `run()` leaves the
    /// EventBus in place and does not spawn the background dispatcher.
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled, the
    /// in-flight script is interrupted via the Isle's debug-hook cancel
    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
    /// returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop). The token is observed across the
    /// `coroutine_eval` await; once cancellation propagates, the shutdown
    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
    /// still runs so file descriptors and remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
245
/// Shared context passed into Lua bridge functions.
///
/// Built once by `run()` after both Isles have been spawned. Every field is
/// either cheap to clone or wrapped in an `Arc`, so the whole struct derives
/// `Clone` and can be handed to each bridge.
#[derive(Clone)]
pub struct HostContext {
    /// Resolved project root: `run()` stores the canonicalized
    /// `BlockConfig::project_root` here (or, if canonicalization fails,
    /// the configured path joined onto the current directory).
    pub project_root: PathBuf,
    /// Connected mesh agent. `None` when `BlockConfig::relay_url` was not
    /// set, in which case `run()` makes no mesh connection.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// Shared MCP manager. This is the same instance into which `run()`
    /// wires the main Isle and the handler Isle.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection.
    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
    /// handlers against this Isle; handlers live on `handler_isle` instead.
    /// The field is retained because bridge code still keyed to the main
    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
    /// need it, and removing it would force another HostContext reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
    ///
    /// When auto-serve is active, `run()` has already taken the bus out of
    /// this slot before the script starts, leaving `None` behind.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
302
303/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
304/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
305/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
306///
307/// `label` is used only for the init log line (`sql` / `kv`) so that the two
308/// databases are distinguishable in tracing output.
309fn open_sqlite(
310    path: &Path,
311    label: &'static str,
312) -> BlockResult<(
313    Arc<Mutex<rusqlite::Connection>>,
314    Arc<rusqlite::InterruptHandle>,
315)> {
316    let is_memory = crate::bridge::config::is_memory_sql(path);
317    if !is_memory {
318        if let Some(parent) = path.parent() {
319            std::fs::create_dir_all(parent)
320                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
321        }
322    }
323    let conn = rusqlite::Connection::open(path)
324        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
325    if !is_memory {
326        let journal = crate::bridge::config::sql_journal_mode();
327        conn.pragma_update(None, "journal_mode", &journal)
328            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
329    }
330    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
331    conn.pragma_update(None, "busy_timeout", busy_ms)
332        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
333    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
334    let interrupt = Arc::new(conn.get_interrupt_handle());
335    let conn = Arc::new(Mutex::new(conn));
336    Ok((conn, interrupt))
337}
338
339/// Build the init closure shared between the main Isle and the handler
340/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
341/// configures `package.path` / `package.searchers` so `require "agent"`
342/// (and any `blocks/` module) works inside the Lua VM.
343///
344/// Returns an `FnOnce` so each call produces a fresh closure; this lets
345/// both Isles be spawned from the same config without `Clone` bounds on
346/// the captured `HashMap`.
347fn build_isle_init(
348    script_name: String,
349    script_dir: String,
350    blocks_paths: String,
351    prompt: Option<String>,
352    context: Option<String>,
353) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
354    move |lua| {
355        // Set script name before registering bridges (used by log.* for attribution)
356        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
357        if let Some(ref p) = prompt {
358            lua.globals().set("_PROMPT", p.as_str())?;
359        }
360        if let Some(ref c) = context {
361            lua.globals().set("_CONTEXT", c.as_str())?;
362        }
363
364        mlua_batteries::register_all(lua, "std")?;
365
366        // ── package.path ──────────────────────────────────────────────
367        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
368        let package: mlua::Table = lua.globals().get("package")?;
369        let current_path: String = package.get("path")?;
370        let new_path =
371            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
372        package.set("path", new_path)?;
373
374        // ── package.searchers — embedded fallback ─────────────────────
375        // Register a custom searcher that loads blocks/ modules from the
376        // sources baked in at compile time.  This is the lowest-priority
377        // searcher so filesystem copies always win.
378        let embedded: HashMap<&'static str, &'static str> =
379            EMBEDDED_BLOCKS.iter().copied().collect();
380
381        let searchers: mlua::Table = package.get("searchers")?;
382        let loader =
383            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
384                Some(source) => {
385                    let chunk = lua
386                        .load(*source)
387                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
388                    let func = chunk.into_function()?;
389                    Ok(mlua::Value::Function(func))
390                }
391                None => {
392                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
393                    Ok(mlua::Value::String(msg))
394                }
395            })?;
396        // Append as the last searcher so filesystem paths remain preferred.
397        let next_idx = searchers.raw_len() + 1;
398        searchers.raw_set(next_idx, loader)?;
399
400        Ok(())
401    }
402}
403
404/// Spawn the dedicated handler Isle.
405///
406/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
407/// separate OS thread with its own `tokio` current-thread runtime, keeping
408/// CPU-bound handlers from starving the main Isle's grace timers.
409///
410/// Bridge registration is deferred to a follow-up `exec` in `run()` because
411/// `HostContext` is not constructible until both Isles exist (the struct
412/// itself holds `Arc<AsyncIsle>` for both).
413async fn spawn_handler_isle(
414    script_name: String,
415    script_dir: String,
416    blocks_paths: String,
417    prompt: Option<String>,
418    context: Option<String>,
419) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
420    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
421    let (isle, driver) = AsyncIsle::builder()
422        .thread_name("agent-block-handler-isle")
423        .spawn(init)
424        .await
425        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
426    info!(
427        thread_name = "agent-block-handler-isle",
428        "handler Isle spawned"
429    );
430    Ok((Arc::new(isle), driver))
431}
432
/// Decode a 64-character hexadecimal string into 32 raw bytes.
///
/// Leading and trailing whitespace is ignored. Both lowercase and uppercase
/// hex digits are accepted.
///
/// The input is processed as bytes rather than by slicing the `&str`, so
/// input that happens to be 64 bytes long but contains multi-byte UTF-8
/// characters is rejected with an error instead of panicking on a
/// non-character-boundary slice.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed input is not exactly
/// 64 bytes long, or when any byte is not a hex digit. Positions in the
/// message are zero-based byte offsets into the trimmed input. The message
/// never echoes the input itself, which may be secret key material.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Convert one ASCII hex digit into its 4-bit value.
    fn nibble(digit: u8, position: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {position}: invalid digit found in string"
            )),
        }
    }

    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    // Each output byte is built from one pair of input digits (high nibble first).
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
448
449pub async fn run(config: BlockConfig) -> BlockResult<()> {
450    // ── Resolve sources ───────────────────────────────────────────
451    // Convert the `Source` enums on `BlockConfig` to their concrete
452    // payloads before any Isle setup. `File`/`Path`/`Env` variants
453    // read from disk / environment exactly once, here at the start.
454    let (script_source, script_name, script_dir_pathbuf) = match &config.script {
455        ScriptSource::Path(p) => {
456            let source = std::fs::read_to_string(p)
457                .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
458            let name = p
459                .file_name()
460                .map(|n| n.to_string_lossy().to_string())
461                .unwrap_or_else(|| "unknown".to_string());
462            let dir = p
463                .parent()
464                .map(|d| d.to_path_buf())
465                .unwrap_or_else(|| PathBuf::from("."));
466            (source, name, dir)
467        }
468        ScriptSource::Inline { source, name } => {
469            (source.clone(), name.clone(), config.project_root.clone())
470        }
471        ScriptSource::DefaultAgent => (
472            DEFAULT_AGENT_INVOKER.to_string(),
473            "default_agent_invoker.lua".to_string(),
474            config.project_root.clone(),
475        ),
476    };
477
478    let prompt_resolved: Option<String> = match &config.prompt {
479        Some(PromptSource::Inline(s)) => Some(s.clone()),
480        Some(PromptSource::File(p)) => Some(
481            std::fs::read_to_string(p)
482                .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
483        ),
484        None => None,
485    };
486    let context_resolved: Option<String> = match &config.context {
487        Some(PromptSource::Inline(s)) => Some(s.clone()),
488        Some(PromptSource::File(p)) => Some(
489            std::fs::read_to_string(p)
490                .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
491        ),
492        None => None,
493    };
494    let secret_key_resolved: Option<String> = match &config.secret_key {
495        Some(SecretKeySource::Inline(s)) => Some(s.clone()),
496        Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
497        None => None,
498    };
499
500    // NOTE: We previously held entered span guards across awaits for nested
501    // span context. That made the `run()` future `!Send`, which prevents
502    // SDK consumers from `tokio::spawn(run(config))`. Span context is
503    // attached to events via fields on the `info_span!` calls below; the
504    // missing nesting is an acceptable trade-off for `Send` correctness.
505    let _root_span = info_span!("agent_block", script = %script_name);
506
507    // ── .env ──────────────────────────────────────────────────────
508    // Load .env from project_root if present. Variables are merged into
509    // the process environment so Lua's `std.env.get()` picks them up.
510    let env_path = config.project_root.join(".env");
511    match dotenvy::from_path(&env_path) {
512        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
513        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
514        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
515    }
516
517    // ── Init ──────────────────────────────────────────────────────
518    let _init_span = info_span!("init");
519
520    // ── EventBus channel ─────────────────────────────────────────────
521    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
522    // handler can hold a `bus_tx` clone and forward incoming requests
523    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
524    let bus_capacity = crate::bridge::config::bus_capacity();
525    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
526    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
527
528    // ── Pre-install host-side Rust handlers ───────────────────────────
529    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
530    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
531    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
532    // Registered here (before any Lua bridge registers handlers and before
533    // `bus.serve` takes ownership of the bus) so the EventBus already
534    // carries the host handlers when the script starts.
535    // Install host-side Rust handlers: kind-specific entries from
536    // `host_handlers` and, when set, the kind-agnostic `host_handler`
537    // (registered via `on_any` as the fallback for unmatched kinds).
538    // SDK-embed 1-shot callers typically only set `host_handler`.
539    let has_kind_handlers = !config.host_handlers.is_empty();
540    let has_any_handler = config.host_handler.is_some();
541    if has_kind_handlers || has_any_handler {
542        let mut guard = event_bus
543            .lock()
544            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
545        let bus = guard
546            .as_mut()
547            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
548        for (kind, handler) in &config.host_handlers {
549            bus.on(kind.clone(), Arc::clone(handler))
550                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
551        }
552        if let Some(any_handler) = &config.host_handler {
553            bus.on_any(Arc::clone(any_handler))
554                .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
555        }
556        info!(
557            kind_handlers = config.host_handlers.len(),
558            any_handler = has_any_handler,
559            "host handlers pre-installed"
560        );
561    }
562
563    // ── auto-serve: background dispatcher for SDK-embed callers ───────
564    // When `auto_serve_bus` is on and at least one host-side handler
565    // (kind-specific or kind-agnostic) is installed, take the EventBus
566    // out of the Mutex *before* the script runs and spawn the dispatcher
567    // loop on the runtime. This lets `bus.emit(kind, payload)` from the
568    // script reach the host handler without requiring the script to call
569    // `bus.serve()` (which blocks on signals and never returns under
570    // programmatic embedding).
571    let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
572    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
573        let bus = {
574            let mut guard = event_bus
575                .lock()
576                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
577            guard
578                .take()
579                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
580        };
581        let token = CancellationToken::new();
582        let token_for_task = token.clone();
583        let handle = tokio::spawn(async move {
584            let mut bus = bus;
585            if let Err(e) = bus.run(token_for_task).await {
586                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
587            }
588        });
589        info!("auto-serve: dispatcher spawned");
590        Some((handle, token))
591    } else {
592        None
593    };
594
595    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
596        let keypair = match &secret_key_resolved {
597            Some(hex_str) => {
598                let bytes = hex_decode_32(hex_str)
599                    .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
600                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
601            }
602            None => agent_mesh_core::identity::AgentKeypair::generate(),
603        };
604        info!(agent_id = %keypair.agent_id(), "mesh identity");
605        let acl = agent_mesh_core::acl::AclPolicy {
606            default_deny: false,
607            rules: vec![],
608        };
609        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
610            Arc::new(BusRelayHandler::new(bus_tx.clone()));
611        let url = relay_url.clone();
612        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
613            .await
614            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
615        info!(relay_url = %relay_url, "mesh connected");
616        Some(Arc::new(agent))
617    } else {
618        None
619    };
620
621    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
622        config.mcp_rpc_timeout,
623    )?));
624
625    // Resolve project_root to absolute path.
626    // canonicalize() can fail if the path doesn't exist; fall back to
627    // joining with current_dir to guarantee an absolute path.
628    let project_root = config
629        .project_root
630        .canonicalize()
631        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
632
633    let http_client = reqwest::Client::new();
634
635    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
636    // All knobs are ENV-driven (see `bridge/config.rs`).
637    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
638    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
639
640    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
641    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
642
643    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
644    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
645
646    // Use the script dir derived from the resolved `ScriptSource` for
647    // `package.path` lookups. For inline / default-agent variants the dir
648    // falls back to `project_root` (set during source resolution above).
649    let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
650
651    // Precompute values captured by the init closure so we don't need to
652    // move the full `HostContext` into it (HostContext now holds
653    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
654    // returns — classic chicken-and-egg). All bridge registrations run in a
655    // second pass via `isle.exec` below.
656    let blocks_paths = build_blocks_path(&project_root);
657    let prompt = prompt_resolved.clone();
658    let context = context_resolved.clone();
659
660    // ── main Isle ─────────────────────────────────────────────────
661    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
662        script_name.clone(),
663        script_dir.clone(),
664        blocks_paths.clone(),
665        prompt.clone(),
666        context.clone(),
667    ))
668    .await
669    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
670    let isle = Arc::new(isle);
671
672    // ── handler Isle (sequential, dependencies are trivial) ────────
673    let (handler_isle, handler_driver) = spawn_handler_isle(
674        script_name.clone(),
675        script_dir.clone(),
676        blocks_paths.clone(),
677        prompt,
678        context,
679    )
680    .await?;
681
682    // Wire both Isles into McpManager so Lua notification callbacks can be
683    // dispatched from the rmcp task thread.
684    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
685    // - main_isle: progress/log notification dispatch (exec on main Isle so
686    //   user callback upvalues are preserved — no bytecode dump/reload needed)
687    {
688        let mut mgr = mcp_manager.write().await;
689        mgr.set_handler_isle(Arc::clone(&handler_isle));
690        mgr.set_main_isle(Arc::clone(&isle));
691    }
692
693    // ── HostContext + bridge registration ──────────────────────────────
694    // Wrap the isle in an Arc so `HostContext` can hand it to
695    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
696    // handlers from the EventBus dispatcher task).
697    let ctx = HostContext {
698        project_root,
699        mesh_agent,
700        mcp_manager: Arc::clone(&mcp_manager),
701        http_client,
702        sql_conn,
703        sql_interrupt,
704        kv_conn,
705        kv_interrupt,
706        ts_conn,
707        ts_interrupt,
708        isle: Arc::clone(&isle),
709        handler_isle: Arc::clone(&handler_isle),
710        bus_tx: bus_tx.clone(),
711        event_bus: Arc::clone(&event_bus),
712    };
713
714    {
715        let ctx = ctx.clone();
716        isle.exec(move |lua| {
717            bridge::register_all(lua, &ctx)
718                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
719            Ok(String::new())
720        })
721        .await
722        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
723    }
724
725    {
726        let ctx = ctx.clone();
727        handler_isle
728            .exec(move |lua| {
729                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
730                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
731                })?;
732                Ok(String::new())
733            })
734            .await
735            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
736    }
737
738    drop(_init_span);
739
740    // ── Execute ───────────────────────────────────────────────────
741    // When `shutdown_token` is supplied, race the script future against
742    // the caller's cancellation signal. On cancel, propagate to the Isle
743    // via the AsyncTask's cancel token so the debug hook unwinds the Lua
744    // VM, then continue into the shutdown sequence below (we still want
745    // to release MCP/mesh handles and join the auto-serve dispatcher
746    // before returning).
747    let script_result: Result<(), BlockError> = {
748        let _exec_span = info_span!("execute", script = %script_name);
749
750        let mut task = isle.spawn_coroutine_eval(&script_source);
751        let task_cancel = task.cancel_token().clone();
752        match config.shutdown_token.as_ref() {
753            Some(token) => {
754                tokio::select! {
755                    biased;
756                    _ = token.cancelled() => {
757                        task_cancel.cancel();
758                        // Wait for the Isle to unwind so the VM is in a
759                        // consistent state before driver shutdown. The
760                        // debug hook fires at the next HOOK_INTERVAL.
761                        let _ = (&mut task).await;
762                        info!("shutdown_token: cancelled by caller");
763                        Err(BlockError::Cancelled)
764                    }
765                    res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
766                }
767            }
768            None => (&mut task)
769                .await
770                .map(|_| ())
771                .map_err(|e| BlockError::Script(format!("{e}"))),
772        }
773    };
774
775    // ── auto-serve drain + cancel ─────────────────────────────────
776    // Let the dispatcher drain events queued by the script, then signal
777    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
778    if let Some((handle, token)) = auto_serve_state {
779        let grace_ms = crate::bridge::config::task_grace_ms();
780        let grace = Duration::from_millis(grace_ms);
781        tokio::time::sleep(grace).await;
782        token.cancel();
783        match tokio::time::timeout(grace, handle).await {
784            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
785            Ok(Err(join_err)) => {
786                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
787            }
788            Err(_) => {
789                tracing::warn!(
790                    grace_ms,
791                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
792                );
793            }
794        }
795    }
796
797    // ── Shutdown ──────────────────────────────────────────────────
798    {
799        let _shutdown_span = info_span!("shutdown");
800
801        mcp_manager.write().await.disconnect_all().await?;
802
803        driver
804            .shutdown()
805            .await
806            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
807
808        // Handler Isle shutdown is independent of main shutdown: a failure
809        // here (e.g. ThreadPanic on the handler thread) is logged but does
810        // not poison the main process exit. The main Isle has already
811        // been stopped cleanly above.
812        match handler_driver.shutdown().await {
813            Ok(()) => info!(
814                thread_name = "agent-block-handler-isle",
815                "handler Isle shut down"
816            ),
817            Err(e) => tracing::error!(
818                error = %e,
819                thread_name = "agent-block-handler-isle",
820                "handler Isle shutdown failed"
821            ),
822        }
823    }
824
825    script_result
826}
827
/// mesh → bus source adapter.
///
/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
/// oneshot channel carried inside the event.
///
/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
///
/// | Failure                   | Return value                           |
/// |---------------------------|----------------------------------------|
/// | `bus_tx.send` closed      | `{"error": "bus channel closed"}`      |
/// | ack sender dropped        | `{"error": "ack dropped"}`             |
/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
///
/// A full channel does not map to `bus channel closed`: the send is awaited,
/// so the request waits for capacity; the send itself fails only once the
/// receiving half of the channel is gone.
///
/// "ack sender dropped" means the oneshot sender travelling inside the event
/// was dropped before any reply was sent, which makes the ack receive fail.
///
/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
/// (see `src/bridge/mesh.rs`).
struct BusRelayHandler {
    /// Sending half of the bus channel; `handle` pushes one [`Event`] per
    /// incoming mesh request onto it.
    tx: mpsc::Sender<Event>,
}
849
850impl BusRelayHandler {
851    fn new(tx: mpsc::Sender<Event>) -> Self {
852        Self { tx }
853    }
854}
855
/// Bound used for both the mesh-adapter ack wait and other source timeouts.
///
/// Fixed at 30 seconds. The mesh adapter (`BusRelayHandler`'s
/// `RequestHandler::handle`) uses it as the time budget for a request and
/// reports it as `timeout_secs` in its timeout log line.
///
/// NOTE(review): the "other source timeouts" are not visible in this part of
/// the file — confirm which other sources rely on this constant.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
858
859#[async_trait::async_trait]
860impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
861    async fn handle(
862        &self,
863        from: &agent_mesh_core::identity::AgentId,
864        payload: &serde_json::Value,
865        _cancel: agent_mesh_sdk::CancelToken,
866    ) -> serde_json::Value {
867        let id = uuid::Uuid::new_v4().to_string();
868        let meta = serde_json::json!({"from": from.to_string()});
869        let (ack_tx, ack_rx) = oneshot::channel();
870        let event = Event {
871            kind: "mesh".into(),
872            id: id.clone(),
873            payload: payload.clone(),
874            meta,
875            ack_tx: Some(ack_tx),
876        };
877
878        if let Err(e) = self.tx.send(event).await {
879            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
880            return serde_json::json!({"error": "bus channel closed"});
881        }
882
883        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
884            Ok(Ok(Ok(v))) => v,
885            Ok(Ok(Err(e))) => {
886                tracing::error!(id = %id, error = %e, "mesh handler returned error");
887                serde_json::json!({"error": e.to_string()})
888            }
889            Ok(Err(e)) => {
890                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
891                serde_json::json!({"error": "ack dropped"})
892            }
893            Err(_) => {
894                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
895                serde_json::json!({"error": "handler timeout"})
896            }
897        }
898    }
899}