Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
//! 1. Spawn the mlua-isle `AsyncIsle`s: the main Lua VM thread (coroutine support) that runs
//!    the script, plus a dedicated handler Isle that runs EventBus (`bus.*`) handlers
//! 2. Optionally connect to agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`,
//!    `bus.*`, `sql.*`, `kv.*`, `ts.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for blocks/ StdPkg modules, stored as
/// `(module name, init.lua source)` pairs.
///
/// These are baked into the binary at compile time so `cargo install` works
/// without any extra file distribution. `build_isle_init` serves them from a
/// `package.searchers` entry appended after the existing searchers, so an
/// on-disk `blocks/<name>/` copy found via `package.path` takes precedence.
/// `inspect_tools` reports them in the order listed here.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
    (
        "compile_loop",
        include_str!("../blocks/compile_loop/init.lua"),
    ),
];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Runs the StdPkg `agent` module with `_PROMPT` / `_CONTEXT` injected and
/// emits the result on the EventBus. The emit kind is `"_"` — a neutral
/// label with no SDK-side meaning. The result is intended to be received
/// via [`BlockConfig::host_handler`] (the kind-agnostic single sink); the
/// literal label is irrelevant to SDK consumers.
///
/// `_PROMPT` / `_CONTEXT` are only set when the corresponding
/// `BlockConfig` field is `Some`; otherwise they are unset globals and
/// therefore `nil` inside this script.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
    prompt = _PROMPT,
    system = _CONTEXT,
})
bus.emit("_", r)
"#;
54
/// How the Lua script source for `run()` is supplied.
///
/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
/// the filesystem at start. `Inline` lets SDK consumers pass a script
/// they hold in memory (compile-time `include_str!`, dynamically built
/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
/// an embedded invoker that runs the StdPkg `agent` module with the
/// caller-supplied prompt/context and emits the result on the EventBus
/// via `bus.emit("_", result)` — a neutral kind, see the variant docs.
///
/// Script directory: `run()` uses the file's parent directory (`.` when
/// it has none) for `Path`, and [`BlockConfig::project_root`] for
/// `Inline` / `DefaultAgent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScriptSource {
    /// Read the script from a filesystem path at start. A read failure
    /// makes `run()` return `BlockError::Script`. The file name (or
    /// `"unknown"` if the path has none) is used as the script name.
    Path(PathBuf),
    /// Use the supplied source code directly.
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker. `prompt` / `context`
    /// are forwarded as `_PROMPT` / `_CONTEXT` Lua globals and the
    /// agent result is emitted on the EventBus under a neutral label
    /// (`"_"`). SDK consumers should pair this with
    /// [`BlockConfig::host_handler`] (the kind-agnostic single sink)
    /// and `auto_serve_bus = true`. The emit-kind is intentionally
    /// meaningless; consumers that need string-keyed routing should
    /// supply [`ScriptSource::Inline`] with their own invoker.
    DefaultAgent,
}
86
/// How a string payload (prompt / system context) is supplied.
///
/// `Inline` is the literal string variant (CLI `--prompt` / `--context`).
/// `File` reads the contents from disk at `run()` start (CLI
/// `--prompt-file` / `--context-file`). An unreadable or non-UTF-8 file
/// makes `run()` return `BlockError::Script` before any Lua VM is set up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PromptSource {
    /// Literal string.
    Inline(String),
    /// Filesystem path; the whole file is read as UTF-8 text at `run()` start.
    File(PathBuf),
}
99
/// How the Ed25519 mesh identity secret key is supplied.
///
/// `Inline` is a 64-hex literal. `Env` reads the named environment
/// variable at `run()` start (CLI default uses
/// `AGENT_BLOCK_MESH_SECRET_KEY`). Absence of any `SecretKeySource`
/// (i.e. `BlockConfig.secret_key = None`) causes a random keypair to
/// be generated, matching the prior behavior.
///
/// An `Env` variable that is unset (or not valid UTF-8) resolves to no key
/// at `run()` start rather than to an error.
///
/// `Debug` is implemented by hand so that an `Inline` secret is never
/// written to logs or panic messages; only the variant name is shown.
#[derive(Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal.
    Inline(String),
    /// Environment variable name to read at start.
    Env(String),
}

