agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
//! 1. Spawn mlua-isle `AsyncIsle`s (dedicated Lua VM threads with coroutine support):
//!    a main Isle that runs the user script, plus a handler Isle for EventBus handlers
//! 2. Optionally connect to an agent-mesh relay
//! 3. Initialize the MCP manager for stdio-based MCP server connections
//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`,
//!    `sql.*`, `kv.*`, `ts.*`, `bus.*`)
//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
//! 6. Graceful shutdown (Isles + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for the `blocks/` StdPkg modules, as
/// `(module_name, source)` pairs.
///
/// The sources are baked into the binary at compile time (`include_str!`
/// paths are relative to this file), so `cargo install` works without any
/// extra file distribution.
///
/// At runtime they are served by a custom `package.searchers` entry that is
/// appended *after* the existing searchers. A filesystem copy found via
/// `package.path` (script dir, `<project_root>/blocks/`, `<exe_dir>/blocks/`)
/// therefore always wins. The order of this table is also the order in which
/// `inspect_tools` reports the modules.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
    (
        "compile_loop",
        include_str!("../blocks/compile_loop/init.lua"),
    ),
];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Requires the StdPkg `agent` module and calls `agent.run` with
/// `prompt = _PROMPT` and `system = _CONTEXT`. These globals are only set
/// when `BlockConfig::prompt` / `BlockConfig::context` are configured;
/// otherwise they are `nil` in Lua. How `agent.run` treats `nil` is defined
/// by the Lua module (NOTE(review): confirm).
///
/// The result is emitted on the EventBus with kind `"_"`. This is a neutral
/// label with no SDK-side meaning; receive the result through
/// [`BlockConfig::host_handler`] (the kind-agnostic single sink) rather than
/// keying on the label.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
 prompt = _PROMPT,
 system = _CONTEXT,
})
bus.emit("_", r)
"#;
54
/// How the Lua script source for `run()` is supplied.
///
/// * `Path` mirrors the CLI form (`agent-block -s <path>`). The file is read
///   once, when `run()` starts.
/// * `Inline` lets SDK consumers pass a script they already hold in memory
///   (compile-time `include_str!`, a dynamically built string, …) without
///   writing it to a tempfile.
/// * `DefaultAgent` uses an embedded invoker. It runs the StdPkg `agent`
///   module with the caller-supplied prompt/context and emits the result on
///   the EventBus under the neutral kind `"_"` (`bus.emit("_", result)`).
#[derive(Debug, Clone)]
pub enum ScriptSource {
    /// Read the script from this filesystem path when `run()` starts.
    ///
    /// The file name is used as the script's display name, and the parent
    /// directory (or `.` when there is none) is used as the script directory
    /// for `package.path` lookups.
    Path(PathBuf),
    /// Use the supplied source code directly.
    ///
    /// Having no on-disk location, the script directory for `package.path`
    /// lookups falls back to [`BlockConfig::project_root`].
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker.
    ///
    /// `prompt` / `context` are forwarded as the `_PROMPT` / `_CONTEXT` Lua
    /// globals. The agent result is emitted on the EventBus under a neutral
    /// label (`"_"`).
    ///
    /// SDK consumers should pair this with [`BlockConfig::host_handler`] (the
    /// kind-agnostic single sink) and `auto_serve_bus = true`. The emit kind
    /// is intentionally meaningless. Consumers that need string-keyed routing
    /// should supply [`ScriptSource::Inline`] with their own invoker.
    ///
    /// The script directory falls back to [`BlockConfig::project_root`].
    DefaultAgent,
}
86
/// Where a string payload (the prompt or the system context) comes from.
///
/// CLI mapping: `Inline` corresponds to `--prompt` / `--context`, and `File`
/// to `--prompt-file` / `--context-file`. File contents are read once, at
/// `run()` start.
#[derive(Clone, Debug)]
pub enum PromptSource {
    /// The payload text itself.
    Inline(String),
    /// Path whose contents become the payload (read at `run()` start).
    File(PathBuf),
}
99
/// How the Ed25519 mesh identity secret key is supplied.
///
/// * `Inline` is a 64-hex literal.
/// * `Env` reads the named environment variable at `run()` start. The CLI
///   default is `AGENT_BLOCK_MESH_SECRET_KEY`.
///
/// Absence of any `SecretKeySource` (i.e. `BlockConfig.secret_key = None`)
/// causes a random keypair to be generated, matching the prior behavior.
///
/// `Debug` is implemented by hand so the inline key material is never
/// printed. `{:?}` output routinely ends up in logs and panic messages.
#[derive(Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal.
    Inline(String),
    /// Environment variable name to read at start.
    Env(String),
}

