Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31    ("agent", include_str!("../blocks/agent/init.lua")),
32    ("session", include_str!("../blocks/session/init.lua")),
33    (
34        "compile_loop",
35        include_str!("../blocks/compile_loop/init.lua"),
36    ),
37];
38
39/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
40///
41/// Runs the StdPkg `agent` module with `_PROMPT` / `_CONTEXT` injected and
42/// emits the result on the EventBus. The emit kind is `"_"` — a neutral
43/// label with no SDK-side meaning. The result is intended to be received
44/// via [`BlockConfig::host_handler`] (the kind-agnostic single sink); the
45/// literal label is irrelevant to SDK consumers.
46const DEFAULT_AGENT_INVOKER: &str = r#"
47local agent = require("agent")
48local r = agent.run({
49    prompt = _PROMPT,
50    system = _CONTEXT,
51})
52bus.emit("_", r)
53"#;
54
55/// How the Lua script source for `run()` is supplied.
56///
57/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
58/// the filesystem at start. `Inline` lets SDK consumers pass a script
59/// they hold in memory (compile-time `include_str!`, dynamically built
60/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
61/// an embedded invoker that runs the StdPkg `agent` module with the
62/// caller-supplied prompt/context and emits the result via
63/// `bus.emit("agent_result", ...)`.
64#[derive(Debug, Clone)]
65pub enum ScriptSource {
66    /// Read the script from a filesystem path at start.
67    Path(PathBuf),
68    /// Use the supplied source code directly.
69    Inline {
70        /// Lua source code.
71        source: String,
72        /// Display name used in tracing, error messages, and the Lua
73        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
74        name: String,
75    },
76    /// Use the embedded default agent invoker. `prompt` / `context`
77    /// are forwarded as `_PROMPT` / `_CONTEXT` Lua globals and the
78    /// agent result is emitted on the EventBus under a neutral label
79    /// (`"_"`). SDK consumers should pair this with
80    /// [`BlockConfig::host_handler`] (the kind-agnostic single sink)
81    /// and `auto_serve_bus = true`. The emit-kind is intentionally
82    /// meaningless; consumers that need string-keyed routing should
83    /// supply [`ScriptSource::Inline`] with their own invoker.
84    DefaultAgent,
85}
86
87/// How a string payload (prompt / system context) is supplied.
88///
89/// `Inline` is the literal string variant (CLI `--prompt` / `--context`).
90/// `File` reads the contents from disk at `run()` start (CLI
91/// `--prompt-file` / `--context-file`).
92#[derive(Debug, Clone)]
93pub enum PromptSource {
94    /// Literal string.
95    Inline(String),
96    /// Filesystem path; contents are read at `run()` start.
97    File(PathBuf),
98}
99
100/// How the Ed25519 mesh identity secret key is supplied.
101///
102/// `Inline` is a 64-hex literal. `Env` reads the named environment
103/// variable at `run()` start (CLI default uses
104/// `AGENT_BLOCK_MESH_SECRET_KEY`). Absence of any `SecretKeySource`
105/// (i.e. `BlockConfig.secret_key = None`) causes a random keypair to
106/// be generated, matching the prior behavior.
107#[derive(Debug, Clone)]
108pub enum SecretKeySource {
109    /// 64-character hex literal.
110    Inline(String),
111    /// Environment variable name to read at start.
112    Env(String),
113}
114
115/// Async handler invoked when the LLM (or a Lua call to
116/// `tool.call(name, ...)`) targets a Rust-implemented tool supplied via
117/// [`BlockConfig::host_tools`].
118///
119/// `input` arrives as a `serde_json::Value` (converted from Lua before
120/// the handler is invoked). The returned value is converted back to a
121/// Lua value and delivered to the caller. Errors are propagated as
122/// `LuaError::external` (visible inside the script) and as `BlockError`
123/// on the Rust side.
124#[async_trait::async_trait]
125pub trait ToolHandler: Send + Sync + 'static {
126    async fn call(&self, input: serde_json::Value) -> Result<serde_json::Value, BlockError>;
127}
128
129/// Declarative spec for a Rust-implemented tool injected into the Lua
130/// tool registry before the user script runs. The resulting entry is
131/// indistinguishable from a Lua-defined tool from the script's view:
132/// `tool.call("<name>", input)`, `agent.run({ ... })` tool dispatch,
133/// and `tool.schema()` enumeration all work uniformly.
134#[derive(Clone)]
135pub struct HostToolSpec {
136    /// Tool name. Becomes the routing key in `_TOOL_REGISTRY` and the
137    /// `name` field exposed by `tool.schema()` (Anthropic tool spec).
138    pub name: String,
139    /// Free-form description shown to the LLM. Becomes the
140    /// `description` field of the Anthropic tool spec.
141    pub description: String,
142    /// Input schema (Anthropic-compatible JSON Schema object).
143    pub input_schema: serde_json::Value,
144    /// Optional group label for [`agent.run`'s `tool_groups`] filter
145    /// and for [`BlockConfig::tool_policy`] (planned).
146    pub group: Option<String>,
147    /// Rust callback dispatched on every invocation.
148    pub handler: Arc<dyn ToolHandler>,
149}
150
151impl std::fmt::Debug for HostToolSpec {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("HostToolSpec")
154            .field("name", &self.name)
155            .field("description", &self.description)
156            .field("input_schema", &self.input_schema)
157            .field("group", &self.group)
158            .field("handler", &"<dyn ToolHandler>")
159            .finish()
160    }
161}
162
163/// Snapshot of a tool that a given [`BlockConfig`] will (statically)
164/// expose to the LLM. Produced by [`inspect_tools`] without running
165/// the script. MCP server tools are *not* included because they are
166/// only known after the MCP `initialize` handshake completes; callers
167/// that need that view should run the script and call `tool.schema()`
168/// from Lua.
169#[derive(Debug, Clone)]
170pub struct ToolMeta {
171    pub name: String,
172    pub description: String,
173    pub group: Option<String>,
174    pub source: ToolSource,
175}
176
177/// Origin of a tool listed by [`inspect_tools`].
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub enum ToolSource {
180    /// Supplied via [`BlockConfig::host_tools`] (Rust-implemented).
181    HostRust,
182    /// Embedded StdPkg block (`agent`, `compile_loop`, …) — discovered
183    /// statically from [`EMBEDDED_BLOCKS`]. Note: not every embedded
184    /// block exposes a registered tool; this entry simply records that
185    /// the module is available via `require(...)`.
186    EmbeddedBlock,
187}
188
189/// Inspect the tools a [`BlockConfig`] will expose to the LLM without
190/// actually running the script. Returns the merged list of
191/// `host_tools` (declared in the config) and embedded-block sources.
192///
193/// MCP server tools are deliberately omitted — they only become known
194/// after the MCP `initialize` handshake. Use `tool.schema()` from
195/// inside the running script for that view.
196pub fn inspect_tools(config: &BlockConfig) -> Vec<ToolMeta> {
197    let mut out = Vec::new();
198    for t in &config.host_tools {
199        out.push(ToolMeta {
200            name: t.name.clone(),
201            description: t.description.clone(),
202            group: t.group.clone(),
203            source: ToolSource::HostRust,
204        });
205    }
206    for (name, _src) in EMBEDDED_BLOCKS {
207        out.push(ToolMeta {
208            name: (*name).to_string(),
209            description: format!("Embedded StdPkg block (require(\"{name}\"))"),
210            group: None,
211            source: ToolSource::EmbeddedBlock,
212        });
213    }
214    out
215}
216
217/// Build the `blocks/` portion of `package.path` from filesystem locations.
218///
219/// Priority (highest first):
220/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
221/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
222///
223/// Returns a semicolon-terminated string ready to prepend to `package.path`,
224/// or an empty string when no `blocks/` directories are found.
225fn build_blocks_path(project_root: &Path) -> String {
226    let mut out = String::new();
227
228    // 1. project_root/blocks/
229    let project_blocks = project_root.join("blocks");
230    if project_blocks.is_dir() {
231        let pb = project_blocks.to_string_lossy();
232        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
233    }
234
235    // 2. exe_dir/blocks/
236    match std::env::current_exe() {
237        Ok(exe) => {
238            if let Some(exe_dir) = exe.parent() {
239                let exe_blocks = exe_dir.join("blocks");
240                if exe_blocks.is_dir() {
241                    let eb = exe_blocks.to_string_lossy();
242                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
243                }
244            }
245        }
246        Err(e) => {
247            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
248        }
249    }
250
251    out
252}
253
254pub struct BlockConfig {
255    /// Lua script to execute. See [`ScriptSource`] for the supported
256    /// shapes (filesystem path / inline source / embedded default
257    /// agent invoker).
258    pub script: ScriptSource,
259    pub project_root: PathBuf,
260    pub relay_url: Option<String>,
261    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
262    /// for the supported shapes (inline 64-hex / environment variable).
263    /// `None` generates a random keypair. Required to talk to
264    /// registry/ACL-gated hosted meshes.
265    pub secret_key: Option<SecretKeySource>,
266    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
267    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
268    pub mcp_rpc_timeout: Duration,
269    /// Prompt payload injected as `_PROMPT` Lua global. See
270    /// [`PromptSource`] for the supported shapes. `None` leaves the
271    /// global unset.
272    pub prompt: Option<PromptSource>,
273    /// Context payload injected as `_CONTEXT` Lua global (typically
274    /// the system prompt). Same shape rules as [`Self::prompt`].
275    pub context: Option<PromptSource>,
276    /// Host-side Rust handlers pre-installed on the EventBus before the user
277    /// script starts. Each entry registers `handler` against `kind` via
278    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
279    /// captured by the Rust handler rather than dispatched to a Lua function.
280    ///
281    /// Intended for SDK consumers that embed `agent-block-core` and need to
282    /// receive script output programmatically (e.g. a Spawner adapter that
283    /// turns LLM script output into a typed `WorkerResult`). Lua-side
284    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
285    /// are still possible, but the EventBus dispatches a single handler per
286    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
287    /// the same `kind` collide; choose one side per routing key.
288    ///
289    /// Defaults to an empty map (no host handlers).
290    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
291    /// Single host-side Rust handler that catches every event regardless
292    /// of `kind`. Internally registered via [`EventBus::on_any`], so it
293    /// acts as a fallback when no entry in [`Self::host_handlers`]
294    /// matches the incoming `kind`.
295    ///
296    /// This is the SDK-embed 1-shot sink: SDK consumers do not need to
297    /// invent or coordinate a string `kind` between the Lua script and
298    /// their Rust code. The agent invoker's emit-kind is irrelevant —
299    /// the handler receives every event.
300    ///
301    /// Use this when you want a single Rust handler to receive results
302    /// (typical embedded use). Use [`Self::host_handlers`] instead when
303    /// you actually need string-keyed routing (multi-source / multi-
304    /// handler dispatch). The two may coexist: kind-specific handlers
305    /// in `host_handlers` take precedence, and this single handler is
306    /// the fallback for unmatched kinds.
307    ///
308    /// Defaults to `None`.
309    pub host_handler: Option<Arc<dyn Handler>>,
310    /// Rust-implemented tools injected into the Lua tool registry
311    /// before the user script runs. Each entry becomes
312    /// indistinguishable from a Lua-defined tool: it is discoverable
313    /// via `tool.list()` / `tool.schema()`, dispatchable via
314    /// `tool.call(name, input)`, and visible to `agent.run`'s LLM
315    /// function-calling.
316    ///
317    /// SDK consumers can use this to expose Rust capabilities
318    /// (database lookups, business logic, etc.) to the LLM without
319    /// writing any Lua. See [`HostToolSpec`] and [`ToolHandler`].
320    ///
321    /// Defaults to an empty list.
322    pub host_tools: Vec<HostToolSpec>,
323    /// Optional custom `reqwest::Client` for the `http.*` Lua bridge
324    /// and any other in-process HTTP traffic. SDK consumers can wire
325    /// in their own TLS roots, proxy, default headers, connection
326    /// pool tuning, etc.
327    ///
328    /// `None` falls back to `reqwest::Client::new()` with default
329    /// settings (legacy behavior).
330    pub http_client: Option<reqwest::Client>,
331    /// Override path for the `std.sql` SQLite database file. `None`
332    /// reads the `AGENT_BLOCK_SQL_PATH` env var (CLI default), or
333    /// falls back to `{base_dir}/db.sqlite`. Pass `Some(":memory:")`
334    /// for an in-memory DB (useful for tests / isolation).
335    pub sql_path: Option<PathBuf>,
336    /// Override path for the `std.kv` SQLite database file. Same
337    /// semantics as [`Self::sql_path`].
338    pub kv_path: Option<PathBuf>,
339    /// Override path for the `std.ts` SQLite database file. Same
340    /// semantics as [`Self::sql_path`].
341    pub ts_path: Option<PathBuf>,
342    /// Extra Lua globals injected into both the main Isle and the
343    /// handler Isle before the user script runs. Each entry
344    /// `(name, value)` results in `_G[name] = json_to_lua(value)`.
345    ///
346    /// Use this to parameterize an inline script from Rust without
347    /// baking the values into the Lua source (`_USER_ID`,
348    /// `_TENANT`, `_FEATURE_FLAGS`, etc.). Keys must be valid Lua
349    /// identifiers; values are any `serde_json::Value`.
350    ///
351    /// `_PROMPT`, `_CONTEXT`, and `_SCRIPT_NAME` are reserved
352    /// (managed by other `BlockConfig` fields); colliding with them
353    /// silently overrides those defaults — use with care.
354    pub extra_globals: HashMap<String, serde_json::Value>,
355    /// When `true`, the EventBus dispatcher loop is driven in the background
356    /// for the duration of the script and shut down gracefully after the
357    /// script completes. Required for SDK-embed callers that supply
358    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
359    /// emitted from the script to actually reach those handlers without
360    /// requiring the script to call `bus.serve()` (which blocks on
361    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
362    ///
363    /// After the script finishes, the dispatcher is given a grace window
364    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
365    /// and finish any in-flight handler, then is cancelled.
366    ///
367    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
368    /// takes ownership of the EventBus before the script runs, so a script
369    /// that calls `bus.on(...)` followed by `bus.serve()` will error
370    /// ("bus.serve() has already taken ownership"). Use this flag when the
371    /// script's sole purpose is to push events to host handlers.
372    ///
373    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
374    /// script calls `bus.serve()`).
375    pub auto_serve_bus: bool,
376    /// Optional caller-supplied cancellation token. When cancelled, the
377    /// in-flight script is interrupted via the Isle's debug-hook cancel
378    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
379    /// returns `Err(BlockError::Cancelled)`.
380    ///
381    /// Intended for SDK consumers that spawn `run()` as a tokio task and
382    /// need an out-of-band abort signal (timeouts, parent-task cancellation
383    /// propagation, user-driven stop). The token is observed across the
384    /// `coroutine_eval` await; once cancellation propagates, the shutdown
385    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
386    /// still runs so file descriptors and remote handles are released.
387    ///
388    /// Defaults to `None` (legacy behavior: `run()` only completes when
389    /// the script returns naturally).
390    pub shutdown_token: Option<CancellationToken>,
391}
392
393/// Shared context passed into Lua bridge functions.
394#[derive(Clone)]
395pub struct HostContext {
396    pub project_root: PathBuf,
397    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
398    pub mcp_manager: Arc<RwLock<McpManager>>,
399    /// Shared async HTTP client for `http.*` bridge.
400    pub http_client: reqwest::Client,
401    /// Shared SQLite connection for `sql.*` bridge (user tables).
402    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
403    /// Interrupt handle for the sql connection.
404    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
405    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
406    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
407    /// Separate from sql_conn so KV scratch state and user SQL data don't
408    /// share WAL, page cache, or backup lifecycle.
409    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
410    /// Interrupt handle for the kv connection.
411    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
412    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
413    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
414    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
415    /// Interrupt handle for the ts connection.
416    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
417    #[allow(dead_code)]
418    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
419    /// Async handle to the main Isle Lua VM that runs the user script via
420    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
421    /// handlers against this Isle; handlers live on `handler_isle` instead.
422    /// The field is retained because bridge code still keyed to the main
423    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
424    /// need it, and removing it would force another HostContext reshape.
425    #[allow(dead_code)]
426    pub isle: Arc<AsyncIsle>,
427    /// Dedicated Isle for EventBus handler execution. Lua handlers
428    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
429    /// handler code does not occupy the main Isle's LocalSet and block
430    /// grace timers / shutdown wakers on the main VM side.
431    ///
432    /// Used by `bridge::bus` to forward handler bytecode
433    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
434    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
435    /// `coroutine_call("__bus_dispatch", ...)`.
436    pub handler_isle: Arc<AsyncIsle>,
437    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
438    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
439    /// clone at `MeshAgent::connect` time, so the field itself is not read
440    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
441    #[allow(dead_code)]
442    pub bus_tx: mpsc::Sender<Event>,
443    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
444    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
445    /// ownership before entering the long-lived `run()` await (avoiding the
446    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
447    pub event_bus: Arc<Mutex<Option<EventBus>>>,
448}
449
450/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
451/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
452/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
453///
454/// `label` is used only for the init log line (`sql` / `kv`) so that the two
455/// databases are distinguishable in tracing output.
456fn open_sqlite(
457    path: &Path,
458    label: &'static str,
459) -> BlockResult<(
460    Arc<Mutex<rusqlite::Connection>>,
461    Arc<rusqlite::InterruptHandle>,
462)> {
463    let is_memory = crate::bridge::config::is_memory_sql(path);
464    if !is_memory {
465        if let Some(parent) = path.parent() {
466            std::fs::create_dir_all(parent)
467                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
468        }
469    }
470    let conn = rusqlite::Connection::open(path)
471        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
472    if !is_memory {
473        let journal = crate::bridge::config::sql_journal_mode();
474        conn.pragma_update(None, "journal_mode", &journal)
475            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
476    }
477    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
478    conn.pragma_update(None, "busy_timeout", busy_ms)
479        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
480    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
481    let interrupt = Arc::new(conn.get_interrupt_handle());
482    let conn = Arc::new(Mutex::new(conn));
483    Ok((conn, interrupt))
484}
485
486/// Build the init closure shared between the main Isle and the handler
487/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
488/// configures `package.path` / `package.searchers` so `require "agent"`
489/// (and any `blocks/` module) works inside the Lua VM.
490///
491/// Returns an `FnOnce` so each call produces a fresh closure; this lets
492/// both Isles be spawned from the same config without `Clone` bounds on
493/// the captured `HashMap`.
494fn build_isle_init(
495    script_name: String,
496    script_dir: String,
497    blocks_paths: String,
498    prompt: Option<String>,
499    context: Option<String>,
500    extra_globals: HashMap<String, serde_json::Value>,
501) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
502    move |lua| {
503        // Set script name before registering bridges (used by log.* for attribution)
504        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
505        if let Some(ref p) = prompt {
506            lua.globals().set("_PROMPT", p.as_str())?;
507        }
508        if let Some(ref c) = context {
509            lua.globals().set("_CONTEXT", c.as_str())?;
510        }
511
512        mlua_batteries::register_all(lua, "std")?;
513
514        // ── extra_globals from BlockConfig ──────────────────────────
515        // Inject SDK-supplied parameterisation values into the Lua
516        // global namespace. Registered after mlua_batteries so that
517        // any value that *intentionally* shadows a `std.*` symbol
518        // wins — callers are responsible for not stomping on bridges
519        // they need.
520        for (name, value) in &extra_globals {
521            let lua_value = crate::bridge::json_to_lua(lua, value.clone())
522                .map_err(|e| mlua::Error::external(format!("extra_globals[{name}]: {e}")))?;
523            lua.globals().set(name.as_str(), lua_value)?;
524        }
525
526        // ── package.path ──────────────────────────────────────────────
527        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
528        let package: mlua::Table = lua.globals().get("package")?;
529        let current_path: String = package.get("path")?;
530        let new_path =
531            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
532        package.set("path", new_path)?;
533
534        // ── package.searchers — embedded fallback ─────────────────────
535        // Register a custom searcher that loads blocks/ modules from the
536        // sources baked in at compile time.  This is the lowest-priority
537        // searcher so filesystem copies always win.
538        let embedded: HashMap<&'static str, &'static str> =
539            EMBEDDED_BLOCKS.iter().copied().collect();
540
541        let searchers: mlua::Table = package.get("searchers")?;
542        let loader =
543            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
544                Some(source) => {
545                    let chunk = lua
546                        .load(*source)
547                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
548                    let func = chunk.into_function()?;
549                    Ok(mlua::Value::Function(func))
550                }
551                None => {
552                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
553                    Ok(mlua::Value::String(msg))
554                }
555            })?;
556        // Append as the last searcher so filesystem paths remain preferred.
557        let next_idx = searchers.raw_len() + 1;
558        searchers.raw_set(next_idx, loader)?;
559
560        Ok(())
561    }
562}
563
564/// Spawn the dedicated handler Isle.
565///
566/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
567/// separate OS thread with its own `tokio` current-thread runtime, keeping
568/// CPU-bound handlers from starving the main Isle's grace timers.
569///
570/// Bridge registration is deferred to a follow-up `exec` in `run()` because
571/// `HostContext` is not constructible until both Isles exist (the struct
572/// itself holds `Arc<AsyncIsle>` for both).
573async fn spawn_handler_isle(
574    script_name: String,
575    script_dir: String,
576    blocks_paths: String,
577    prompt: Option<String>,
578    context: Option<String>,
579    extra_globals: HashMap<String, serde_json::Value>,
580) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
581    let init = build_isle_init(
582        script_name,
583        script_dir,
584        blocks_paths,
585        prompt,
586        context,
587        extra_globals,
588    );
589    let (isle, driver) = AsyncIsle::builder()
590        .thread_name("agent-block-handler-isle")
591        .spawn(init)
592        .await
593        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
594    info!(
595        thread_name = "agent-block-handler-isle",
596        "handler Isle spawned"
597    );
598    Ok((Arc::new(isle), driver))
599}
600
601fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
602    let s = s.trim();
603    if s.len() != 64 {
604        return Err(format!("expected 64 hex chars, got {}", s.len()));
605    }
606    let mut out = [0u8; 32];
607    for (i, byte) in out.iter_mut().enumerate() {
608        let hi = u8::from_str_radix(&s[2 * i..2 * i + 1], 16)
609            .map_err(|e| format!("invalid hex at position {}: {e}", 2 * i))?;
610        let lo = u8::from_str_radix(&s[2 * i + 1..2 * i + 2], 16)
611            .map_err(|e| format!("invalid hex at position {}: {e}", 2 * i + 1))?;
612        *byte = (hi << 4) | lo;
613    }
614    Ok(out)
615}
616
617pub async fn run(config: BlockConfig) -> BlockResult<()> {
618    // ── Resolve sources ───────────────────────────────────────────
619    // Convert the `Source` enums on `BlockConfig` to their concrete
620    // payloads before any Isle setup. `File`/`Path`/`Env` variants
621    // read from disk / environment exactly once, here at the start.
622    let (script_source, script_name, script_dir_pathbuf) = match &config.script {
623        ScriptSource::Path(p) => {
624            let source = std::fs::read_to_string(p)
625                .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
626            let name = p
627                .file_name()
628                .map(|n| n.to_string_lossy().to_string())
629                .unwrap_or_else(|| "unknown".to_string());
630            let dir = p
631                .parent()
632                .map(|d| d.to_path_buf())
633                .unwrap_or_else(|| PathBuf::from("."));
634            (source, name, dir)
635        }
636        ScriptSource::Inline { source, name } => {
637            (source.clone(), name.clone(), config.project_root.clone())
638        }
639        ScriptSource::DefaultAgent => (
640            DEFAULT_AGENT_INVOKER.to_string(),
641            "default_agent_invoker.lua".to_string(),
642            config.project_root.clone(),
643        ),
644    };
645
646    let prompt_resolved: Option<String> = match &config.prompt {
647        Some(PromptSource::Inline(s)) => Some(s.clone()),
648        Some(PromptSource::File(p)) => Some(
649            std::fs::read_to_string(p)
650                .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
651        ),
652        None => None,
653    };
654    let context_resolved: Option<String> = match &config.context {
655        Some(PromptSource::Inline(s)) => Some(s.clone()),
656        Some(PromptSource::File(p)) => Some(
657            std::fs::read_to_string(p)
658                .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
659        ),
660        None => None,
661    };
662    let secret_key_resolved: Option<String> = match &config.secret_key {
663        Some(SecretKeySource::Inline(s)) => Some(s.clone()),
664        Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
665        None => None,
666    };
667
668    // NOTE: We previously held entered span guards across awaits for nested
669    // span context. That made the `run()` future `!Send`, which prevents
670    // SDK consumers from `tokio::spawn(run(config))`. Span context is
671    // attached to events via fields on the `info_span!` calls below; the
672    // missing nesting is an acceptable trade-off for `Send` correctness.
673    let _root_span = info_span!("agent_block", script = %script_name);
674
675    // ── .env ──────────────────────────────────────────────────────
676    // Load .env from project_root if present. Variables are merged into
677    // the process environment so Lua's `std.env.get()` picks them up.
678    let env_path = config.project_root.join(".env");
679    match dotenvy::from_path(&env_path) {
680        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
681        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
682        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
683    }
684
685    // ── Init ──────────────────────────────────────────────────────
686    let _init_span = info_span!("init");
687
688    // ── EventBus channel ─────────────────────────────────────────────
689    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
690    // handler can hold a `bus_tx` clone and forward incoming requests
691    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
692    let bus_capacity = crate::bridge::config::bus_capacity();
693    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
694    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
695
696    // ── Pre-install host-side Rust handlers ───────────────────────────
697    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
698    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
699    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
700    // Registered here (before any Lua bridge registers handlers and before
701    // `bus.serve` takes ownership of the bus) so the EventBus already
702    // carries the host handlers when the script starts.
703    // Install host-side Rust handlers: kind-specific entries from
704    // `host_handlers` and, when set, the kind-agnostic `host_handler`
705    // (registered via `on_any` as the fallback for unmatched kinds).
706    // SDK-embed 1-shot callers typically only set `host_handler`.
707    let has_kind_handlers = !config.host_handlers.is_empty();
708    let has_any_handler = config.host_handler.is_some();
709    if has_kind_handlers || has_any_handler {
710        let mut guard = event_bus
711            .lock()
712            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
713        let bus = guard
714            .as_mut()
715            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
716        for (kind, handler) in &config.host_handlers {
717            bus.on(kind.clone(), Arc::clone(handler))
718                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
719        }
720        if let Some(any_handler) = &config.host_handler {
721            bus.on_any(Arc::clone(any_handler))
722                .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
723        }
724        info!(
725            kind_handlers = config.host_handlers.len(),
726            any_handler = has_any_handler,
727            "host handlers pre-installed"
728        );
729    }
730
731    // ── auto-serve: background dispatcher for SDK-embed callers ───────
732    // When `auto_serve_bus` is on and at least one host-side handler
733    // (kind-specific or kind-agnostic) is installed, take the EventBus
734    // out of the Mutex *before* the script runs and spawn the dispatcher
735    // loop on the runtime. This lets `bus.emit(kind, payload)` from the
736    // script reach the host handler without requiring the script to call
737    // `bus.serve()` (which blocks on signals and never returns under
738    // programmatic embedding).
739    let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
740    let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
741        let bus = {
742            let mut guard = event_bus
743                .lock()
744                .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
745            guard
746                .take()
747                .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
748        };
749        let token = CancellationToken::new();
750        let token_for_task = token.clone();
751        let handle = tokio::spawn(async move {
752            let mut bus = bus;
753            if let Err(e) = bus.run(token_for_task).await {
754                tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
755            }
756        });
757        info!("auto-serve: dispatcher spawned");
758        Some((handle, token))
759    } else {
760        None
761    };
762
763    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
764        let keypair = match &secret_key_resolved {
765            Some(hex_str) => {
766                let bytes = hex_decode_32(hex_str)
767                    .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
768                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
769            }
770            None => agent_mesh_core::identity::AgentKeypair::generate(),
771        };
772        info!(agent_id = %keypair.agent_id(), "mesh identity");
773        let acl = agent_mesh_core::acl::AclPolicy {
774            default_deny: false,
775            rules: vec![],
776        };
777        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
778            Arc::new(BusRelayHandler::new(bus_tx.clone()));
779        let url = relay_url.clone();
780        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
781            .await
782            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
783        info!(relay_url = %relay_url, "mesh connected");
784        Some(Arc::new(agent))
785    } else {
786        None
787    };
788
789    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
790        config.mcp_rpc_timeout,
791    )?));
792
793    // Resolve project_root to absolute path.
794    // canonicalize() can fail if the path doesn't exist; fall back to
795    // joining with current_dir to guarantee an absolute path.
796    let project_root = config
797        .project_root
798        .canonicalize()
799        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
800
801    // HTTP client: prefer the SDK-supplied client if any; otherwise
802    // construct a fresh default reqwest::Client (legacy behavior).
803    let http_client = config.http_client.clone().unwrap_or_default();
804
805    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
806    // BlockConfig overrides take precedence; otherwise the env-driven
807    // resolution in `bridge::config::*` applies (see crate docs).
808    let sql_path = match &config.sql_path {
809        Some(p) => p.clone(),
810        None => crate::bridge::config::sql_path().map_err(BlockError::Runtime)?,
811    };
812    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
813
814    let kv_path = match &config.kv_path {
815        Some(p) => p.clone(),
816        None => crate::bridge::config::kv_path().map_err(BlockError::Runtime)?,
817    };
818    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
819
820    let ts_path = match &config.ts_path {
821        Some(p) => p.clone(),
822        None => crate::bridge::config::ts_path().map_err(BlockError::Runtime)?,
823    };
824    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
825
826    // Use the script dir derived from the resolved `ScriptSource` for
827    // `package.path` lookups. For inline / default-agent variants the dir
828    // falls back to `project_root` (set during source resolution above).
829    let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
830
831    // Precompute values captured by the init closure so we don't need to
832    // move the full `HostContext` into it (HostContext now holds
833    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
834    // returns — classic chicken-and-egg). All bridge registrations run in a
835    // second pass via `isle.exec` below.
836    let blocks_paths = build_blocks_path(&project_root);
837    let prompt = prompt_resolved.clone();
838    let context = context_resolved.clone();
839
840    // ── main Isle ─────────────────────────────────────────────────
841    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
842        script_name.clone(),
843        script_dir.clone(),
844        blocks_paths.clone(),
845        prompt.clone(),
846        context.clone(),
847        config.extra_globals.clone(),
848    ))
849    .await
850    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
851    let isle = Arc::new(isle);
852
853    // ── handler Isle (sequential, dependencies are trivial) ────────
854    let (handler_isle, handler_driver) = spawn_handler_isle(
855        script_name.clone(),
856        script_dir.clone(),
857        blocks_paths.clone(),
858        prompt,
859        context,
860        config.extra_globals.clone(),
861    )
862    .await?;
863
864    // Wire both Isles into McpManager so Lua notification callbacks can be
865    // dispatched from the rmcp task thread.
866    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
867    // - main_isle: progress/log notification dispatch (exec on main Isle so
868    //   user callback upvalues are preserved — no bytecode dump/reload needed)
869    {
870        let mut mgr = mcp_manager.write().await;
871        mgr.set_handler_isle(Arc::clone(&handler_isle));
872        mgr.set_main_isle(Arc::clone(&isle));
873    }
874
875    // ── HostContext + bridge registration ──────────────────────────────
876    // Wrap the isle in an Arc so `HostContext` can hand it to
877    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
878    // handlers from the EventBus dispatcher task).
879    let ctx = HostContext {
880        project_root,
881        mesh_agent,
882        mcp_manager: Arc::clone(&mcp_manager),
883        http_client,
884        sql_conn,
885        sql_interrupt,
886        kv_conn,
887        kv_interrupt,
888        ts_conn,
889        ts_interrupt,
890        isle: Arc::clone(&isle),
891        handler_isle: Arc::clone(&handler_isle),
892        bus_tx: bus_tx.clone(),
893        event_bus: Arc::clone(&event_bus),
894    };
895
896    {
897        let ctx = ctx.clone();
898        isle.exec(move |lua| {
899            bridge::register_all(lua, &ctx)
900                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
901            Ok(String::new())
902        })
903        .await
904        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
905    }
906
907    {
908        let ctx = ctx.clone();
909        handler_isle
910            .exec(move |lua| {
911                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
912                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
913                })?;
914                Ok(String::new())
915            })
916            .await
917            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
918    }
919
920    // ── Inject host_tools into the Lua tool registry ───────────────
921    // Done after `bridge::register_all` so `_TOOL_REGISTRY` exists.
922    // Each entry becomes an Anthropic-shaped tool spec table
923    //   { name, schema = { description, input_schema }, handler, group? }
924    // where `handler` is a Lua async function that bridges back into
925    // the supplied `ToolHandler::call`. Lua-side `tool.list()` /
926    // `tool.schema()` / `agent.run` see these uniformly with native
927    // Lua-defined tools.
928    if !config.host_tools.is_empty() {
929        let host_tools = config.host_tools.clone();
930        let tool_count = host_tools.len();
931        isle.exec(move |lua| {
932            let registry: mlua::Table = lua
933                .globals()
934                .get("_TOOL_REGISTRY")
935                .map_err(|e| mlua_isle::IsleError::Lua(format!("get _TOOL_REGISTRY: {e}")))?;
936            for tool in host_tools {
937                let entry = lua
938                    .create_table()
939                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create entry: {e}")))?;
940                entry
941                    .set("name", tool.name.as_str())
942                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set name: {e}")))?;
943                // schema = { description, input_schema } — Anthropic shape
944                let schema = lua
945                    .create_table()
946                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create schema: {e}")))?;
947                schema
948                    .set("description", tool.description.as_str())
949                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set description: {e}")))?;
950                let input_schema_lua =
951                    crate::bridge::json_to_lua(lua, tool.input_schema.clone())
952                        .map_err(|e| mlua_isle::IsleError::Lua(format!("input_schema: {e}")))?;
953                schema
954                    .set("input_schema", input_schema_lua)
955                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set input_schema: {e}")))?;
956                entry
957                    .set("schema", schema)
958                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set schema: {e}")))?;
959                if let Some(group) = &tool.group {
960                    entry
961                        .set("group", group.as_str())
962                        .map_err(|e| mlua_isle::IsleError::Lua(format!("set group: {e}")))?;
963                }
964                let handler_arc = Arc::clone(&tool.handler);
965                let handler_fn = lua
966                    .create_async_function(move |lua, input: mlua::Value| {
967                        let handler = Arc::clone(&handler_arc);
968                        async move {
969                            let input_json = crate::bridge::lua_to_json(&lua, input)?;
970                            let result = handler
971                                .call(input_json)
972                                .await
973                                .map_err(mlua::Error::external)?;
974                            crate::bridge::json_to_lua(&lua, result)
975                        }
976                    })
977                    .map_err(|e| mlua_isle::IsleError::Lua(format!("create handler: {e}")))?;
978                entry
979                    .set("handler", handler_fn)
980                    .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
981                registry
982                    .set(tool.name.as_str(), entry)
983                    .map_err(|e| mlua_isle::IsleError::Lua(format!("registry set: {e}")))?;
984            }
985            Ok(String::new())
986        })
987        .await
988        .map_err(|e| BlockError::Runtime(format!("host_tools inject: {e}")))?;
989        info!(count = tool_count, "host tools injected into Lua registry");
990    }
991
992    drop(_init_span);
993
994    // ── Execute ───────────────────────────────────────────────────
995    // When `shutdown_token` is supplied, race the script future against
996    // the caller's cancellation signal. On cancel, propagate to the Isle
997    // via the AsyncTask's cancel token so the debug hook unwinds the Lua
998    // VM, then continue into the shutdown sequence below (we still want
999    // to release MCP/mesh handles and join the auto-serve dispatcher
1000    // before returning).
1001    let script_result: Result<(), BlockError> = {
1002        let _exec_span = info_span!("execute", script = %script_name);
1003
1004        let mut task = isle.spawn_coroutine_eval(&script_source);
1005        let task_cancel = task.cancel_token().clone();
1006        match config.shutdown_token.as_ref() {
1007            Some(token) => {
1008                tokio::select! {
1009                    biased;
1010                    _ = token.cancelled() => {
1011                        task_cancel.cancel();
1012                        // Wait for the Isle to unwind so the VM is in a
1013                        // consistent state before driver shutdown. The
1014                        // debug hook fires at the next HOOK_INTERVAL.
1015                        let _ = (&mut task).await;
1016                        info!("shutdown_token: cancelled by caller");
1017                        Err(BlockError::Cancelled)
1018                    }
1019                    res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
1020                }
1021            }
1022            None => (&mut task)
1023                .await
1024                .map(|_| ())
1025                .map_err(|e| BlockError::Script(format!("{e}"))),
1026        }
1027    };
1028
1029    // ── auto-serve drain + cancel ─────────────────────────────────
1030    // Let the dispatcher drain events queued by the script, then signal
1031    // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
1032    if let Some((handle, token)) = auto_serve_state {
1033        let grace_ms = crate::bridge::config::task_grace_ms();
1034        let grace = Duration::from_millis(grace_ms);
1035        tokio::time::sleep(grace).await;
1036        token.cancel();
1037        match tokio::time::timeout(grace, handle).await {
1038            Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
1039            Ok(Err(join_err)) => {
1040                tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
1041            }
1042            Err(_) => {
1043                tracing::warn!(
1044                    grace_ms,
1045                    "auto-serve: dispatcher join timed out after cancel; forcing exit"
1046                );
1047            }
1048        }
1049    }
1050
1051    // ── Shutdown ──────────────────────────────────────────────────
1052    {
1053        let _shutdown_span = info_span!("shutdown");
1054
1055        mcp_manager.write().await.disconnect_all().await?;
1056
1057        driver
1058            .shutdown()
1059            .await
1060            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
1061
1062        // Handler Isle shutdown is independent of main shutdown: a failure
1063        // here (e.g. ThreadPanic on the handler thread) is logged but does
1064        // not poison the main process exit. The main Isle has already
1065        // been stopped cleanly above.
1066        match handler_driver.shutdown().await {
1067            Ok(()) => info!(
1068                thread_name = "agent-block-handler-isle",
1069                "handler Isle shut down"
1070            ),
1071            Err(e) => tracing::error!(
1072                error = %e,
1073                thread_name = "agent-block-handler-isle",
1074                "handler Isle shutdown failed"
1075            ),
1076        }
1077    }
1078
1079    script_result
1080}
1081
1082/// mesh → bus source adapter.
1083///
1084/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
1085/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
1086/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
1087/// oneshot channel carried inside the event.
1088///
1089/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
1090///
1091/// | Failure                   | Return value                           |
1092/// |---------------------------|----------------------------------------|
1093/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
1094/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
1095/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
1096/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
1097///
1098/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
1099/// (see `src/bridge/mesh.rs`).
1100struct BusRelayHandler {
1101    tx: mpsc::Sender<Event>,
1102}
1103
1104impl BusRelayHandler {
1105    fn new(tx: mpsc::Sender<Event>) -> Self {
1106        Self { tx }
1107    }
1108}
1109
1110/// Bound used for both the mesh-adapter ack wait and other source timeouts.
1111const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
1112
1113#[async_trait::async_trait]
1114impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
1115    async fn handle(
1116        &self,
1117        from: &agent_mesh_core::identity::AgentId,
1118        payload: &serde_json::Value,
1119        _cancel: agent_mesh_sdk::CancelToken,
1120    ) -> serde_json::Value {
1121        let id = uuid::Uuid::new_v4().to_string();
1122        let meta = serde_json::json!({"from": from.to_string()});
1123        let (ack_tx, ack_rx) = oneshot::channel();
1124        let event = Event {
1125            kind: "mesh".into(),
1126            id: id.clone(),
1127            payload: payload.clone(),
1128            meta,
1129            ack_tx: Some(ack_tx),
1130        };
1131
1132        if let Err(e) = self.tx.send(event).await {
1133            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
1134            return serde_json::json!({"error": "bus channel closed"});
1135        }
1136
1137        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
1138            Ok(Ok(Ok(v))) => v,
1139            Ok(Ok(Err(e))) => {
1140                tracing::error!(id = %id, error = %e, "mesh handler returned error");
1141                serde_json::json!({"error": e.to_string()})
1142            }
1143            Ok(Err(e)) => {
1144                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
1145                serde_json::json!({"error": "ack dropped"})
1146            }
1147            Err(_) => {
1148                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
1149                serde_json::json!({"error": "handler timeout"})
1150            }
1151        }
1152    }
1153}