impl std::fmt::Debug for SecretKeySource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Never print the key material itself.
            Self::Inline(_) => f
                .debug_tuple("Inline")
                .field(&format_args!("<redacted>"))
                .finish(),
            // The variable *name* is not secret and aids debugging.
            Self::Env(var) => f.debug_tuple("Env").field(var).finish(),
        }
    }
}
114
/// Async handler invoked when the LLM (or a Lua call to
/// `tool.call(name, ...)`) targets a Rust-implemented tool supplied via
/// [`BlockConfig::host_tools`].
///
/// `input` arrives as a `serde_json::Value` (converted from Lua before
/// the handler is invoked). The returned value is converted back to a
/// Lua value and delivered to the caller. Errors are propagated as
/// `LuaError::external` (visible inside the script) and as `BlockError`
/// on the Rust side.
///
/// Implementors must be `Send + Sync + 'static` because one handler is
/// shared behind an `Arc<dyn ToolHandler>` (see [`HostToolSpec::handler`]).
#[async_trait::async_trait]
pub trait ToolHandler: Send + Sync + 'static {
    /// Handle a single invocation of the tool with its JSON `input`.
    async fn call(&self, input: serde_json::Value) -> Result<serde_json::Value, BlockError>;
}
128
129/// Declarative spec for a Rust-implemented tool injected into the Lua
130/// tool registry before the user script runs. The resulting entry is
131/// indistinguishable from a Lua-defined tool from the script's view:
132/// `tool.call("<name>", input)`, `agent.run({ ... })` tool dispatch,
133/// and `tool.schema()` enumeration all work uniformly.
134#[derive(Clone)]
135pub struct HostToolSpec {
136    /// Tool name. Becomes the routing key in `_TOOL_REGISTRY` and the
137    /// `name` field exposed by `tool.schema()` (Anthropic tool spec).
138    pub name: String,
139    /// Free-form description shown to the LLM. Becomes the
140    /// `description` field of the Anthropic tool spec.
141    pub description: String,
142    /// Input schema (Anthropic-compatible JSON Schema object).
143    pub input_schema: serde_json::Value,
144    /// Optional group label for [`agent.run`'s `tool_groups`] filter
145    /// and for [`BlockConfig::tool_policy`] (planned).
146    pub group: Option<String>,
147    /// Rust callback dispatched on every invocation.
148    pub handler: Arc<dyn ToolHandler>,
149}
150
151impl std::fmt::Debug for HostToolSpec {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("HostToolSpec")
154            .field("name", &self.name)
155            .field("description", &self.description)
156            .field("input_schema", &self.input_schema)
157            .field("group", &self.group)
158            .field("handler", &"<dyn ToolHandler>")
159            .finish()
160    }
161}
162
/// Snapshot of a tool that a given [`BlockConfig`] will (statically)
/// expose to the LLM. Produced by [`inspect_tools`] without running
/// the script. MCP server tools are *not* included because they are
/// only known after the MCP `initialize` handshake completes; callers
/// that need that view should run the script and call `tool.schema()`
/// from Lua.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolMeta {
    /// Tool name for host tools; module name usable with `require(...)`
    /// for embedded blocks.
    pub name: String,
    /// Human-readable description of the entry.
    pub description: String,
    /// Optional group label (copied from [`HostToolSpec::group`] for host
    /// tools).
    pub group: Option<String>,
    /// Where this entry comes from.
    pub source: ToolSource,
}