impl std::fmt::Debug for SecretKeySource {
    /// Formats like the derived impl, except the `Inline` key is replaced by
    /// the placeholder `"<redacted>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Never emit the secret itself.
            Self::Inline(_) => f.debug_tuple("Inline").field(&"<redacted>").finish(),
            // The variable *name* is not secret and helps when diagnosing config.
            Self::Env(var) => f.debug_tuple("Env").field(var).finish(),
        }
    }
}
114
/// Async handler invoked when the LLM (or a Lua call to
/// `tool.call(name, ...)`) targets a Rust-implemented tool supplied via
/// [`BlockConfig::host_tools`].
///
/// `input` arrives as a `serde_json::Value`, converted from Lua before the
/// handler is invoked. The returned value is converted back to a Lua value
/// and delivered to the caller.
///
/// Errors are propagated as `LuaError::external` (visible inside the script)
/// and as `BlockError` on the Rust side. The conversion happens in the
/// tool bridge, not in this file.
///
/// Implementors must be `Send + Sync + 'static` because the handler is
/// shared behind an `Arc<dyn ToolHandler>` (see [`HostToolSpec::handler`]).
#[async_trait::async_trait]
pub trait ToolHandler: Send + Sync + 'static {
    /// Handle one tool invocation with the caller's JSON `input` and return
    /// the JSON result (or a `BlockError`).
    async fn call(&self, input: serde_json::Value) -> Result<serde_json::Value, BlockError>;
}
128
/// Declarative spec for a Rust-implemented tool injected into the Lua tool
/// registry before the user script runs.
///
/// From the script's point of view the resulting entry is indistinguishable
/// from a Lua-defined tool. `tool.call("<name>", input)`, `agent.run({ ... })`
/// tool dispatch, and `tool.schema()` enumeration all treat it the same way.
///
/// `Debug` is implemented by hand because `handler` is a trait object.
#[derive(Clone)]
pub struct HostToolSpec {
    /// Tool name. Becomes the routing key in `_TOOL_REGISTRY` and the
    /// `name` field exposed by `tool.schema()` (Anthropic tool spec).
    pub name: String,
    /// Free-form description shown to the LLM. Becomes the
    /// `description` field of the Anthropic tool spec.
    pub description: String,
    /// Input schema (Anthropic-compatible JSON Schema object).
    pub input_schema: serde_json::Value,
    /// Optional group label, matched by the `tool_groups` filter of
    /// `agent.run`. A `BlockConfig`-level tool policy keyed on this label is
    /// planned but not implemented yet.
    pub group: Option<String>,
    /// Rust callback dispatched on every invocation.
    pub handler: Arc<dyn ToolHandler>,
}
150
151impl std::fmt::Debug for HostToolSpec {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("HostToolSpec")
154 .field("name", &self.name)
155 .field("description", &self.description)
156 .field("input_schema", &self.input_schema)
157 .field("group", &self.group)
158 .field("handler", &"<dyn ToolHandler>")
159 .finish()
160 }
161}
162
/// Snapshot of one entry that a given [`BlockConfig`] statically makes
/// available. Produced by [`inspect_tools`] without running the script.
///
/// MCP server tools are *not* included because they are only known after
/// the MCP `initialize` handshake completes. Callers that need that view
/// should run the script and call `tool.schema()` from Lua.
#[derive(Debug, Clone)]
pub struct ToolMeta {
    /// Tool name (for host tools) or `require` module name (for embedded
    /// blocks).
    pub name: String,
    /// Description. Copied from [`HostToolSpec::description`] for host tools;
    /// a generated `Embedded StdPkg block (require("<name>"))` string for
    /// embedded blocks.
    pub description: String,
    /// Group label from [`HostToolSpec::group`]. Always `None` for embedded
    /// blocks.
    pub group: Option<String>,
    /// Where this entry comes from.
    pub source: ToolSource,
}
176
/// Where an entry returned by [`inspect_tools`] comes from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ToolSource {
    /// A Rust-implemented tool declared in [`BlockConfig::host_tools`].
    HostRust,
    /// A StdPkg Lua module compiled into the binary and loadable with
    /// `require(...)` (e.g. `agent`, `compile_loop`).
    ///
    /// This only records that the module is available. It does not imply that
    /// the module registers a tool.
    EmbeddedBlock,
}
188
189/// Inspect the tools a [`BlockConfig`] will expose to the LLM without
190/// actually running the script. Returns the merged list of
191/// `host_tools` (declared in the config) and embedded-block sources.
192///
193/// MCP server tools are deliberately omitted — they only become known
194/// after the MCP `initialize` handshake. Use `tool.schema()` from
195/// inside the running script for that view.
196pub fn inspect_tools(config: &BlockConfig) -> Vec<ToolMeta> {
197 let mut out = Vec::new();
198 for t in &config.host_tools {
199 out.push(ToolMeta {
200 name: t.name.clone(),
201 description: t.description.clone(),
202 group: t.group.clone(),
203 source: ToolSource::HostRust,
204 });
205 }
206 for (name, _src) in EMBEDDED_BLOCKS {
207 out.push(ToolMeta {
208 name: (*name).to_string(),
209 description: format!("Embedded StdPkg block (require(\"{name}\"))"),
210 group: None,
211 source: ToolSource::EmbeddedBlock,
212 });
213 }
214 out
215}
216
217/// Build the `blocks/` portion of `package.path` from filesystem locations.
218///
219/// Priority (highest first):
220/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
221/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
222///
223/// Returns a semicolon-terminated string ready to prepend to `package.path`,
224/// or an empty string when no `blocks/` directories are found.
225fn build_blocks_path(project_root: &Path) -> String {
226 let mut out = String::new();
227
228 // 1. project_root/blocks/
229 let project_blocks = project_root.join("blocks");
230 if project_blocks.is_dir() {
231 let pb = project_blocks.to_string_lossy();
232 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
233 }
234
235 // 2. exe_dir/blocks/
236 match std::env::current_exe() {
237 Ok(exe) => {
238 if let Some(exe_dir) = exe.parent() {
239 let exe_blocks = exe_dir.join("blocks");
240 if exe_blocks.is_dir() {
241 let eb = exe_blocks.to_string_lossy();
242 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
243 }
244 }
245 }
246 Err(e) => {
247 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
248 }
249 }
250
251 out
252}
253
/// Everything [`run`] needs to execute one agent-block script.
///
/// All `*Source` fields are resolved (file read / env var lookup) exactly
/// once, at the start of `run()`.
pub struct BlockConfig {
    /// Lua script to execute. See [`ScriptSource`] for the supported
    /// shapes (filesystem path / inline source / embedded default
    /// agent invoker).
    pub script: ScriptSource,
    /// Project directory.
    ///
    /// `run()` uses it as follows:
    /// * loads `<project_root>/.env` into the process environment if present;
    /// * searches `<project_root>/blocks/` for Lua modules;
    /// * uses it as the script directory for `Inline` / `DefaultAgent`
    ///   scripts.
    ///
    /// It is canonicalized when possible, otherwise joined onto the current
    /// working directory.
    pub project_root: PathBuf,
    /// agent-mesh relay URL.
    ///
    /// When `Some`, `run()` connects a mesh agent whose incoming requests are
    /// forwarded onto the EventBus. A failed connection aborts `run()` with
    /// `BlockError::Mesh`. `None` runs without mesh.
    pub relay_url: Option<String>,
    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
    /// for the supported shapes (inline 64-hex / environment variable).
    ///
    /// `None` generates a random keypair. Required to talk to
    /// registry/ACL-gated hosted meshes. Only consulted when
    /// [`Self::relay_url`] is set.
    ///
    /// Note: an `Env` source whose variable is unset (or not valid Unicode)
    /// is treated like `None`, so a random keypair is generated without an
    /// error. Malformed hex fails `run()` with `BlockError::Runtime`.
    pub secret_key: Option<SecretKeySource>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt payload injected as `_PROMPT` Lua global. See
    /// [`PromptSource`] for the supported shapes. `None` leaves the
    /// global unset.
    pub prompt: Option<PromptSource>,
    /// Context payload injected as `_CONTEXT` Lua global (typically
    /// the system prompt). Same shape rules as [`Self::prompt`].
    pub context: Option<PromptSource>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`).
    ///
    /// Lua-side `bus.on(kind, fn)` registrations layered on top of the handler
    /// Isle are still possible. However, the EventBus dispatches a single
    /// handler per `kind` (last-write-wins), so host-side and Lua-side
    /// registrations on the same `kind` collide; choose one side per routing
    /// key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// Single host-side Rust handler that catches every event regardless
    /// of `kind`. Internally registered via [`EventBus::on_any`], so it
    /// acts as a fallback when no entry in [`Self::host_handlers`]
    /// matches the incoming `kind`.
    ///
    /// This is the SDK-embed 1-shot sink. SDK consumers do not need to
    /// invent or coordinate a string `kind` between the Lua script and
    /// their Rust code: the agent invoker's emit kind is irrelevant, and the
    /// handler receives every event.
    ///
    /// Use this when you want a single Rust handler to receive results
    /// (typical embedded use). Use [`Self::host_handlers`] instead when you
    /// actually need string-keyed routing (multi-source / multi-handler
    /// dispatch).
    ///
    /// The two may coexist: kind-specific handlers in `host_handlers` take
    /// precedence, and this single handler is the fallback for unmatched
    /// kinds.
    ///
    /// Defaults to `None`.
    pub host_handler: Option<Arc<dyn Handler>>,
    /// Rust-implemented tools injected into the Lua tool registry
    /// before the user script runs. Each entry becomes
    /// indistinguishable from a Lua-defined tool: it is discoverable
    /// via `tool.list()` / `tool.schema()`, dispatchable via
    /// `tool.call(name, input)`, and visible to `agent.run`'s LLM
    /// function-calling.
    ///
    /// SDK consumers can use this to expose Rust capabilities
    /// (database lookups, business logic, etc.) to the LLM without
    /// writing any Lua. See [`HostToolSpec`] and [`ToolHandler`].
    ///
    /// Defaults to an empty list.
    pub host_tools: Vec<HostToolSpec>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes.
    ///
    /// Required for SDK-embed callers that supply [`Self::host_handlers`] or
    /// [`Self::host_handler`] and need `bus.emit(kind, payload)` events from
    /// the script to actually reach those handlers. Without it, delivery
    /// depends on the script calling `bus.serve()`, which blocks on
    /// SIGTERM / Ctrl+C and never returns under programmatic embedding.
    ///
    /// Only takes effect when at least one host handler (an entry in
    /// `host_handlers` or a `host_handler`) is configured. Otherwise the
    /// flag is ignored and the bus stays available to a Lua-side
    /// `bus.serve()`.
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`. When active, this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled:
    /// * the in-flight script is interrupted via the Isle's debug-hook
    ///   cancel path;
    /// * the auto-serve dispatcher (if any) is shut down;
    /// * `run()` returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop).
    ///
    /// The token is observed across the `coroutine_eval` await. Once
    /// cancellation propagates, the shutdown sequence (MCP disconnect, Isle
    /// drivers, auto-serve dispatcher) still runs, so file descriptors and
    /// remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
360
/// Shared context passed into Lua bridge functions.
///
/// `Clone` is derived; the connections, Isles, MCP manager, mesh agent and
/// EventBus are all held behind `Arc`s, so clones share the same underlying
/// resources.
#[derive(Clone)]
pub struct HostContext {
    /// Project root directory made available to the bridges.
    pub project_root: PathBuf,
    /// Connected mesh agent, or `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// Manager for stdio-based MCP server connections (`mcp.*` bridge).
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    ///
    /// Kept separate from `sql_conn` so KV scratch state and user SQL data
    /// do not share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB, time-series table).
    ///
    /// Uses a separate DB file so TSDB WAL does not share page cache with
    /// kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection, intended for `bridge::ts` to
    /// cancel in-flight queries on timeout.
    ///
    /// NOTE(review): `allow(dead_code)` suggests that wiring does not read it
    /// yet; drop the attribute once it does.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`.
    ///
    /// `bridge::bus` no longer dispatches handlers against this Isle; they
    /// run on `handler_isle` instead. The field is retained (hence
    /// `allow(dead_code)`) for bridge code that may need the main Isle later
    /// (e.g. `coroutine_call` back-edges, introspection APIs) without another
    /// `HostContext` reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution.
    ///
    /// Lua handlers registered via `bus.on` / `bus.on_any` run here, so
    /// CPU-bound handler code does not occupy the main Isle's LocalSet and
    /// block grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`), and by
    /// `LuaHandler::call` (in `crate::bridge::bus`) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …) clone
    /// this and push `Event`s.
    ///
    /// The mesh adapter captures its own clone when the mesh agent connects,
    /// so nothing reads this field yet (hence `allow(dead_code)`). It stays
    /// `pub` for future adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>`.
    ///
    /// `bus.on` / `bus.on_any` lock it briefly from sync Lua context.
    /// `bus.serve` (or auto-serve in `run()`) calls `Option::take` to own the
    /// bus before the long-lived dispatcher await, so no `std::sync::Mutex`
    /// guard is ever held across an `.await`.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
417
418/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
419/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
420/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
421///
422/// `label` is used only for the init log line (`sql` / `kv`) so that the two
423/// databases are distinguishable in tracing output.
424fn open_sqlite(
425 path: &Path,
426 label: &'static str,
427) -> BlockResult<(
428 Arc<Mutex<rusqlite::Connection>>,
429 Arc<rusqlite::InterruptHandle>,
430)> {
431 let is_memory = crate::bridge::config::is_memory_sql(path);
432 if !is_memory {
433 if let Some(parent) = path.parent() {
434 std::fs::create_dir_all(parent)
435 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
436 }
437 }
438 let conn = rusqlite::Connection::open(path)
439 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
440 if !is_memory {
441 let journal = crate::bridge::config::sql_journal_mode();
442 conn.pragma_update(None, "journal_mode", &journal)
443 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
444 }
445 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
446 conn.pragma_update(None, "busy_timeout", busy_ms)
447 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
448 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
449 let interrupt = Arc::new(conn.get_interrupt_handle());
450 let conn = Arc::new(Mutex::new(conn));
451 Ok((conn, interrupt))
452}
453
454/// Build the init closure shared between the main Isle and the handler
455/// Isle. Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
456/// configures `package.path` / `package.searchers` so `require "agent"`
457/// (and any `blocks/` module) works inside the Lua VM.
458///
459/// Returns an `FnOnce` so each call produces a fresh closure; this lets
460/// both Isles be spawned from the same config without `Clone` bounds on
461/// the captured `HashMap`.
462fn build_isle_init(
463 script_name: String,
464 script_dir: String,
465 blocks_paths: String,
466 prompt: Option<String>,
467 context: Option<String>,
468) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
469 move |lua| {
470 // Set script name before registering bridges (used by log.* for attribution)
471 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
472 if let Some(ref p) = prompt {
473 lua.globals().set("_PROMPT", p.as_str())?;
474 }
475 if let Some(ref c) = context {
476 lua.globals().set("_CONTEXT", c.as_str())?;
477 }
478
479 mlua_batteries::register_all(lua, "std")?;
480
481 // ── package.path ──────────────────────────────────────────────
482 // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
483 let package: mlua::Table = lua.globals().get("package")?;
484 let current_path: String = package.get("path")?;
485 let new_path =
486 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
487 package.set("path", new_path)?;
488
489 // ── package.searchers — embedded fallback ─────────────────────
490 // Register a custom searcher that loads blocks/ modules from the
491 // sources baked in at compile time. This is the lowest-priority
492 // searcher so filesystem copies always win.
493 let embedded: HashMap<&'static str, &'static str> =
494 EMBEDDED_BLOCKS.iter().copied().collect();
495
496 let searchers: mlua::Table = package.get("searchers")?;
497 let loader =
498 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
499 Some(source) => {
500 let chunk = lua
501 .load(*source)
502 .set_name(format!("@embedded:blocks/{name}/init.lua"));
503 let func = chunk.into_function()?;
504 Ok(mlua::Value::Function(func))
505 }
506 None => {
507 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
508 Ok(mlua::Value::String(msg))
509 }
510 })?;
511 // Append as the last searcher so filesystem paths remain preferred.
512 let next_idx = searchers.raw_len() + 1;
513 searchers.raw_set(next_idx, loader)?;
514
515 Ok(())
516 }
517}
518
519/// Spawn the dedicated handler Isle.
520///
521/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
522/// separate OS thread with its own `tokio` current-thread runtime, keeping
523/// CPU-bound handlers from starving the main Isle's grace timers.
524///
525/// Bridge registration is deferred to a follow-up `exec` in `run()` because
526/// `HostContext` is not constructible until both Isles exist (the struct
527/// itself holds `Arc<AsyncIsle>` for both).
528async fn spawn_handler_isle(
529 script_name: String,
530 script_dir: String,
531 blocks_paths: String,
532 prompt: Option<String>,
533 context: Option<String>,
534) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
535 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
536 let (isle, driver) = AsyncIsle::builder()
537 .thread_name("agent-block-handler-isle")
538 .spawn(init)
539 .await
540 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
541 info!(
542 thread_name = "agent-block-handler-isle",
543 "handler Isle spawned"
544 );
545 Ok((Arc::new(isle), driver))
546}
547
/// Decode a 64-character hexadecimal string into 32 raw bytes (the mesh
/// Ed25519 secret key).
///
/// Leading/trailing whitespace is trimmed first. Upper- and lower-case hex
/// digits are both accepted.
///
/// # Errors
///
/// Returns a human-readable message if the trimmed input is not exactly 64
/// bytes long or contains a non-hex character.
///
/// Decoding works on raw bytes. The input is user-controlled (inline value or
/// env var), and slicing the `&str` at byte offsets would *panic* when a
/// multi-byte UTF-8 character straddles an offset; such input is now reported
/// as an error instead.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    // Value of the hex digit at byte offset `pos`. Non-ASCII bytes map to
    // Latin-1 chars, which are never hex digits, so they are rejected here.
    let nibble = |pos: usize| -> Result<u8, String> {
        char::from(digits[pos])
            .to_digit(16)
            .and_then(|d| u8::try_from(d).ok())
            .ok_or_else(|| format!("invalid hex at position {pos}: invalid digit found in string"))
    };

    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = (nibble(2 * i)? << 4) | nibble(2 * i + 1)?;
    }
    Ok(out)
}
563
564pub async fn run(config: BlockConfig) -> BlockResult<()> {
565 // ── Resolve sources ───────────────────────────────────────────
566 // Convert the `Source` enums on `BlockConfig` to their concrete
567 // payloads before any Isle setup. `File`/`Path`/`Env` variants
568 // read from disk / environment exactly once, here at the start.
569 let (script_source, script_name, script_dir_pathbuf) = match &config.script {
570 ScriptSource::Path(p) => {
571 let source = std::fs::read_to_string(p)
572 .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
573 let name = p
574 .file_name()
575 .map(|n| n.to_string_lossy().to_string())
576 .unwrap_or_else(|| "unknown".to_string());
577 let dir = p
578 .parent()
579 .map(|d| d.to_path_buf())
580 .unwrap_or_else(|| PathBuf::from("."));
581 (source, name, dir)
582 }
583 ScriptSource::Inline { source, name } => {
584 (source.clone(), name.clone(), config.project_root.clone())
585 }
586 ScriptSource::DefaultAgent => (
587 DEFAULT_AGENT_INVOKER.to_string(),
588 "default_agent_invoker.lua".to_string(),
589 config.project_root.clone(),
590 ),
591 };
592
593 let prompt_resolved: Option<String> = match &config.prompt {
594 Some(PromptSource::Inline(s)) => Some(s.clone()),
595 Some(PromptSource::File(p)) => Some(
596 std::fs::read_to_string(p)
597 .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
598 ),
599 None => None,
600 };
601 let context_resolved: Option<String> = match &config.context {
602 Some(PromptSource::Inline(s)) => Some(s.clone()),
603 Some(PromptSource::File(p)) => Some(
604 std::fs::read_to_string(p)
605 .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
606 ),
607 None => None,
608 };
609 let secret_key_resolved: Option<String> = match &config.secret_key {
610 Some(SecretKeySource::Inline(s)) => Some(s.clone()),
611 Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
612 None => None,
613 };
614
615 // NOTE: We previously held entered span guards across awaits for nested
616 // span context. That made the `run()` future `!Send`, which prevents
617 // SDK consumers from `tokio::spawn(run(config))`. Span context is
618 // attached to events via fields on the `info_span!` calls below; the
619 // missing nesting is an acceptable trade-off for `Send` correctness.
620 let _root_span = info_span!("agent_block", script = %script_name);
621
622 // ── .env ──────────────────────────────────────────────────────
623 // Load .env from project_root if present. Variables are merged into
624 // the process environment so Lua's `std.env.get()` picks them up.
625 let env_path = config.project_root.join(".env");
626 match dotenvy::from_path(&env_path) {
627 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
628 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
629 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
630 }
631
632 // ── Init ──────────────────────────────────────────────────────
633 let _init_span = info_span!("init");
634
635 // ── EventBus channel ─────────────────────────────────────────────
636 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
637 // handler can hold a `bus_tx` clone and forward incoming requests
638 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
639 let bus_capacity = crate::bridge::config::bus_capacity();
640 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
641 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
642
643 // ── Pre-install host-side Rust handlers ───────────────────────────
644 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
645 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
646 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
647 // Registered here (before any Lua bridge registers handlers and before
648 // `bus.serve` takes ownership of the bus) so the EventBus already
649 // carries the host handlers when the script starts.
650 // Install host-side Rust handlers: kind-specific entries from
651 // `host_handlers` and, when set, the kind-agnostic `host_handler`
652 // (registered via `on_any` as the fallback for unmatched kinds).
653 // SDK-embed 1-shot callers typically only set `host_handler`.
654 let has_kind_handlers = !config.host_handlers.is_empty();
655 let has_any_handler = config.host_handler.is_some();
656 if has_kind_handlers || has_any_handler {
657 let mut guard = event_bus
658 .lock()
659 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
660 let bus = guard
661 .as_mut()
662 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
663 for (kind, handler) in &config.host_handlers {
664 bus.on(kind.clone(), Arc::clone(handler))
665 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
666 }
667 if let Some(any_handler) = &config.host_handler {
668 bus.on_any(Arc::clone(any_handler))
669 .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
670 }
671 info!(
672 kind_handlers = config.host_handlers.len(),
673 any_handler = has_any_handler,
674 "host handlers pre-installed"
675 );
676 }
677
678 // ── auto-serve: background dispatcher for SDK-embed callers ───────
679 // When `auto_serve_bus` is on and at least one host-side handler
680 // (kind-specific or kind-agnostic) is installed, take the EventBus
681 // out of the Mutex *before* the script runs and spawn the dispatcher
682 // loop on the runtime. This lets `bus.emit(kind, payload)` from the
683 // script reach the host handler without requiring the script to call
684 // `bus.serve()` (which blocks on signals and never returns under
685 // programmatic embedding).
686 let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
687 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
688 let bus = {
689 let mut guard = event_bus
690 .lock()
691 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
692 guard
693 .take()
694 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
695 };
696 let token = CancellationToken::new();
697 let token_for_task = token.clone();
698 let handle = tokio::spawn(async move {
699 let mut bus = bus;
700 if let Err(e) = bus.run(token_for_task).await {
701 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
702 }
703 });
704 info!("auto-serve: dispatcher spawned");
705 Some((handle, token))
706 } else {
707 None
708 };
709
710 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
711 let keypair = match &secret_key_resolved {
712 Some(hex_str) => {
713 let bytes = hex_decode_32(hex_str)
714 .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
715 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
716 }
717 None => agent_mesh_core::identity::AgentKeypair::generate(),
718 };
719 info!(agent_id = %keypair.agent_id(), "mesh identity");
720 let acl = agent_mesh_core::acl::AclPolicy {
721 default_deny: false,
722 rules: vec![],
723 };
724 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
725 Arc::new(BusRelayHandler::new(bus_tx.clone()));
726 let url = relay_url.clone();
727 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
728 .await
729 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
730 info!(relay_url = %relay_url, "mesh connected");
731 Some(Arc::new(agent))
732 } else {
733 None
734 };
735
736 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
737 config.mcp_rpc_timeout,
738 )?));
739
740 // Resolve project_root to absolute path.
741 // canonicalize() can fail if the path doesn't exist; fall back to
742 // joining with current_dir to guarantee an absolute path.
743 let project_root = config
744 .project_root
745 .canonicalize()
746 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
747
748 let http_client = reqwest::Client::new();
749
750 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
751 // All knobs are ENV-driven (see `bridge/config.rs`).
752 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
753 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
754
755 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
756 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
757
758 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
759 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
760
761 // Use the script dir derived from the resolved `ScriptSource` for
762 // `package.path` lookups. For inline / default-agent variants the dir
763 // falls back to `project_root` (set during source resolution above).
764 let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
765
766 // Precompute values captured by the init closure so we don't need to
767 // move the full `HostContext` into it (HostContext now holds
768 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
769 // returns — classic chicken-and-egg). All bridge registrations run in a
770 // second pass via `isle.exec` below.
771 let blocks_paths = build_blocks_path(&project_root);
772 let prompt = prompt_resolved.clone();
773 let context = context_resolved.clone();
774
775 // ── main Isle ─────────────────────────────────────────────────
776 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
777 script_name.clone(),
778 script_dir.clone(),
779 blocks_paths.clone(),
780 prompt.clone(),
781 context.clone(),
782 ))
783 .await
784 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
785 let isle = Arc::new(isle);
786
787 // ── handler Isle (sequential, dependencies are trivial) ────────
788 let (handler_isle, handler_driver) = spawn_handler_isle(
789 script_name.clone(),
790 script_dir.clone(),
791 blocks_paths.clone(),
792 prompt,
793 context,
794 )
795 .await?;
796
797 // Wire both Isles into McpManager so Lua notification callbacks can be
798 // dispatched from the rmcp task thread.
799 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
800 // - main_isle: progress/log notification dispatch (exec on main Isle so
801 // user callback upvalues are preserved — no bytecode dump/reload needed)
802 {
803 let mut mgr = mcp_manager.write().await;
804 mgr.set_handler_isle(Arc::clone(&handler_isle));
805 mgr.set_main_isle(Arc::clone(&isle));
806 }
807
808 // ── HostContext + bridge registration ──────────────────────────────
809 // Wrap the isle in an Arc so `HostContext` can hand it to
810 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
811 // handlers from the EventBus dispatcher task).
812 let ctx = HostContext {
813 project_root,
814 mesh_agent,
815 mcp_manager: Arc::clone(&mcp_manager),
816 http_client,
817 sql_conn,
818 sql_interrupt,
819 kv_conn,
820 kv_interrupt,
821 ts_conn,
822 ts_interrupt,
823 isle: Arc::clone(&isle),
824 handler_isle: Arc::clone(&handler_isle),
825 bus_tx: bus_tx.clone(),
826 event_bus: Arc::clone(&event_bus),
827 };
828
829 {
830 let ctx = ctx.clone();
831 isle.exec(move |lua| {
832 bridge::register_all(lua, &ctx)
833 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
834 Ok(String::new())
835 })
836 .await
837 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
838 }
839
840 {
841 let ctx = ctx.clone();
842 handler_isle
843 .exec(move |lua| {
844 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
845 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
846 })?;
847 Ok(String::new())
848 })
849 .await
850 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
851 }
852
853 // ── Inject host_tools into the Lua tool registry ───────────────
854 // Done after `bridge::register_all` so `_TOOL_REGISTRY` exists.
855 // Each entry becomes an Anthropic-shaped tool spec table
856 // { name, schema = { description, input_schema }, handler, group? }
857 // where `handler` is a Lua async function that bridges back into
858 // the supplied `ToolHandler::call`. Lua-side `tool.list()` /
859 // `tool.schema()` / `agent.run` see these uniformly with native
860 // Lua-defined tools.
861 if !config.host_tools.is_empty() {
862 let host_tools = config.host_tools.clone();
863 let tool_count = host_tools.len();
864 isle.exec(move |lua| {
865 let registry: mlua::Table = lua
866 .globals()
867 .get("_TOOL_REGISTRY")
868 .map_err(|e| mlua_isle::IsleError::Lua(format!("get _TOOL_REGISTRY: {e}")))?;
869 for tool in host_tools {
870 let entry = lua
871 .create_table()
872 .map_err(|e| mlua_isle::IsleError::Lua(format!("create entry: {e}")))?;
873 entry
874 .set("name", tool.name.as_str())
875 .map_err(|e| mlua_isle::IsleError::Lua(format!("set name: {e}")))?;
876 // schema = { description, input_schema } — Anthropic shape
877 let schema = lua
878 .create_table()
879 .map_err(|e| mlua_isle::IsleError::Lua(format!("create schema: {e}")))?;
880 schema
881 .set("description", tool.description.as_str())
882 .map_err(|e| mlua_isle::IsleError::Lua(format!("set description: {e}")))?;
883 let input_schema_lua =
884 crate::bridge::json_to_lua(lua, tool.input_schema.clone())
885 .map_err(|e| mlua_isle::IsleError::Lua(format!("input_schema: {e}")))?;
886 schema
887 .set("input_schema", input_schema_lua)
888 .map_err(|e| mlua_isle::IsleError::Lua(format!("set input_schema: {e}")))?;
889 entry
890 .set("schema", schema)
891 .map_err(|e| mlua_isle::IsleError::Lua(format!("set schema: {e}")))?;
892 if let Some(group) = &tool.group {
893 entry
894 .set("group", group.as_str())
895 .map_err(|e| mlua_isle::IsleError::Lua(format!("set group: {e}")))?;
896 }
897 let handler_arc = Arc::clone(&tool.handler);
898 let handler_fn = lua
899 .create_async_function(move |lua, input: mlua::Value| {
900 let handler = Arc::clone(&handler_arc);
901 async move {
902 let input_json = crate::bridge::lua_to_json(&lua, input)?;
903 let result = handler
904 .call(input_json)
905 .await
906 .map_err(mlua::Error::external)?;
907 crate::bridge::json_to_lua(&lua, result)
908 }
909 })
910 .map_err(|e| mlua_isle::IsleError::Lua(format!("create handler: {e}")))?;
911 entry
912 .set("handler", handler_fn)
913 .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
914 registry
915 .set(tool.name.as_str(), entry)
916 .map_err(|e| mlua_isle::IsleError::Lua(format!("registry set: {e}")))?;
917 }
918 Ok(String::new())
919 })
920 .await
921 .map_err(|e| BlockError::Runtime(format!("host_tools inject: {e}")))?;
922 info!(count = tool_count, "host tools injected into Lua registry");
923 }
924
925 drop(_init_span);
926
927 // ── Execute ───────────────────────────────────────────────────
928 // When `shutdown_token` is supplied, race the script future against
929 // the caller's cancellation signal. On cancel, propagate to the Isle
930 // via the AsyncTask's cancel token so the debug hook unwinds the Lua
931 // VM, then continue into the shutdown sequence below (we still want
932 // to release MCP/mesh handles and join the auto-serve dispatcher
933 // before returning).
934 let script_result: Result<(), BlockError> = {
935 let _exec_span = info_span!("execute", script = %script_name);
936
937 let mut task = isle.spawn_coroutine_eval(&script_source);
938 let task_cancel = task.cancel_token().clone();
939 match config.shutdown_token.as_ref() {
940 Some(token) => {
941 tokio::select! {
942 biased;
943 _ = token.cancelled() => {
944 task_cancel.cancel();
945 // Wait for the Isle to unwind so the VM is in a
946 // consistent state before driver shutdown. The
947 // debug hook fires at the next HOOK_INTERVAL.
948 let _ = (&mut task).await;
949 info!("shutdown_token: cancelled by caller");
950 Err(BlockError::Cancelled)
951 }
952 res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
953 }
954 }
955 None => (&mut task)
956 .await
957 .map(|_| ())
958 .map_err(|e| BlockError::Script(format!("{e}"))),
959 }
960 };
961
962 // ── auto-serve drain + cancel ─────────────────────────────────
963 // Let the dispatcher drain events queued by the script, then signal
964 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
965 if let Some((handle, token)) = auto_serve_state {
966 let grace_ms = crate::bridge::config::task_grace_ms();
967 let grace = Duration::from_millis(grace_ms);
968 tokio::time::sleep(grace).await;
969 token.cancel();
970 match tokio::time::timeout(grace, handle).await {
971 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
972 Ok(Err(join_err)) => {
973 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
974 }
975 Err(_) => {
976 tracing::warn!(
977 grace_ms,
978 "auto-serve: dispatcher join timed out after cancel; forcing exit"
979 );
980 }
981 }
982 }
983
984 // ── Shutdown ──────────────────────────────────────────────────
985 {
986 let _shutdown_span = info_span!("shutdown");
987
988 mcp_manager.write().await.disconnect_all().await?;
989
990 driver
991 .shutdown()
992 .await
993 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
994
995 // Handler Isle shutdown is independent of main shutdown: a failure
996 // here (e.g. ThreadPanic on the handler thread) is logged but does
997 // not poison the main process exit. The main Isle has already
998 // been stopped cleanly above.
999 match handler_driver.shutdown().await {
1000 Ok(()) => info!(
1001 thread_name = "agent-block-handler-isle",
1002 "handler Isle shut down"
1003 ),
1004 Err(e) => tracing::error!(
1005 error = %e,
1006 thread_name = "agent-block-handler-isle",
1007 "handler Isle shutdown failed"
1008 ),
1009 }
1010 }
1011
1012 script_result
1013}
1014
1015/// mesh → bus source adapter.
1016///
1017/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
1018/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
1019/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
1020/// oneshot channel carried inside the event.
1021///
1022/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
1023///
1024/// | Failure | Return value |
1025/// |---------------------------|----------------------------------------|
1026/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}` |
1027/// | ack receiver dropped | `{"error": "ack dropped"}` |
1028/// | Lua handler `BlockError` | `{"error": "<handler error>"}` |
1029/// | Handler exceeded 30s | `{"error": "handler timeout"}` |
1030///
1031/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
1032/// (see `src/bridge/mesh.rs`).
1033struct BusRelayHandler {
1034 tx: mpsc::Sender<Event>,
1035}
1036
1037impl BusRelayHandler {
1038 fn new(tx: mpsc::Sender<Event>) -> Self {
1039 Self { tx }
1040 }
1041}
1042
1043/// Bound used for both the mesh-adapter ack wait and other source timeouts.
1044const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
1045
1046#[async_trait::async_trait]
1047impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
1048 async fn handle(
1049 &self,
1050 from: &agent_mesh_core::identity::AgentId,
1051 payload: &serde_json::Value,
1052 _cancel: agent_mesh_sdk::CancelToken,
1053 ) -> serde_json::Value {
1054 let id = uuid::Uuid::new_v4().to_string();
1055 let meta = serde_json::json!({"from": from.to_string()});
1056 let (ack_tx, ack_rx) = oneshot::channel();
1057 let event = Event {
1058 kind: "mesh".into(),
1059 id: id.clone(),
1060 payload: payload.clone(),
1061 meta,
1062 ack_tx: Some(ack_tx),
1063 };
1064
1065 if let Err(e) = self.tx.send(event).await {
1066 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
1067 return serde_json::json!({"error": "bus channel closed"});
1068 }
1069
1070 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
1071 Ok(Ok(Ok(v))) => v,
1072 Ok(Ok(Err(e))) => {
1073 tracing::error!(id = %id, error = %e, "mesh handler returned error");
1074 serde_json::json!({"error": e.to_string()})
1075 }
1076 Ok(Err(e)) => {
1077 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
1078 serde_json::json!({"error": "ack dropped"})
1079 }
1080 Err(_) => {
1081 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
1082 serde_json::json!({"error": "handler timeout"})
1083 }
1084 }
1085 }
1086}