agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
//! 1. Spawn the mlua-isle `AsyncIsle`s (dedicated Lua VM threads with coroutine
//!    support): the main Isle that runs the user script, and a handler Isle
//!    that executes EventBus (`bus.*`) handlers
//! 2. Optionally connect to agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`,
//!    `bus.*`, `sql.*`, `kv.*`, `ts.*`)
//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
//! 6. Graceful shutdown (Isles + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
27/// Embedded Lua sources for blocks/ StdPkg modules.
28/// These are baked into the binary at compile time so `cargo install` works
29/// without any extra file distribution.
30const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
31 ("agent", include_str!("../blocks/agent/init.lua")),
32 ("session", include_str!("../blocks/session/init.lua")),
33 (
34 "compile_loop",
35 include_str!("../blocks/compile_loop/init.lua"),
36 ),
37];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Runs the StdPkg `agent` module with `_PROMPT` / `_CONTEXT` injected and
/// emits the result on the EventBus. The emit kind is `"_"` — a neutral
/// label with no SDK-side meaning. The result is intended to be received
/// via [`BlockConfig::host_handler`] (the kind-agnostic single sink); the
/// literal label is irrelevant to SDK consumers.
///
/// `_PROMPT` / `_CONTEXT` are only defined as Lua globals when
/// `BlockConfig::prompt` / `BlockConfig::context` are `Some`. Otherwise
/// they are `nil` here, so the corresponding `agent.run` fields are absent.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
    prompt = _PROMPT,
    system = _CONTEXT,
})
bus.emit("_", r)
"#;
54
/// How the Lua script source for `run()` is supplied.
///
/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
/// the filesystem at start. `Inline` lets SDK consumers pass a script
/// they hold in memory (compile-time `include_str!`, dynamically built
/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
/// an embedded invoker that runs the StdPkg `agent` module with the
/// caller-supplied prompt/context and emits the result via
/// `bus.emit("_", ...)`. The kind is deliberately meaningless; receive the
/// result with [`BlockConfig::host_handler`].
///
/// Script directory used for module lookup:
/// * `Path` uses the file's parent directory, or `.` if it has none.
///   `_SCRIPT_NAME` is the file name, or `"unknown"`.
/// * `Inline` and `DefaultAgent` use `BlockConfig::project_root` instead.
#[derive(Debug, Clone)]
pub enum ScriptSource {
    /// Read the script from a filesystem path at start.
    Path(PathBuf),
    /// Use the supplied source code directly.
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker. `prompt` / `context`
    /// are forwarded as `_PROMPT` / `_CONTEXT` Lua globals and the
    /// agent result is emitted on the EventBus under a neutral label
    /// (`"_"`). SDK consumers should pair this with
    /// [`BlockConfig::host_handler`] (the kind-agnostic single sink)
    /// and `auto_serve_bus = true`. The emit-kind is intentionally
    /// meaningless; consumers that need string-keyed routing should
    /// supply [`ScriptSource::Inline`] with their own invoker.
    DefaultAgent,
}
86
/// Where a text payload such as the prompt or the system context comes from.
///
/// Mirrors the CLI flags:
/// * `--prompt` / `--context` map to [`Self::Inline`];
/// * `--prompt-file` / `--context-file` map to [`Self::File`], whose contents
///   are read once when `run()` starts.
#[derive(Debug, Clone)]
pub enum PromptSource {
    /// The payload text itself.
    Inline(String),
    /// Path to a file whose contents become the payload (read at `run()` start).
    File(PathBuf),
}
99
/// Where the Ed25519 mesh-identity secret key comes from.
///
/// Leaving `BlockConfig.secret_key` as `None` (no source at all) makes
/// `run()` generate a random keypair, which matches the behavior before
/// this option existed.
#[derive(Debug, Clone)]
pub enum SecretKeySource {
    /// The key itself as 64 hexadecimal characters. Surrounding whitespace
    /// is trimmed before decoding.
    Inline(String),
    /// Name of an environment variable holding the hex key. It is looked up
    /// once at `run()` start (the CLI uses `AGENT_BLOCK_MESH_SECRET_KEY`).
    /// An unset or non-Unicode variable is treated as if no key source had
    /// been configured.
    Env(String),
}
114
115/// Async handler invoked when the LLM (or a Lua call to
116/// `tool.call(name, ...)`) targets a Rust-implemented tool supplied via
117/// [`BlockConfig::host_tools`].
118///
119/// `input` arrives as a `serde_json::Value` (converted from Lua before
120/// the handler is invoked). The returned value is converted back to a
121/// Lua value and delivered to the caller. Errors are propagated as
122/// `LuaError::external` (visible inside the script) and as `BlockError`
123/// on the Rust side.
124#[async_trait::async_trait]
125pub trait ToolHandler: Send + Sync + 'static {
126 async fn call(&self, input: serde_json::Value) -> Result<serde_json::Value, BlockError>;
127}
128
129/// Declarative spec for a Rust-implemented tool injected into the Lua
130/// tool registry before the user script runs. The resulting entry is
131/// indistinguishable from a Lua-defined tool from the script's view:
132/// `tool.call("<name>", input)`, `agent.run({ ... })` tool dispatch,
133/// and `tool.schema()` enumeration all work uniformly.
134#[derive(Clone)]
135pub struct HostToolSpec {
136 /// Tool name. Becomes the routing key in `_TOOL_REGISTRY` and the
137 /// `name` field exposed by `tool.schema()` (Anthropic tool spec).
138 pub name: String,
139 /// Free-form description shown to the LLM. Becomes the
140 /// `description` field of the Anthropic tool spec.
141 pub description: String,
142 /// Input schema (Anthropic-compatible JSON Schema object).
143 pub input_schema: serde_json::Value,
144 /// Optional group label for [`agent.run`'s `tool_groups`] filter
145 /// and for [`BlockConfig::tool_policy`] (planned).
146 pub group: Option<String>,
147 /// Rust callback dispatched on every invocation.
148 pub handler: Arc<dyn ToolHandler>,
149}
150
151impl std::fmt::Debug for HostToolSpec {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("HostToolSpec")
154 .field("name", &self.name)
155 .field("description", &self.description)
156 .field("input_schema", &self.input_schema)
157 .field("group", &self.group)
158 .field("handler", &"<dyn ToolHandler>")
159 .finish()
160 }
161}
162
163/// Snapshot of a tool that a given [`BlockConfig`] will (statically)
164/// expose to the LLM. Produced by [`inspect_tools`] without running
165/// the script. MCP server tools are *not* included because they are
166/// only known after the MCP `initialize` handshake completes; callers
167/// that need that view should run the script and call `tool.schema()`
168/// from Lua.
169#[derive(Debug, Clone)]
170pub struct ToolMeta {
171 pub name: String,
172 pub description: String,
173 pub group: Option<String>,
174 pub source: ToolSource,
175}
176
/// Provenance of a [`ToolMeta`] entry returned by [`inspect_tools`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolSource {
    /// Declared in [`BlockConfig::host_tools`] and implemented in Rust.
    HostRust,
    /// A StdPkg Lua module compiled into the binary (`agent`,
    /// `compile_loop`, …) and loadable with `require(...)`. Being listed
    /// does not imply that the module registers an LLM-visible tool.
    EmbeddedBlock,
}
188
189/// Inspect the tools a [`BlockConfig`] will expose to the LLM without
190/// actually running the script. Returns the merged list of
191/// `host_tools` (declared in the config) and embedded-block sources.
192///
193/// MCP server tools are deliberately omitted — they only become known
194/// after the MCP `initialize` handshake. Use `tool.schema()` from
195/// inside the running script for that view.
196pub fn inspect_tools(config: &BlockConfig) -> Vec<ToolMeta> {
197 let mut out = Vec::new();
198 for t in &config.host_tools {
199 out.push(ToolMeta {
200 name: t.name.clone(),
201 description: t.description.clone(),
202 group: t.group.clone(),
203 source: ToolSource::HostRust,
204 });
205 }
206 for (name, _src) in EMBEDDED_BLOCKS {
207 out.push(ToolMeta {
208 name: (*name).to_string(),
209 description: format!("Embedded StdPkg block (require(\"{name}\"))"),
210 group: None,
211 source: ToolSource::EmbeddedBlock,
212 });
213 }
214 out
215}
216
217/// Build the `blocks/` portion of `package.path` from filesystem locations.
218///
219/// Priority (highest first):
220/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
221/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
222///
223/// Returns a semicolon-terminated string ready to prepend to `package.path`,
224/// or an empty string when no `blocks/` directories are found.
225fn build_blocks_path(project_root: &Path) -> String {
226 let mut out = String::new();
227
228 // 1. project_root/blocks/
229 let project_blocks = project_root.join("blocks");
230 if project_blocks.is_dir() {
231 let pb = project_blocks.to_string_lossy();
232 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
233 }
234
235 // 2. exe_dir/blocks/
236 match std::env::current_exe() {
237 Ok(exe) => {
238 if let Some(exe_dir) = exe.parent() {
239 let exe_blocks = exe_dir.join("blocks");
240 if exe_blocks.is_dir() {
241 let eb = exe_blocks.to_string_lossy();
242 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
243 }
244 }
245 }
246 Err(e) => {
247 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
248 }
249 }
250
251 out
252}
253
254pub struct BlockConfig {
255 /// Lua script to execute. See [`ScriptSource`] for the supported
256 /// shapes (filesystem path / inline source / embedded default
257 /// agent invoker).
258 pub script: ScriptSource,
259 pub project_root: PathBuf,
260 pub relay_url: Option<String>,
261 /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
262 /// for the supported shapes (inline 64-hex / environment variable).
263 /// `None` generates a random keypair. Required to talk to
264 /// registry/ACL-gated hosted meshes.
265 pub secret_key: Option<SecretKeySource>,
266 /// Per-RPC timeout for every MCP round-trip (connect / list / call).
267 /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
268 pub mcp_rpc_timeout: Duration,
269 /// Prompt payload injected as `_PROMPT` Lua global. See
270 /// [`PromptSource`] for the supported shapes. `None` leaves the
271 /// global unset.
272 pub prompt: Option<PromptSource>,
273 /// Context payload injected as `_CONTEXT` Lua global (typically
274 /// the system prompt). Same shape rules as [`Self::prompt`].
275 pub context: Option<PromptSource>,
276 /// Host-side Rust handlers pre-installed on the EventBus before the user
277 /// script starts. Each entry registers `handler` against `kind` via
278 /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
279 /// captured by the Rust handler rather than dispatched to a Lua function.
280 ///
281 /// Intended for SDK consumers that embed `agent-block-core` and need to
282 /// receive script output programmatically (e.g. a Spawner adapter that
283 /// turns LLM script output into a typed `WorkerResult`). Lua-side
284 /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
285 /// are still possible, but the EventBus dispatches a single handler per
286 /// `kind` (last-write-wins), so host-side and Lua-side registrations on
287 /// the same `kind` collide; choose one side per routing key.
288 ///
289 /// Defaults to an empty map (no host handlers).
290 pub host_handlers: HashMap<String, Arc<dyn Handler>>,
291 /// Single host-side Rust handler that catches every event regardless
292 /// of `kind`. Internally registered via [`EventBus::on_any`], so it
293 /// acts as a fallback when no entry in [`Self::host_handlers`]
294 /// matches the incoming `kind`.
295 ///
296 /// This is the SDK-embed 1-shot sink: SDK consumers do not need to
297 /// invent or coordinate a string `kind` between the Lua script and
298 /// their Rust code. The agent invoker's emit-kind is irrelevant —
299 /// the handler receives every event.
300 ///
301 /// Use this when you want a single Rust handler to receive results
302 /// (typical embedded use). Use [`Self::host_handlers`] instead when
303 /// you actually need string-keyed routing (multi-source / multi-
304 /// handler dispatch). The two may coexist: kind-specific handlers
305 /// in `host_handlers` take precedence, and this single handler is
306 /// the fallback for unmatched kinds.
307 ///
308 /// Defaults to `None`.
309 pub host_handler: Option<Arc<dyn Handler>>,
310 /// Rust-implemented tools injected into the Lua tool registry
311 /// before the user script runs. Each entry becomes
312 /// indistinguishable from a Lua-defined tool: it is discoverable
313 /// via `tool.list()` / `tool.schema()`, dispatchable via
314 /// `tool.call(name, input)`, and visible to `agent.run`'s LLM
315 /// function-calling.
316 ///
317 /// SDK consumers can use this to expose Rust capabilities
318 /// (database lookups, business logic, etc.) to the LLM without
319 /// writing any Lua. See [`HostToolSpec`] and [`ToolHandler`].
320 ///
321 /// Defaults to an empty list.
322 pub host_tools: Vec<HostToolSpec>,
323 /// Optional custom `reqwest::Client` for the `http.*` Lua bridge
324 /// and any other in-process HTTP traffic. SDK consumers can wire
325 /// in their own TLS roots, proxy, default headers, connection
326 /// pool tuning, etc.
327 ///
328 /// `None` falls back to `reqwest::Client::new()` with default
329 /// settings (legacy behavior).
330 pub http_client: Option<reqwest::Client>,
331 /// Override path for the `std.sql` SQLite database file. `None`
332 /// reads the `AGENT_BLOCK_SQL_PATH` env var (CLI default), or
333 /// falls back to `{base_dir}/db.sqlite`. Pass `Some(":memory:")`
334 /// for an in-memory DB (useful for tests / isolation).
335 pub sql_path: Option<PathBuf>,
336 /// Override path for the `std.kv` SQLite database file. Same
337 /// semantics as [`Self::sql_path`].
338 pub kv_path: Option<PathBuf>,
339 /// Override path for the `std.ts` SQLite database file. Same
340 /// semantics as [`Self::sql_path`].
341 pub ts_path: Option<PathBuf>,
342 /// Extra Lua globals injected into both the main Isle and the
343 /// handler Isle before the user script runs. Each entry
344 /// `(name, value)` results in `_G[name] = json_to_lua(value)`.
345 ///
346 /// Use this to parameterize an inline script from Rust without
347 /// baking the values into the Lua source (`_USER_ID`,
348 /// `_TENANT`, `_FEATURE_FLAGS`, etc.). Keys must be valid Lua
349 /// identifiers; values are any `serde_json::Value`.
350 ///
351 /// `_PROMPT`, `_CONTEXT`, and `_SCRIPT_NAME` are reserved
352 /// (managed by other `BlockConfig` fields); colliding with them
353 /// silently overrides those defaults — use with care.
354 pub extra_globals: HashMap<String, serde_json::Value>,
355 /// When `true`, the EventBus dispatcher loop is driven in the background
356 /// for the duration of the script and shut down gracefully after the
357 /// script completes. Required for SDK-embed callers that supply
358 /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
359 /// emitted from the script to actually reach those handlers without
360 /// requiring the script to call `bus.serve()` (which blocks on
361 /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
362 ///
363 /// After the script finishes, the dispatcher is given a grace window
364 /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
365 /// and finish any in-flight handler, then is cancelled.
366 ///
367 /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
368 /// takes ownership of the EventBus before the script runs, so a script
369 /// that calls `bus.on(...)` followed by `bus.serve()` will error
370 /// ("bus.serve() has already taken ownership"). Use this flag when the
371 /// script's sole purpose is to push events to host handlers.
372 ///
373 /// Defaults to `false` (legacy behavior: dispatcher only runs when the
374 /// script calls `bus.serve()`).
375 pub auto_serve_bus: bool,
376 /// Optional caller-supplied cancellation token. When cancelled, the
377 /// in-flight script is interrupted via the Isle's debug-hook cancel
378 /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
379 /// returns `Err(BlockError::Cancelled)`.
380 ///
381 /// Intended for SDK consumers that spawn `run()` as a tokio task and
382 /// need an out-of-band abort signal (timeouts, parent-task cancellation
383 /// propagation, user-driven stop). The token is observed across the
384 /// `coroutine_eval` await; once cancellation propagates, the shutdown
385 /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
386 /// still runs so file descriptors and remote handles are released.
387 ///
388 /// Defaults to `None` (legacy behavior: `run()` only completes when
389 /// the script returns naturally).
390 pub shutdown_token: Option<CancellationToken>,
391}
392
/// Shared context passed into Lua bridge functions.
///
/// Cloning shares the `Arc`-wrapped resources (SQLite connections, Isles,
/// MCP manager, EventBus slot) rather than duplicating them.
#[derive(Clone)]
pub struct HostContext {
    /// Project root the host was started with.
    pub project_root: PathBuf,
    /// Connected agent-mesh client. Presumably `None` when no relay URL is
    /// configured; the construction site is in `run()`, so confirm there.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP server manager behind an async `RwLock` (shared by the `mcp.*` bridge).
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Kept separate from `sql_conn` so KV scratch state and user SQL data
    /// don't share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Uses a separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection, intended for cancelling
    /// in-flight `ts.*` queries on timeout.
    /// NOTE(review): marked `#[allow(dead_code)]`, so presumably not yet read
    /// by `bridge::ts`. Confirm before relying on timeout cancellation.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. `bridge::bus` no longer dispatches handlers against
    /// this Isle; handlers live on `handler_isle` instead. The field is
    /// retained for bridge code that may need the main Isle (future
    /// `coroutine_call` back-edges, introspection APIs). Removing it would
    /// force another `HostContext` reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here, so CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so this field is currently not
    /// read elsewhere (hence `#[allow(dead_code)]`). It is kept `pub` for
    /// future adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>`.
    /// * `bus.on` / `bus.on_any` lock it briefly from sync Lua context.
    /// * `bus.serve` (or the host's auto-serve path) uses `Option::take` to
    ///   claim ownership before entering the long-lived `run()` await. This
    ///   avoids holding a `std::sync::Mutex` guard across an await.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
449
450/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
451/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
452/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
453///
454/// `label` is used only for the init log line (`sql` / `kv`) so that the two
455/// databases are distinguishable in tracing output.
456fn open_sqlite(
457 path: &Path,
458 label: &'static str,
459) -> BlockResult<(
460 Arc<Mutex<rusqlite::Connection>>,
461 Arc<rusqlite::InterruptHandle>,
462)> {
463 let is_memory = crate::bridge::config::is_memory_sql(path);
464 if !is_memory {
465 if let Some(parent) = path.parent() {
466 std::fs::create_dir_all(parent)
467 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
468 }
469 }
470 let conn = rusqlite::Connection::open(path)
471 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
472 if !is_memory {
473 let journal = crate::bridge::config::sql_journal_mode();
474 conn.pragma_update(None, "journal_mode", &journal)
475 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
476 }
477 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
478 conn.pragma_update(None, "busy_timeout", busy_ms)
479 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
480 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
481 let interrupt = Arc::new(conn.get_interrupt_handle());
482 let conn = Arc::new(Mutex::new(conn));
483 Ok((conn, interrupt))
484}
485
/// Build the init closure shared between the main Isle and the handler
/// Isle.
///
/// The closure runs these steps in order, and the order matters:
/// 1. Set `_SCRIPT_NAME`, plus `_PROMPT` / `_CONTEXT` only when they are `Some`.
/// 2. Register `mlua-batteries` under `std`.
/// 3. Apply `extra_globals`. These come after steps 1–2, so a colliding key
///    overrides `_SCRIPT_NAME`, `_PROMPT`, `_CONTEXT` or a battery symbol.
/// 4. Prepend `script_dir` and `blocks_paths` to `package.path`.
/// 5. Append a `package.searchers` entry that serves [`EMBEDDED_BLOCKS`].
///
/// After this, `require "agent"` (and any `blocks/` module) works inside
/// the Lua VM.
///
/// `blocks_paths` is expected to be empty or `;`-terminated (as produced by
/// `build_blocks_path`), because it is concatenated directly in front of
/// the existing `package.path`.
///
/// Returns an `FnOnce` that takes ownership of its captured arguments. Each
/// Isle therefore gets its own call to this function (and its own copy of
/// the inputs), without requiring `Clone` bounds on the captured `HashMap`.
fn build_isle_init(
    script_name: String,
    script_dir: String,
    blocks_paths: String,
    prompt: Option<String>,
    context: Option<String>,
    extra_globals: HashMap<String, serde_json::Value>,
) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
    move |lua| {
        // Set script name before registering bridges (used by log.* for attribution)
        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
        // `_PROMPT` / `_CONTEXT` stay nil in Lua when not supplied.
        if let Some(ref p) = prompt {
            lua.globals().set("_PROMPT", p.as_str())?;
        }
        if let Some(ref c) = context {
            lua.globals().set("_CONTEXT", c.as_str())?;
        }

        mlua_batteries::register_all(lua, "std")?;

        // ── extra_globals from BlockConfig ──────────────────────────
        // Inject SDK-supplied parameterisation values into the Lua
        // global namespace. Registered after mlua_batteries so that
        // any value that *intentionally* shadows a `std.*` symbol
        // wins — callers are responsible for not stomping on bridges
        // they need. A JSON→Lua conversion failure aborts Isle init
        // with an error naming the offending key.
        for (name, value) in &extra_globals {
            let lua_value = crate::bridge::json_to_lua(lua, value.clone())
                .map_err(|e| mlua::Error::external(format!("extra_globals[{name}]: {e}")))?;
            lua.globals().set(name.as_str(), lua_value)?;
        }

        // ── package.path ──────────────────────────────────────────────
        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
        let package: mlua::Table = lua.globals().get("package")?;
        let current_path: String = package.get("path")?;
        let new_path =
            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
        package.set("path", new_path)?;

        // ── package.searchers — embedded fallback ─────────────────────
        // Register a custom searcher that loads blocks/ modules from the
        // sources baked in at compile time. This is the lowest-priority
        // searcher so filesystem copies always win.
        // NOTE(review): `package.searchers` is the Lua 5.2+ name (5.1/LuaJIT
        // call it `package.loaders`); presumably the mlua build targets 5.2+.
        let embedded: HashMap<&'static str, &'static str> =
            EMBEDDED_BLOCKS.iter().copied().collect();

        let searchers: mlua::Table = package.get("searchers")?;
        // Searcher protocol:
        // * hit: return the compiled chunk as the loader function;
        // * miss: return an explanatory string, which `require` folds into
        //   its "module not found" message.
        let loader =
            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
                Some(source) => {
                    let chunk = lua
                        .load(*source)
                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
                    let func = chunk.into_function()?;
                    Ok(mlua::Value::Function(func))
                }
                None => {
                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
                    Ok(mlua::Value::String(msg))
                }
            })?;
        // Append as the last searcher so filesystem paths remain preferred.
        let next_idx = searchers.raw_len() + 1;
        searchers.raw_set(next_idx, loader)?;

        Ok(())
    }
}
563
564/// Spawn the dedicated handler Isle.
565///
566/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
567/// separate OS thread with its own `tokio` current-thread runtime, keeping
568/// CPU-bound handlers from starving the main Isle's grace timers.
569///
570/// Bridge registration is deferred to a follow-up `exec` in `run()` because
571/// `HostContext` is not constructible until both Isles exist (the struct
572/// itself holds `Arc<AsyncIsle>` for both).
573async fn spawn_handler_isle(
574 script_name: String,
575 script_dir: String,
576 blocks_paths: String,
577 prompt: Option<String>,
578 context: Option<String>,
579 extra_globals: HashMap<String, serde_json::Value>,
580) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
581 let init = build_isle_init(
582 script_name,
583 script_dir,
584 blocks_paths,
585 prompt,
586 context,
587 extra_globals,
588 );
589 let (isle, driver) = AsyncIsle::builder()
590 .thread_name("agent-block-handler-isle")
591 .spawn(init)
592 .await
593 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
594 info!(
595 thread_name = "agent-block-handler-isle",
596 "handler Isle spawned"
597 );
598 Ok((Arc::new(isle), driver))
599}
600
/// Decode a 64-character hexadecimal string into 32 raw bytes.
///
/// Leading and trailing whitespace is trimmed first. Both upper- and
/// lower-case hex digits are accepted.
///
/// # Errors
///
/// Returns a human-readable message when either of these holds:
/// * the trimmed input is not exactly 64 bytes long;
/// * the trimmed input contains a byte that is not an ASCII hex digit.
///
/// Error positions are byte offsets within the trimmed input. Non-ASCII input
/// is rejected with an error instead of panicking.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let s = s.trim();
    if s.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", s.len()));
    }
    // Decode from raw bytes. Slicing the `&str` by byte index (`&s[i..i + 1]`)
    // panics when a multi-byte UTF-8 character straddles the slice edge.
    let bytes = s.as_bytes();
    let nibble = |pos: usize| -> Result<u8, String> {
        char::from(bytes[pos])
            .to_digit(16)
            // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
            .map(|d| d as u8)
            .ok_or_else(|| format!("invalid hex at position {pos}: invalid digit found in string"))
    };
    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = (nibble(2 * i)? << 4) | nibble(2 * i + 1)?;
    }
    Ok(out)
}
616
617pub async fn run(config: BlockConfig) -> BlockResult<()> {
618 // ── Resolve sources ───────────────────────────────────────────
619 // Convert the `Source` enums on `BlockConfig` to their concrete
620 // payloads before any Isle setup. `File`/`Path`/`Env` variants
621 // read from disk / environment exactly once, here at the start.
622 let (script_source, script_name, script_dir_pathbuf) = match &config.script {
623 ScriptSource::Path(p) => {
624 let source = std::fs::read_to_string(p)
625 .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
626 let name = p
627 .file_name()
628 .map(|n| n.to_string_lossy().to_string())
629 .unwrap_or_else(|| "unknown".to_string());
630 let dir = p
631 .parent()
632 .map(|d| d.to_path_buf())
633 .unwrap_or_else(|| PathBuf::from("."));
634 (source, name, dir)
635 }
636 ScriptSource::Inline { source, name } => {
637 (source.clone(), name.clone(), config.project_root.clone())
638 }
639 ScriptSource::DefaultAgent => (
640 DEFAULT_AGENT_INVOKER.to_string(),
641 "default_agent_invoker.lua".to_string(),
642 config.project_root.clone(),
643 ),
644 };
645
646 let prompt_resolved: Option<String> = match &config.prompt {
647 Some(PromptSource::Inline(s)) => Some(s.clone()),
648 Some(PromptSource::File(p)) => Some(
649 std::fs::read_to_string(p)
650 .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
651 ),
652 None => None,
653 };
654 let context_resolved: Option<String> = match &config.context {
655 Some(PromptSource::Inline(s)) => Some(s.clone()),
656 Some(PromptSource::File(p)) => Some(
657 std::fs::read_to_string(p)
658 .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
659 ),
660 None => None,
661 };
662 let secret_key_resolved: Option<String> = match &config.secret_key {
663 Some(SecretKeySource::Inline(s)) => Some(s.clone()),
664 Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
665 None => None,
666 };
667
668 // NOTE: We previously held entered span guards across awaits for nested
669 // span context. That made the `run()` future `!Send`, which prevents
670 // SDK consumers from `tokio::spawn(run(config))`. Span context is
671 // attached to events via fields on the `info_span!` calls below; the
672 // missing nesting is an acceptable trade-off for `Send` correctness.
673 let _root_span = info_span!("agent_block", script = %script_name);
674
675 // ── .env ──────────────────────────────────────────────────────
676 // Load .env from project_root if present. Variables are merged into
677 // the process environment so Lua's `std.env.get()` picks them up.
678 let env_path = config.project_root.join(".env");
679 match dotenvy::from_path(&env_path) {
680 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
681 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
682 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
683 }
684
685 // ── Init ──────────────────────────────────────────────────────
686 let _init_span = info_span!("init");
687
688 // ── EventBus channel ─────────────────────────────────────────────
689 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
690 // handler can hold a `bus_tx` clone and forward incoming requests
691 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
692 let bus_capacity = crate::bridge::config::bus_capacity();
693 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
694 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
695
696 // ── Pre-install host-side Rust handlers ───────────────────────────
697 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
698 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
699 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
700 // Registered here (before any Lua bridge registers handlers and before
701 // `bus.serve` takes ownership of the bus) so the EventBus already
702 // carries the host handlers when the script starts.
703 // Install host-side Rust handlers: kind-specific entries from
704 // `host_handlers` and, when set, the kind-agnostic `host_handler`
705 // (registered via `on_any` as the fallback for unmatched kinds).
706 // SDK-embed 1-shot callers typically only set `host_handler`.
707 let has_kind_handlers = !config.host_handlers.is_empty();
708 let has_any_handler = config.host_handler.is_some();
709 if has_kind_handlers || has_any_handler {
710 let mut guard = event_bus
711 .lock()
712 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
713 let bus = guard
714 .as_mut()
715 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
716 for (kind, handler) in &config.host_handlers {
717 bus.on(kind.clone(), Arc::clone(handler))
718 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
719 }
720 if let Some(any_handler) = &config.host_handler {
721 bus.on_any(Arc::clone(any_handler))
722 .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
723 }
724 info!(
725 kind_handlers = config.host_handlers.len(),
726 any_handler = has_any_handler,
727 "host handlers pre-installed"
728 );
729 }
730
731 // ── auto-serve: background dispatcher for SDK-embed callers ───────
732 // When `auto_serve_bus` is on and at least one host-side handler
733 // (kind-specific or kind-agnostic) is installed, take the EventBus
734 // out of the Mutex *before* the script runs and spawn the dispatcher
735 // loop on the runtime. This lets `bus.emit(kind, payload)` from the
736 // script reach the host handler without requiring the script to call
737 // `bus.serve()` (which blocks on signals and never returns under
738 // programmatic embedding).
739 let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
740 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
741 let bus = {
742 let mut guard = event_bus
743 .lock()
744 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
745 guard
746 .take()
747 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
748 };
749 let token = CancellationToken::new();
750 let token_for_task = token.clone();
751 let handle = tokio::spawn(async move {
752 let mut bus = bus;
753 if let Err(e) = bus.run(token_for_task).await {
754 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
755 }
756 });
757 info!("auto-serve: dispatcher spawned");
758 Some((handle, token))
759 } else {
760 None
761 };
762
763 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
764 let keypair = match &secret_key_resolved {
765 Some(hex_str) => {
766 let bytes = hex_decode_32(hex_str)
767 .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
768 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
769 }
770 None => agent_mesh_core::identity::AgentKeypair::generate(),
771 };
772 info!(agent_id = %keypair.agent_id(), "mesh identity");
773 let acl = agent_mesh_core::acl::AclPolicy {
774 default_deny: false,
775 rules: vec![],
776 };
777 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
778 Arc::new(BusRelayHandler::new(bus_tx.clone()));
779 let url = relay_url.clone();
780 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
781 .await
782 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
783 info!(relay_url = %relay_url, "mesh connected");
784 Some(Arc::new(agent))
785 } else {
786 None
787 };
788
789 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
790 config.mcp_rpc_timeout,
791 )?));
792
793 // Resolve project_root to absolute path.
794 // canonicalize() can fail if the path doesn't exist; fall back to
795 // joining with current_dir to guarantee an absolute path.
796 let project_root = config
797 .project_root
798 .canonicalize()
799 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
800
801 // HTTP client: prefer the SDK-supplied client if any; otherwise
802 // construct a fresh default reqwest::Client (legacy behavior).
803 let http_client = config.http_client.clone().unwrap_or_default();
804
805 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
806 // BlockConfig overrides take precedence; otherwise the env-driven
807 // resolution in `bridge::config::*` applies (see crate docs).
808 let sql_path = match &config.sql_path {
809 Some(p) => p.clone(),
810 None => crate::bridge::config::sql_path().map_err(BlockError::Runtime)?,
811 };
812 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
813
814 let kv_path = match &config.kv_path {
815 Some(p) => p.clone(),
816 None => crate::bridge::config::kv_path().map_err(BlockError::Runtime)?,
817 };
818 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
819
820 let ts_path = match &config.ts_path {
821 Some(p) => p.clone(),
822 None => crate::bridge::config::ts_path().map_err(BlockError::Runtime)?,
823 };
824 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
825
826 // Use the script dir derived from the resolved `ScriptSource` for
827 // `package.path` lookups. For inline / default-agent variants the dir
828 // falls back to `project_root` (set during source resolution above).
829 let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
830
831 // Precompute values captured by the init closure so we don't need to
832 // move the full `HostContext` into it (HostContext now holds
833 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
834 // returns — classic chicken-and-egg). All bridge registrations run in a
835 // second pass via `isle.exec` below.
836 let blocks_paths = build_blocks_path(&project_root);
837 let prompt = prompt_resolved.clone();
838 let context = context_resolved.clone();
839
840 // ── main Isle ─────────────────────────────────────────────────
841 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
842 script_name.clone(),
843 script_dir.clone(),
844 blocks_paths.clone(),
845 prompt.clone(),
846 context.clone(),
847 config.extra_globals.clone(),
848 ))
849 .await
850 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
851 let isle = Arc::new(isle);
852
853 // ── handler Isle (sequential, dependencies are trivial) ────────
854 let (handler_isle, handler_driver) = spawn_handler_isle(
855 script_name.clone(),
856 script_dir.clone(),
857 blocks_paths.clone(),
858 prompt,
859 context,
860 config.extra_globals.clone(),
861 )
862 .await?;
863
864 // Wire both Isles into McpManager so Lua notification callbacks can be
865 // dispatched from the rmcp task thread.
866 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
867 // - main_isle: progress/log notification dispatch (exec on main Isle so
868 // user callback upvalues are preserved — no bytecode dump/reload needed)
869 {
870 let mut mgr = mcp_manager.write().await;
871 mgr.set_handler_isle(Arc::clone(&handler_isle));
872 mgr.set_main_isle(Arc::clone(&isle));
873 }
874
875 // ── HostContext + bridge registration ──────────────────────────────
876 // Wrap the isle in an Arc so `HostContext` can hand it to
877 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
878 // handlers from the EventBus dispatcher task).
879 let ctx = HostContext {
880 project_root,
881 mesh_agent,
882 mcp_manager: Arc::clone(&mcp_manager),
883 http_client,
884 sql_conn,
885 sql_interrupt,
886 kv_conn,
887 kv_interrupt,
888 ts_conn,
889 ts_interrupt,
890 isle: Arc::clone(&isle),
891 handler_isle: Arc::clone(&handler_isle),
892 bus_tx: bus_tx.clone(),
893 event_bus: Arc::clone(&event_bus),
894 };
895
896 {
897 let ctx = ctx.clone();
898 isle.exec(move |lua| {
899 bridge::register_all(lua, &ctx)
900 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
901 Ok(String::new())
902 })
903 .await
904 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
905 }
906
907 {
908 let ctx = ctx.clone();
909 handler_isle
910 .exec(move |lua| {
911 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
912 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
913 })?;
914 Ok(String::new())
915 })
916 .await
917 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
918 }
919
920 // ── Inject host_tools into the Lua tool registry ───────────────
921 // Done after `bridge::register_all` so `_TOOL_REGISTRY` exists.
922 // Each entry becomes an Anthropic-shaped tool spec table
923 // { name, schema = { description, input_schema }, handler, group? }
924 // where `handler` is a Lua async function that bridges back into
925 // the supplied `ToolHandler::call`. Lua-side `tool.list()` /
926 // `tool.schema()` / `agent.run` see these uniformly with native
927 // Lua-defined tools.
928 if !config.host_tools.is_empty() {
929 let host_tools = config.host_tools.clone();
930 let tool_count = host_tools.len();
931 isle.exec(move |lua| {
932 let registry: mlua::Table = lua
933 .globals()
934 .get("_TOOL_REGISTRY")
935 .map_err(|e| mlua_isle::IsleError::Lua(format!("get _TOOL_REGISTRY: {e}")))?;
936 for tool in host_tools {
937 let entry = lua
938 .create_table()
939 .map_err(|e| mlua_isle::IsleError::Lua(format!("create entry: {e}")))?;
940 entry
941 .set("name", tool.name.as_str())
942 .map_err(|e| mlua_isle::IsleError::Lua(format!("set name: {e}")))?;
943 // schema = { description, input_schema } — Anthropic shape
944 let schema = lua
945 .create_table()
946 .map_err(|e| mlua_isle::IsleError::Lua(format!("create schema: {e}")))?;
947 schema
948 .set("description", tool.description.as_str())
949 .map_err(|e| mlua_isle::IsleError::Lua(format!("set description: {e}")))?;
950 let input_schema_lua =
951 crate::bridge::json_to_lua(lua, tool.input_schema.clone())
952 .map_err(|e| mlua_isle::IsleError::Lua(format!("input_schema: {e}")))?;
953 schema
954 .set("input_schema", input_schema_lua)
955 .map_err(|e| mlua_isle::IsleError::Lua(format!("set input_schema: {e}")))?;
956 entry
957 .set("schema", schema)
958 .map_err(|e| mlua_isle::IsleError::Lua(format!("set schema: {e}")))?;
959 if let Some(group) = &tool.group {
960 entry
961 .set("group", group.as_str())
962 .map_err(|e| mlua_isle::IsleError::Lua(format!("set group: {e}")))?;
963 }
964 let handler_arc = Arc::clone(&tool.handler);
965 let handler_fn = lua
966 .create_async_function(move |lua, input: mlua::Value| {
967 let handler = Arc::clone(&handler_arc);
968 async move {
969 let input_json = crate::bridge::lua_to_json(&lua, input)?;
970 let result = handler
971 .call(input_json)
972 .await
973 .map_err(mlua::Error::external)?;
974 crate::bridge::json_to_lua(&lua, result)
975 }
976 })
977 .map_err(|e| mlua_isle::IsleError::Lua(format!("create handler: {e}")))?;
978 entry
979 .set("handler", handler_fn)
980 .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
981 registry
982 .set(tool.name.as_str(), entry)
983 .map_err(|e| mlua_isle::IsleError::Lua(format!("registry set: {e}")))?;
984 }
985 Ok(String::new())
986 })
987 .await
988 .map_err(|e| BlockError::Runtime(format!("host_tools inject: {e}")))?;
989 info!(count = tool_count, "host tools injected into Lua registry");
990 }
991
992 drop(_init_span);
993
994 // ── Execute ───────────────────────────────────────────────────
995 // When `shutdown_token` is supplied, race the script future against
996 // the caller's cancellation signal. On cancel, propagate to the Isle
997 // via the AsyncTask's cancel token so the debug hook unwinds the Lua
998 // VM, then continue into the shutdown sequence below (we still want
999 // to release MCP/mesh handles and join the auto-serve dispatcher
1000 // before returning).
1001 let script_result: Result<(), BlockError> = {
1002 let _exec_span = info_span!("execute", script = %script_name);
1003
1004 let mut task = isle.spawn_coroutine_eval(&script_source);
1005 let task_cancel = task.cancel_token().clone();
1006 match config.shutdown_token.as_ref() {
1007 Some(token) => {
1008 tokio::select! {
1009 biased;
1010 _ = token.cancelled() => {
1011 task_cancel.cancel();
1012 // Wait for the Isle to unwind so the VM is in a
1013 // consistent state before driver shutdown. The
1014 // debug hook fires at the next HOOK_INTERVAL.
1015 let _ = (&mut task).await;
1016 info!("shutdown_token: cancelled by caller");
1017 Err(BlockError::Cancelled)
1018 }
1019 res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
1020 }
1021 }
1022 None => (&mut task)
1023 .await
1024 .map(|_| ())
1025 .map_err(|e| BlockError::Script(format!("{e}"))),
1026 }
1027 };
1028
1029 // ── auto-serve drain + cancel ─────────────────────────────────
1030 // Let the dispatcher drain events queued by the script, then signal
1031 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
1032 if let Some((handle, token)) = auto_serve_state {
1033 let grace_ms = crate::bridge::config::task_grace_ms();
1034 let grace = Duration::from_millis(grace_ms);
1035 tokio::time::sleep(grace).await;
1036 token.cancel();
1037 match tokio::time::timeout(grace, handle).await {
1038 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
1039 Ok(Err(join_err)) => {
1040 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
1041 }
1042 Err(_) => {
1043 tracing::warn!(
1044 grace_ms,
1045 "auto-serve: dispatcher join timed out after cancel; forcing exit"
1046 );
1047 }
1048 }
1049 }
1050
1051 // ── Shutdown ──────────────────────────────────────────────────
1052 {
1053 let _shutdown_span = info_span!("shutdown");
1054
1055 mcp_manager.write().await.disconnect_all().await?;
1056
1057 driver
1058 .shutdown()
1059 .await
1060 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
1061
1062 // Handler Isle shutdown is independent of main shutdown: a failure
1063 // here (e.g. ThreadPanic on the handler thread) is logged but does
1064 // not poison the main process exit. The main Isle has already
1065 // been stopped cleanly above.
1066 match handler_driver.shutdown().await {
1067 Ok(()) => info!(
1068 thread_name = "agent-block-handler-isle",
1069 "handler Isle shut down"
1070 ),
1071 Err(e) => tracing::error!(
1072 error = %e,
1073 thread_name = "agent-block-handler-isle",
1074 "handler Isle shutdown failed"
1075 ),
1076 }
1077 }
1078
1079 script_result
1080}
1081
1082/// mesh → bus source adapter.
1083///
1084/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
1085/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
1086/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
1087/// oneshot channel carried inside the event.
1088///
1089/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
1090///
1091/// | Failure | Return value |
1092/// |---------------------------|----------------------------------------|
1093/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}` |
1094/// | ack receiver dropped | `{"error": "ack dropped"}` |
1095/// | Lua handler `BlockError` | `{"error": "<handler error>"}` |
1096/// | Handler exceeded 30s | `{"error": "handler timeout"}` |
1097///
1098/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
1099/// (see `src/bridge/mesh.rs`).
1100struct BusRelayHandler {
1101 tx: mpsc::Sender<Event>,
1102}
1103
1104impl BusRelayHandler {
1105 fn new(tx: mpsc::Sender<Event>) -> Self {
1106 Self { tx }
1107 }
1108}
1109
1110/// Bound used for both the mesh-adapter ack wait and other source timeouts.
1111const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
1112
1113#[async_trait::async_trait]
1114impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
1115 async fn handle(
1116 &self,
1117 from: &agent_mesh_core::identity::AgentId,
1118 payload: &serde_json::Value,
1119 _cancel: agent_mesh_sdk::CancelToken,
1120 ) -> serde_json::Value {
1121 let id = uuid::Uuid::new_v4().to_string();
1122 let meta = serde_json::json!({"from": from.to_string()});
1123 let (ack_tx, ack_rx) = oneshot::channel();
1124 let event = Event {
1125 kind: "mesh".into(),
1126 id: id.clone(),
1127 payload: payload.clone(),
1128 meta,
1129 ack_tx: Some(ack_tx),
1130 };
1131
1132 if let Err(e) = self.tx.send(event).await {
1133 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
1134 return serde_json::json!({"error": "bus channel closed"});
1135 }
1136
1137 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
1138 Ok(Ok(Ok(v))) => v,
1139 Ok(Ok(Err(e))) => {
1140 tracing::error!(id = %id, error = %e, "mesh handler returned error");
1141 serde_json::json!({"error": e.to_string()})
1142 }
1143 Ok(Err(e)) => {
1144 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
1145 serde_json::json!({"error": "ack dropped"})
1146 }
1147 Err(_) => {
1148 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
1149 serde_json::json!({"error": "handler timeout"})
1150 }
1151 }
1152 }
1153}