/// Origin of a tool listed by [`inspect_tools`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolSource {
    /// Supplied via [`BlockConfig::host_tools`] (Rust-implemented).
    HostRust,
    /// Embedded StdPkg block (`agent`, `compile_loop`, …) — discovered
    /// statically from the Lua sources compiled into the binary. Note:
    /// not every embedded block exposes a registered tool; this entry
    /// simply records that the module is available via `require(...)`.
    EmbeddedBlock,
}
188
189/// Inspect the tools a [`BlockConfig`] will expose to the LLM without
190/// actually running the script. Returns the merged list of
191/// `host_tools` (declared in the config) and embedded-block sources.
192///
193/// MCP server tools are deliberately omitted — they only become known
194/// after the MCP `initialize` handshake. Use `tool.schema()` from
195/// inside the running script for that view.
196pub fn inspect_tools(config: &BlockConfig) -> Vec<ToolMeta> {
197    let mut out = Vec::new();
198    for t in &config.host_tools {
199        out.push(ToolMeta {
200            name: t.name.clone(),
201            description: t.description.clone(),
202            group: t.group.clone(),
203            source: ToolSource::HostRust,
204        });
205    }
206    for (name, _src) in EMBEDDED_BLOCKS {
207        out.push(ToolMeta {
208            name: (*name).to_string(),
209            description: format!("Embedded StdPkg block (require(\"{name}\"))"),
210            group: None,
211            source: ToolSource::EmbeddedBlock,
212        });
213    }
214    out
215}
216
217/// Build the `blocks/` portion of `package.path` from filesystem locations.
218///
219/// Priority (highest first):
220/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
221/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
222///
223/// Returns a semicolon-terminated string ready to prepend to `package.path`,
224/// or an empty string when no `blocks/` directories are found.
225fn build_blocks_path(project_root: &Path) -> String {
226    let mut out = String::new();
227
228    // 1. project_root/blocks/
229    let project_blocks = project_root.join("blocks");
230    if project_blocks.is_dir() {
231        let pb = project_blocks.to_string_lossy();
232        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
233    }
234
235    // 2. exe_dir/blocks/
236    match std::env::current_exe() {
237        Ok(exe) => {
238            if let Some(exe_dir) = exe.parent() {
239                let exe_blocks = exe_dir.join("blocks");
240                if exe_blocks.is_dir() {
241                    let eb = exe_blocks.to_string_lossy();
242                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
243                }
244            }
245        }
246        Err(e) => {
247            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
248        }
249    }
250
251    out
252}
253
/// Configuration for one [`run`] invocation.
///
/// Bundles the script to execute, the project directory, optional mesh and
/// MCP settings, the `_PROMPT` / `_CONTEXT` payloads, and the host-side
/// hooks (event handlers, Rust tools, cancellation) used by SDK consumers.
/// `File` / `Path` / `Env` sources are resolved once, at the start of
/// `run()`, before any Lua VM is set up.
pub struct BlockConfig {
    /// Lua script to execute. See [`ScriptSource`] for the supported
    /// shapes (filesystem path / inline source / embedded default
    /// agent invoker).
    pub script: ScriptSource,
    /// Project directory. `run()` loads `.env` from here (a missing file
    /// is fine; a parse error is only logged) and uses it as the script
    /// directory for [`ScriptSource::Inline`] / [`ScriptSource::DefaultAgent`].
    pub project_root: PathBuf,
    /// Optional agent-mesh relay URL.
    /// NOTE(review): presumably `None` skips the mesh connection entirely;
    /// the connect logic is further down in `run()` — confirm there.
    pub relay_url: Option<String>,
    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
    /// for the supported shapes (inline 64-hex / environment variable).
    /// `None` generates a random keypair. Required to talk to
    /// registry/ACL-gated hosted meshes. An `Env` source naming an unset
    /// variable resolves to no key (it is not reported as an error when
    /// sources are resolved).
    pub secret_key: Option<SecretKeySource>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt payload injected as `_PROMPT` Lua global. See
    /// [`PromptSource`] for the supported shapes. `None` leaves the
    /// global unset.
    pub prompt: Option<PromptSource>,
    /// Context payload injected as `_CONTEXT` Lua global (typically
    /// the system prompt). Same shape rules as [`Self::prompt`].
    pub context: Option<PromptSource>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations (which run on the handler Isle)
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// Single host-side Rust handler that catches every event regardless
    /// of `kind`. Internally registered via [`EventBus::on_any`], so it
    /// acts as a fallback when no entry in [`Self::host_handlers`]
    /// matches the incoming `kind`.
    ///
    /// This is the SDK-embed 1-shot sink: SDK consumers do not need to
    /// invent or coordinate a string `kind` between the Lua script and
    /// their Rust code. The agent invoker's emit-kind is irrelevant —
    /// the handler receives every event.
    ///
    /// Use this when you want a single Rust handler to receive results
    /// (typical embedded use). Use [`Self::host_handlers`] instead when
    /// you actually need string-keyed routing (multi-source / multi-
    /// handler dispatch). The two may coexist: kind-specific handlers
    /// in `host_handlers` take precedence, and this single handler is
    /// the fallback for unmatched kinds.
    ///
    /// Defaults to `None`.
    pub host_handler: Option<Arc<dyn Handler>>,
    /// Rust-implemented tools injected into the Lua tool registry
    /// before the user script runs. Each entry becomes
    /// indistinguishable from a Lua-defined tool: it is discoverable
    /// via `tool.list()` / `tool.schema()`, dispatchable via
    /// `tool.call(name, input)`, and visible to `agent.run`'s LLM
    /// function-calling.
    ///
    /// SDK consumers can use this to expose Rust capabilities
    /// (database lookups, business logic, etc.) to the LLM without
    /// writing any Lua. See [`HostToolSpec`] and [`ToolHandler`].
    ///
    /// Defaults to an empty list.
    pub host_tools: Vec<HostToolSpec>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes. Required for SDK-embed callers that supply
    /// [`Self::host_handlers`] and/or [`Self::host_handler`] and need
    /// `bus.emit(kind, payload)` events emitted from the script to actually
    /// reach those handlers without requiring the script to call
    /// `bus.serve()` (which blocks on SIGTERM / Ctrl+C and never returns
    /// under programmatic embedding).
    ///
    /// The flag only takes effect when at least one host handler is
    /// configured (either field above). With no host handlers it is
    /// ignored and the bus is left in place for a Lua-side `bus.serve()`.
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`: when active, this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled, the
    /// in-flight script is interrupted via the Isle's debug-hook cancel
    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
    /// returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop). The token is observed across the
    /// `coroutine_eval` await; once cancellation propagates, the shutdown
    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
    /// still runs so file descriptors and remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
360
/// Shared context passed into Lua bridge functions.
///
/// Derives `Clone`. The SQLite connections, Isles, MCP manager, mesh agent
/// and EventBus are held behind `Arc`s, so every clone refers to the same
/// underlying resources rather than copying them.
#[derive(Clone)]
pub struct HostContext {
    /// Project root directory made available to the bridges.
    /// NOTE(review): presumably `BlockConfig::project_root`; the
    /// construction site is in `run()`.
    pub project_root: PathBuf,
    /// Mesh agent handle, or `None` when no mesh connection exists.
    /// NOTE(review): assumed to be `None` when `BlockConfig::relay_url` is
    /// unset — confirm in `run()`.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP manager for stdio-based MCP server connections (`mcp.*` bridge),
    /// behind an async tokio `RwLock`.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection, intended for `bridge::ts` to
    /// cancel in-flight queries on timeout.
    /// NOTE(review): `dead_code` is allowed, so this may not be read yet —
    /// drop the allow once the ts bridge uses it.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. `bridge::bus` no longer dispatches handlers against
    /// this Isle; handlers live on `handler_isle` instead. The field is
    /// retained because bridge code keyed to the main Isle (future
    /// `coroutine_call` back-edges, introspection APIs) may need it, and
    /// removing it would force another HostContext reshape; hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere yet (hence `dead_code`) — kept `pub` for future adapter
    /// wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`). `None`
    /// means some owner (Lua `bus.serve` or the host auto-serve dispatcher)
    /// has already taken the bus.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
417
418/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
419/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
420/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
421///
422/// `label` is used only for the init log line (`sql` / `kv`) so that the two
423/// databases are distinguishable in tracing output.
424fn open_sqlite(
425    path: &Path,
426    label: &'static str,
427) -> BlockResult<(
428    Arc<Mutex<rusqlite::Connection>>,
429    Arc<rusqlite::InterruptHandle>,
430)> {
431    let is_memory = crate::bridge::config::is_memory_sql(path);
432    if !is_memory {
433        if let Some(parent) = path.parent() {
434            std::fs::create_dir_all(parent)
435                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
436        }
437    }
438    let conn = rusqlite::Connection::open(path)
439        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
440    if !is_memory {
441        let journal = crate::bridge::config::sql_journal_mode();
442        conn.pragma_update(None, "journal_mode", &journal)
443            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
444    }
445    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
446    conn.pragma_update(None, "busy_timeout", busy_ms)
447        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
448    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
449    let interrupt = Arc::new(conn.get_interrupt_handle());
450    let conn = Arc::new(Mutex::new(conn));
451    Ok((conn, interrupt))
452}
453
454/// Build the init closure shared between the main Isle and the handler
455/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
456/// configures `package.path` / `package.searchers` so `require "agent"`
457/// (and any `blocks/` module) works inside the Lua VM.
458///
459/// Returns an `FnOnce` so each call produces a fresh closure; this lets
460/// both Isles be spawned from the same config without `Clone` bounds on
461/// the captured `HashMap`.
462fn build_isle_init(
463    script_name: String,
464    script_dir: String,
465    blocks_paths: String,
466    prompt: Option<String>,
467    context: Option<String>,
468) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
469    move |lua| {
470        // Set script name before registering bridges (used by log.* for attribution)
471        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
472        if let Some(ref p) = prompt {
473            lua.globals().set("_PROMPT", p.as_str())?;
474        }
475        if let Some(ref c) = context {
476            lua.globals().set("_CONTEXT", c.as_str())?;
477        }
478
479        mlua_batteries::register_all(lua, "std")?;
480
481        // ── package.path ──────────────────────────────────────────────
482        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
483        let package: mlua::Table = lua.globals().get("package")?;
484        let current_path: String = package.get("path")?;
485        let new_path =
486            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
487        package.set("path", new_path)?;
488
489        // ── package.searchers — embedded fallback ─────────────────────
490        // Register a custom searcher that loads blocks/ modules from the
491        // sources baked in at compile time.  This is the lowest-priority
492        // searcher so filesystem copies always win.
493        let embedded: HashMap<&'static str, &'static str> =
494            EMBEDDED_BLOCKS.iter().copied().collect();
495
496        let searchers: mlua::Table = package.get("searchers")?;
497        let loader =
498            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
499                Some(source) => {
500                    let chunk = lua
501                        .load(*source)
502                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
503                    let func = chunk.into_function()?;
504                    Ok(mlua::Value::Function(func))
505                }
506                None => {
507                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
508                    Ok(mlua::Value::String(msg))
509                }
510            })?;
511        // Append as the last searcher so filesystem paths remain preferred.
512        let next_idx = searchers.raw_len() + 1;
513        searchers.raw_set(next_idx, loader)?;
514
515        Ok(())
516    }
517}
518
519/// Spawn the dedicated handler Isle.
520///
521/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
522/// separate OS thread with its own `tokio` current-thread runtime, keeping
523/// CPU-bound handlers from starving the main Isle's grace timers.
524///
525/// Bridge registration is deferred to a follow-up `exec` in `run()` because
526/// `HostContext` is not constructible until both Isles exist (the struct
527/// itself holds `Arc<AsyncIsle>` for both).
528async fn spawn_handler_isle(
529    script_name: String,
530    script_dir: String,
531    blocks_paths: String,
532    prompt: Option<String>,
533    context: Option<String>,
534) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
535    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
536    let (isle, driver) = AsyncIsle::builder()
537        .thread_name("agent-block-handler-isle")
538        .spawn(init)
539        .await
540        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
541    info!(
542        thread_name = "agent-block-handler-isle",
543        "handler Isle spawned"
544    );
545    Ok((Arc::new(isle), driver))
546}
547
/// Decode a 64-digit hex string (surrounding whitespace ignored) into 32
/// raw bytes, e.g. an Ed25519 secret key.
///
/// Upper- and lower-case digits are both accepted.
///
/// # Errors
/// A human-readable message for the first problem found: a wrong length
/// (counted in bytes of the trimmed input), or the byte offset of the first
/// non-hex character.
///
/// Decoding works on raw bytes rather than `&str` sub-slices, so non-ASCII
/// input (e.g. 32 two-byte characters, 64 bytes in total) is rejected with
/// an error instead of panicking on a non-char-boundary slice.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Value of the hex digit `b` found at byte offset `pos`.
    fn nibble(b: u8, pos: usize) -> Result<u8, String> {
        char::from(b)
            .to_digit(16)
            // A radix-16 digit is at most 15, so the cast is lossless.
            .map(|d| d as u8)
            .ok_or_else(|| format!("invalid hex at position {pos}: invalid digit found in string"))
    }

    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }
    let mut out = [0u8; 32];
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        *byte = (nibble(pair[0], 2 * i)? << 4) | nibble(pair[1], 2 * i + 1)?;
    }
    Ok(out)
}
563
564pub async fn run(config: BlockConfig) -> BlockResult<()> {
565    // ── Resolve sources ───────────────────────────────────────────
566    // Convert the `Source` enums on `BlockConfig` to their concrete
567    // payloads before any Isle setup. `File`/`Path`/`Env` variants
568    // read from disk / environment exactly once, here at the start.
569    let (script_source, script_name, script_dir_pathbuf) = match &config.script {
570        ScriptSource::Path(p) => {
571            let source = std::fs::read_to_string(p)
572                .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
573            let name = p
574                .file_name()
575                .map(|n| n.to_string_lossy().to_string())
576                .unwrap_or_else(|| "unknown".to_string());
577            let dir = p
578                .parent()
579                .map(|d| d.to_path_buf())
580                .unwrap_or_else(|| PathBuf::from("."));
581            (source, name, dir)
582        }
583        ScriptSource::Inline { source, name } => {
584            (source.clone(), name.clone(), config.project_root.clone())
585        }
586        ScriptSource::DefaultAgent => (
587            DEFAULT_AGENT_INVOKER.to_string(),
588            "default_agent_invoker.lua".to_string(),
589            config.project_root.clone(),
590        ),
591    };
592
593    let prompt_resolved: Option<String> = match &config.prompt {
594        Some(PromptSource::Inline(s)) => Some(s.clone()),
595        Some(PromptSource::File(p)) => Some(
596            std::fs::read_to_string(p)
597                .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
598        ),
599        None => None,
600    };
601    let context_resolved: Option<String> = match &config.context {
602        Some(PromptSource::Inline(s)) => Some(s.clone()),
603        Some(PromptSource::File(p)) => Some(
604            std::fs::read_to_string(p)
605                .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
606        ),
607        None => None,
608    };
609    let secret_key_resolved: Option<String> = match &config.secret_key {
610        Some(SecretKeySource::Inline(s)) => Some(s.clone()),
611        Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
612        None => None,
613    };
614
615    // NOTE: We previously held entered span guards across awaits for nested
616    // span context. That made the `run()` future `!Send`, which prevents
617    // SDK consumers from `tokio::spawn(run(config))`. Span context is
618    // attached to events via fields on the `info_span!` calls below; the
619    // missing nesting is an acceptable trade-off for `Send` correctness.
620    let _root_span = info_span!("agent_block", script = %script_name);
621
622    // ── .env ──────────────────────────────────────────────────────
623    // Load .env from project_root if present. Variables are merged into
624    // the process environment so Lua's `std.env.get()` picks them up.
625    let env_path = config.project_root.join(".env");
626    match dotenvy::from_path(&env_path) {
627        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
628        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
629        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
630    }
631
632    // ── Init ──────────────────────────────────────────────────────
633    let _init_span = info_span!("init");
634
635    // ── EventBus channel ─────────────────────────────────────────────
636    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
637    // handler can hold a `bus_tx` clone and forward incoming requests
638    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
639    let bus_capacity = crate::bridge::config::bus_capacity();
640    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
641    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
642
643    // ── Pre-install host-side Rust handlers ───────────────────────────
644    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
645    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
646    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
647    // Registered here (before any Lua bridge registers handlers and before
648    // `bus.serve` takes ownership of the bus) so the EventBus already
649    // carries the host handlers when the script starts.
650    // Install host-side Rust handlers: kind-specific entries from
651    // `host_handlers` and, when set, the kind-agnostic `host_handler`
652    // (registered via `on_any` as the fallback for unmatched kinds).
653    // SDK-embed 1-shot callers typically only set `host_handler`.
654    let has_kind_handlers = !config.host_handlers.is_empty();
655    let has_any_handler = config.host_handler.is_some();
656    if has_kind_handlers || has_any_handler {
657        let mut guard = event_bus
658            .lock()
659            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
660        let bus = guard
661            .as_mut()
662            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
663        for (kind, handler) in &config.host_handlers {
664            bus.on(kind.clone(), Arc::clone(handler))
665                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
666        }
667        if let Some(any_handler) = &config.host_handler {
668            bus.on_any(Arc::clone(any_handler))
669                .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
670        }
671        info!(
672            kind_handlers = config.host_handlers.len(),
673            any_handler = has_any_handler,
674            "host handlers pre-installed"
675        );
676    }
677
678    // ── auto-serve: background dispatcher for SDK-embed callers ───────
679    // When `auto_serve_bus` is on and at least one host-side handler
680    // (kind-specific or kind-agnostic) is installed, take the EventBus
681    // out of the Mutex *before* the script runs and spawn the dispatcher
682    // loop on the runtime. This lets `bus.emit(kind, payload)` from the
683    // script reach the host handler without requiring the script to call
684    // `bus.serve()` (which blocks on signals and never returns under
685    // programmatic embedding).
686    let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
687    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
688        let bus = {
689            let mut guard = event_bus
690                .lock()
691                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
692            guard
693                .take()
694                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
695        };
696        let token = CancellationToken::new();
697        let token_for_task = token.clone();
698        let handle = tokio::spawn(async move {
699            let mut bus = bus;
700            if let Err(e) = bus.run(token_for_task).await {
701                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
702            }
703        });
704        info!("auto-serve: dispatcher spawned");
705        Some((handle, token))
706    } else {
707        None
708    };
709
710    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
711        let keypair = match &secret_key_resolved {
712            Some(hex_str) => {
713                let bytes = hex_decode_32(hex_str)
714                    .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
715                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
716            }
717            None => agent_mesh_core::identity::AgentKeypair::generate(),
718        };
719        info!(agent_id = %keypair.agent_id(), "mesh identity");
720        let acl = agent_mesh_core::acl::AclPolicy {
721            default_deny: false,
722            rules: vec![],
723        };
724        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
725            Arc::new(BusRelayHandler::new(bus_tx.clone()));
726        let url = relay_url.clone();
727        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
728            .await
729            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
730        info!(relay_url = %relay_url, "mesh connected");
731        Some(Arc::new(agent))
732    } else {
733        None
734    };
735
736    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
737        config.mcp_rpc_timeout,
738    )?));
739
740    // Resolve project_root to absolute path.
741    // canonicalize() can fail if the path doesn't exist; fall back to
742    // joining with current_dir to guarantee an absolute path.
743    let project_root = config
744        .project_root
745        .canonicalize()
746        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
747
748    let http_client = reqwest::Client::new();
749
750    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
751    // All knobs are ENV-driven (see `bridge/config.rs`).
752    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
753    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
754
755    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
756    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
757
758    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
759    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
760
761    // Use the script dir derived from the resolved `ScriptSource` for
762    // `package.path` lookups. For inline / default-agent variants the dir
763    // falls back to `project_root` (set during source resolution above).
764    let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
765
766    // Precompute values captured by the init closure so we don't need to
767    // move the full `HostContext` into it (HostContext now holds
768    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
769    // returns — classic chicken-and-egg). All bridge registrations run in a
770    // second pass via `isle.exec` below.
771    let blocks_paths = build_blocks_path(&project_root);
772    let prompt = prompt_resolved.clone();
773    let context = context_resolved.clone();
774
775    // ── main Isle ─────────────────────────────────────────────────
776    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
777        script_name.clone(),
778        script_dir.clone(),
779        blocks_paths.clone(),
780        prompt.clone(),
781        context.clone(),
782    ))
783    .await
784    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
785    let isle = Arc::new(isle);
786
787    // ── handler Isle (sequential, dependencies are trivial) ────────
788    let (handler_isle, handler_driver) = spawn_handler_isle(
789        script_name.clone(),
790        script_dir.clone(),
791        blocks_paths.clone(),
792        prompt,
793        context,
794    )
795    .await?;
796
797    // Wire both Isles into McpManager so Lua notification callbacks can be
798    // dispatched from the rmcp task thread.
799    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
800    // - main_isle: progress/log notification dispatch (exec on main Isle so
801    //   user callback upvalues are preserved — no bytecode dump/reload needed)
802    {
803        let mut mgr = mcp_manager.write().await;
804        mgr.set_handler_isle(Arc::clone(&handler_isle));
805        mgr.set_main_isle(Arc::clone(&isle));
806    }
807
808    // ── HostContext + bridge registration ──────────────────────────────
809    // Wrap the isle in an Arc so `HostContext` can hand it to
810    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
811    // handlers from the EventBus dispatcher task).
812    let ctx = HostContext {
813        project_root,
814        mesh_agent,
815        mcp_manager: Arc::clone(&mcp_manager),
816        http_client,
817        sql_conn,
818        sql_interrupt,
819        kv_conn,
820        kv_interrupt,
821        ts_conn,
822        ts_interrupt,
823        isle: Arc::clone(&isle),
824        handler_isle: Arc::clone(&handler_isle),
825        bus_tx: bus_tx.clone(),
826        event_bus: Arc::clone(&event_bus),
827    };
828
829    {
830        let ctx = ctx.clone();
831        isle.exec(move |lua| {
832            bridge::register_all(lua, &ctx)
833                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
834            Ok(String::new())
835        })
836        .await
837        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
838    }
839
840    {
841        let ctx = ctx.clone();
842        handler_isle
843            .exec(move |lua| {
844                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
845                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
846                })?;
847                Ok(String::new())
848            })
849            .await
850            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
851    }
852
853    // ── Inject host_tools into the Lua tool registry ───────────────
854    // Done after `bridge::register_all` so `_TOOL_REGISTRY` exists.
855    // Each entry becomes an Anthropic-shaped tool spec table
856    //   { name, schema = { description, input_schema }, handler, group? }
857    // where `handler` is a Lua async function that bridges back into
858    // the supplied `ToolHandler::call`. Lua-side `tool.list()` /
859    // `tool.schema()` / `agent.run` see these uniformly with native
860    // Lua-defined tools.
861    if !config.host_tools.is_empty() {
862        let host_tools = config.host_tools.clone();
863        let tool_count = host_tools.len();
864        isle.exec(move |lua| {
865            let registry: mlua::Table = lua
866                .globals()
867                .get("_TOOL_REGISTRY")
868                .map_err(|e| mlua_isle::IsleError::Lua(format!("get _TOOL_REGISTRY: {e}")))?;
869            for tool in host_tools {
870                let entry = lua
871                    .create_table()
872                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create entry: {e}")))?;
873                entry
874                    .set("name", tool.name.as_str())
875                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set name: {e}")))?;
876                // schema = { description, input_schema } — Anthropic shape
877                let schema = lua
878                    .create_table()
879                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create schema: {e}")))?;
880                schema
881                    .set("description", tool.description.as_str())
882                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set description: {e}")))?;
883                let input_schema_lua =
884                    crate::bridge::json_to_lua(lua, tool.input_schema.clone())
885                        .map_err(|e| mlua_isle::IsleError::Lua(format!("input_schema: {e}")))?;
886                schema
887                    .set("input_schema", input_schema_lua)
888                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set input_schema: {e}")))?;
889                entry
890                    .set("schema", schema)
891                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set schema: {e}")))?;
892                if let Some(group) = &tool.group {
893                    entry
894                        .set("group", group.as_str())
895                        .map_err(|e| mlua_isle::IsleError::Lua(format!("set group: {e}")))?;
896                }
897                let handler_arc = Arc::clone(&tool.handler);
898                let handler_fn = lua
899                    .create_async_function(move |lua, input: mlua::Value| {
900                        let handler = Arc::clone(&handler_arc);
901                        async move {
902                            let input_json = crate::bridge::lua_to_json(&lua, input)?;
903                            let result = handler
904                                .call(input_json)
905                                .await
906                                .map_err(mlua::Error::external)?;
907                            crate::bridge::json_to_lua(&lua, result)
908                        }
909                    })
910                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create handler: {e}")))?;
911                entry
912                    .set("handler", handler_fn)
913                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
914                registry
915                    .set(tool.name.as_str(), entry)
916                    .map_err(|e| mlua_isle::IsleError::Lua(format!("registry set: {e}")))?;
917            }
918            Ok(String::new())
919        })
920        .await
921        .map_err(|e| BlockError::Runtime(format!("host_tools inject: {e}")))?;
922        info!(count = tool_count, "host tools injected into Lua registry");
923    }
924
925    drop(_init_span);
926
927    // ── Execute ───────────────────────────────────────────────────
928    // When `shutdown_token` is supplied, race the script future against
929    // the caller's cancellation signal. On cancel, propagate to the Isle
930    // via the AsyncTask's cancel token so the debug hook unwinds the Lua
931    // VM, then continue into the shutdown sequence below (we still want
932    // to release MCP/mesh handles and join the auto-serve dispatcher
933    // before returning).
934    let script_result: Result<(), BlockError> = {
935        let _exec_span = info_span!("execute", script = %script_name);
936
937        let mut task = isle.spawn_coroutine_eval(&script_source);
938        let task_cancel = task.cancel_token().clone();
939        match config.shutdown_token.as_ref() {
940            Some(token) => {
941                tokio::select! {
942                    biased;
943                    _ = token.cancelled() => {
944                        task_cancel.cancel();
945                        // Wait for the Isle to unwind so the VM is in a
946                        // consistent state before driver shutdown. The
947                        // debug hook fires at the next HOOK_INTERVAL.
948                        let _ = (&mut task).await;
949                        info!("shutdown_token: cancelled by caller");
950                        Err(BlockError::Cancelled)
951                    }
952                    res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
953                }
954            }
955            None => (&mut task)
956                .await
957                .map(|_| ())
958                .map_err(|e| BlockError::Script(format!("{e}"))),
959        }
960    };
961
962    // ── auto-serve drain + cancel ─────────────────────────────────
963    // Let the dispatcher drain events queued by the script, then signal
964    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
965    if let Some((handle, token)) = auto_serve_state {
966        let grace_ms = crate::bridge::config::task_grace_ms();
967        let grace = Duration::from_millis(grace_ms);
968        tokio::time::sleep(grace).await;
969        token.cancel();
970        match tokio::time::timeout(grace, handle).await {
971            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
972            Ok(Err(join_err)) => {
973                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
974            }
975            Err(_) => {
976                tracing::warn!(
977                    grace_ms,
978                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
979                );
980            }
981        }
982    }
983
984    // ── Shutdown ──────────────────────────────────────────────────
985    {
986        let _shutdown_span = info_span!("shutdown");
987
988        mcp_manager.write().await.disconnect_all().await?;
989
990        driver
991            .shutdown()
992            .await
993            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
994
995        // Handler Isle shutdown is independent of main shutdown: a failure
996        // here (e.g. ThreadPanic on the handler thread) is logged but does
997        // not poison the main process exit. The main Isle has already
998        // been stopped cleanly above.
999        match handler_driver.shutdown().await {
1000            Ok(()) => info!(
1001                thread_name = "agent-block-handler-isle",
1002                "handler Isle shut down"
1003            ),
1004            Err(e) => tracing::error!(
1005                error = %e,
1006                thread_name = "agent-block-handler-isle",
1007                "handler Isle shutdown failed"
1008            ),
1009        }
1010    }
1011
1012    script_result
1013}
1014
1015/// mesh → bus source adapter.
1016///
1017/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
1018/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
1019/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
1020/// oneshot channel carried inside the event.
1021///
1022/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
1023///
1024/// | Failure                   | Return value                           |
1025/// |---------------------------|----------------------------------------|
1026/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
1027/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
1028/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
1029/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
1030///
1031/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
1032/// (see `src/bridge/mesh.rs`).
1033struct BusRelayHandler {
1034    tx: mpsc::Sender<Event>,
1035}
1036
1037impl BusRelayHandler {
1038    fn new(tx: mpsc::Sender<Event>) -> Self {
1039        Self { tx }
1040    }
1041}
1042
1043/// Bound used for both the mesh-adapter ack wait and other source timeouts.
1044const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
1045
1046#[async_trait::async_trait]
1047impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
1048    async fn handle(
1049        &self,
1050        from: &agent_mesh_core::identity::AgentId,
1051        payload: &serde_json::Value,
1052        _cancel: agent_mesh_sdk::CancelToken,
1053    ) -> serde_json::Value {
1054        let id = uuid::Uuid::new_v4().to_string();
1055        let meta = serde_json::json!({"from": from.to_string()});
1056        let (ack_tx, ack_rx) = oneshot::channel();
1057        let event = Event {
1058            kind: "mesh".into(),
1059            id: id.clone(),
1060            payload: payload.clone(),
1061            meta,
1062            ack_tx: Some(ack_tx),
1063        };
1064
1065        if let Err(e) = self.tx.send(event).await {
1066            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
1067            return serde_json::json!({"error": "bus channel closed"});
1068        }
1069
1070        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
1071            Ok(Ok(Ok(v))) => v,
1072            Ok(Ok(Err(e))) => {
1073                tracing::error!(id = %id, error = %e, "mesh handler returned error");
1074                serde_json::json!({"error": e.to_string()})
1075            }
1076            Ok(Err(e)) => {
1077                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
1078                serde_json::json!({"error": "ack dropped"})
1079            }
1080            Err(_) => {
1081                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
1082                serde_json::json!({"error": "handler timeout"})
1083            }
1084        }
1085    }
1086}