agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for blocks/ StdPkg modules.
/// These are baked into the binary at compile time so `cargo install` works
/// without any extra file distribution.
///
/// Each entry is a `(module_name, lua_source)` pair. The module name is the
/// key that the embedded `package.searchers` fallback installed by
/// `build_isle_init` matches against the name passed to `require`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
    (
        "compile_loop",
        include_str!("../blocks/compile_loop/init.lua"),
    ),
];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Runs the StdPkg `agent` module with `_PROMPT` / `_CONTEXT` injected and
/// emits the result on the EventBus. The emit kind is `"_"` — a neutral
/// label with no SDK-side meaning. The result is intended to be received
/// via [`BlockConfig::host_handler`] (the kind-agnostic single sink); the
/// literal label is irrelevant to SDK consumers.
///
/// `_PROMPT` / `_CONTEXT` are only set as Lua globals when the corresponding
/// `BlockConfig` field is supplied (see `build_isle_init`); otherwise the
/// script reads them as unset globals.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
 prompt = _PROMPT,
 system = _CONTEXT,
})
bus.emit("_", r)
"#;
54
/// How the Lua script source for `run()` is supplied.
///
/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
/// the filesystem at start. `Inline` lets SDK consumers pass a script
/// they hold in memory (compile-time `include_str!`, dynamically built
/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
/// an embedded invoker that runs the StdPkg `agent` module with the
/// caller-supplied prompt/context and emits the result on the EventBus
/// via `bus.emit("_", ...)` (a neutral kind; see the variant docs).
#[derive(Debug, Clone)]
pub enum ScriptSource {
    /// Read the script from a filesystem path at start.
    ///
    /// The file name becomes the script's display name and the parent
    /// directory is used as the script dir for `package.path` lookups.
    Path(PathBuf),
    /// Use the supplied source code directly.
    ///
    /// The script dir for `package.path` lookups is the project root.
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker. `prompt` / `context`
    /// are forwarded as `_PROMPT` / `_CONTEXT` Lua globals and the
    /// agent result is emitted on the EventBus under a neutral label
    /// (`"_"`). SDK consumers should pair this with
    /// [`BlockConfig::host_handler`] (the kind-agnostic single sink)
    /// and `auto_serve_bus = true`. The emit-kind is intentionally
    /// meaningless; consumers that need string-keyed routing should
    /// supply [`ScriptSource::Inline`] with their own invoker.
    DefaultAgent,
}
86
/// How a string payload (prompt / system context) is supplied.
///
/// `Inline` is the literal string variant (CLI `--prompt` / `--context`).
/// `File` reads the contents from disk at `run()` start (CLI
/// `--prompt-file` / `--context-file`).
///
/// Resolution happens once, at the top of `run()`, before any Lua VM is
/// created. A `File` that cannot be read makes `run()` return
/// `BlockError::Script` carrying the path and the I/O error.
#[derive(Debug, Clone)]
pub enum PromptSource {
    /// Literal string.
    Inline(String),
    /// Filesystem path; contents are read at `run()` start.
    File(PathBuf),
}
99
/// How the Ed25519 mesh identity secret key is supplied.
///
/// `Inline` is a 64-hex literal. `Env` reads the named environment
/// variable at `run()` start (CLI default uses
/// `AGENT_BLOCK_MESH_SECRET_KEY`). Absence of any `SecretKeySource`
/// (i.e. `BlockConfig.secret_key = None`) causes a random keypair to
/// be generated, matching the prior behavior.
///
/// NOTE: `run()` resolves `Env` with `std::env::var(..).ok()`, so a variable
/// that is unset (or not valid Unicode) is treated exactly like
/// `secret_key = None`: a random keypair is generated and no error is
/// reported. The resolved value is only used when a relay URL is configured,
/// and it must decode as 64 hex characters (surrounding whitespace is
/// trimmed) or `run()` fails with a `secret-key: ...` runtime error.
#[derive(Debug, Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal.
    Inline(String),
    /// Environment variable name to read at start.
    Env(String),
}
114
115/// Build the `blocks/` portion of `package.path` from filesystem locations.
116///
117/// Priority (highest first):
118/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
119/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
120///
121/// Returns a semicolon-terminated string ready to prepend to `package.path`,
122/// or an empty string when no `blocks/` directories are found.
123fn build_blocks_path(project_root: &Path) -> String {
124 let mut out = String::new();
125
126 // 1. project_root/blocks/
127 let project_blocks = project_root.join("blocks");
128 if project_blocks.is_dir() {
129 let pb = project_blocks.to_string_lossy();
130 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
131 }
132
133 // 2. exe_dir/blocks/
134 match std::env::current_exe() {
135 Ok(exe) => {
136 if let Some(exe_dir) = exe.parent() {
137 let exe_blocks = exe_dir.join("blocks");
138 if exe_blocks.is_dir() {
139 let eb = exe_blocks.to_string_lossy();
140 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
141 }
142 }
143 }
144 Err(e) => {
145 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
146 }
147 }
148
149 out
150}
151
/// Configuration consumed by [`run`].
///
/// Source-like fields (`script`, `prompt`, `context`, `secret_key`) are
/// resolved to concrete values once, at the start of `run()`.
pub struct BlockConfig {
    /// Lua script to execute. See [`ScriptSource`] for the supported
    /// shapes (filesystem path / inline source / embedded default
    /// agent invoker).
    pub script: ScriptSource,
    /// Project root directory. `run()` loads `<project_root>/.env` when
    /// present, probes `<project_root>/blocks/` when building
    /// `package.path`, and uses this directory as the script dir for
    /// `Inline` / `DefaultAgent` scripts. It is resolved to an absolute
    /// path before being handed to the Lua bridges.
    pub project_root: PathBuf,
    /// agent-mesh relay URL. When `Some`, `run()` connects a mesh agent to
    /// this relay before the script starts and fails with
    /// `BlockError::Mesh` if the connection cannot be established. When
    /// `None`, no mesh connection is made.
    pub relay_url: Option<String>,
    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
    /// for the supported shapes (inline 64-hex / environment variable).
    /// `None` generates a random keypair. Required to talk to
    /// registry/ACL-gated hosted meshes.
    ///
    /// Only consulted when [`Self::relay_url`] is set.
    pub secret_key: Option<SecretKeySource>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt payload injected as `_PROMPT` Lua global. See
    /// [`PromptSource`] for the supported shapes. `None` leaves the
    /// global unset.
    pub prompt: Option<PromptSource>,
    /// Context payload injected as `_CONTEXT` Lua global (typically
    /// the system prompt). Same shape rules as [`Self::prompt`].
    pub context: Option<PromptSource>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// Single host-side Rust handler that catches every event regardless
    /// of `kind`. Internally registered via [`EventBus::on_any`], so it
    /// acts as a fallback when no entry in [`Self::host_handlers`]
    /// matches the incoming `kind`.
    ///
    /// This is the SDK-embed 1-shot sink: SDK consumers do not need to
    /// invent or coordinate a string `kind` between the Lua script and
    /// their Rust code. The agent invoker's emit-kind is irrelevant —
    /// the handler receives every event.
    ///
    /// Use this when you want a single Rust handler to receive results
    /// (typical embedded use). Use [`Self::host_handlers`] instead when
    /// you actually need string-keyed routing (multi-source / multi-
    /// handler dispatch). The two may coexist: kind-specific handlers
    /// in `host_handlers` take precedence, and this single handler is
    /// the fallback for unmatched kinds.
    ///
    /// Defaults to `None`.
    pub host_handler: Option<Arc<dyn Handler>>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes. Required for SDK-embed callers that supply
    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
    /// emitted from the script to actually reach those handlers without
    /// requiring the script to call `bus.serve()` (which blocks on
    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
    ///
    /// The flag only takes effect when at least one host-side handler is
    /// installed (an entry in [`Self::host_handlers`] or
    /// [`Self::host_handler`]); with no host handlers `run()` does not spawn
    /// the background dispatcher and the EventBus stays available to the
    /// script.
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled, the
    /// in-flight script is interrupted via the Isle's debug-hook cancel
    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
    /// returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop). The token is observed across the
    /// `coroutine_eval` await; once cancellation propagates, the shutdown
    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
    /// still runs so file descriptors and remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
245
/// Shared context passed into Lua bridge functions.
///
/// Built once in `run()` after both Isles have been spawned, then cloned
/// into the bridge-registration closures for the main and handler Isles.
/// Cloning is cheap for the shared handles: they are `Arc`-wrapped.
#[derive(Clone)]
pub struct HostContext {
    /// Project root, resolved to an absolute path by `run()`.
    pub project_root: PathBuf,
    /// Connected mesh agent. `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP manager shared behind an async (`tokio`) `RwLock`. `run()` also
    /// wires both Isles into it so MCP callbacks can be dispatched to Lua.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection.
    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
    /// handlers against this Isle; handlers live on `handler_isle` instead.
    /// The field is retained because bridge code still keyed to the main
    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
    /// need it, and removing it would force another HostContext reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
    ///
    /// The slot may already be `None` when the script starts: `run()` takes
    /// the bus itself when the auto-serve dispatcher is enabled.
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
302
303/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
304/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
305/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
306///
307/// `label` is used only for the init log line (`sql` / `kv`) so that the two
308/// databases are distinguishable in tracing output.
309fn open_sqlite(
310 path: &Path,
311 label: &'static str,
312) -> BlockResult<(
313 Arc<Mutex<rusqlite::Connection>>,
314 Arc<rusqlite::InterruptHandle>,
315)> {
316 let is_memory = crate::bridge::config::is_memory_sql(path);
317 if !is_memory {
318 if let Some(parent) = path.parent() {
319 std::fs::create_dir_all(parent)
320 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
321 }
322 }
323 let conn = rusqlite::Connection::open(path)
324 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
325 if !is_memory {
326 let journal = crate::bridge::config::sql_journal_mode();
327 conn.pragma_update(None, "journal_mode", &journal)
328 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
329 }
330 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
331 conn.pragma_update(None, "busy_timeout", busy_ms)
332 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
333 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
334 let interrupt = Arc::new(conn.get_interrupt_handle());
335 let conn = Arc::new(Mutex::new(conn));
336 Ok((conn, interrupt))
337}
338
339/// Build the init closure shared between the main Isle and the handler
340/// Isle. Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
341/// configures `package.path` / `package.searchers` so `require "agent"`
342/// (and any `blocks/` module) works inside the Lua VM.
343///
344/// Returns an `FnOnce` so each call produces a fresh closure; this lets
345/// both Isles be spawned from the same config without `Clone` bounds on
346/// the captured `HashMap`.
347fn build_isle_init(
348 script_name: String,
349 script_dir: String,
350 blocks_paths: String,
351 prompt: Option<String>,
352 context: Option<String>,
353) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
354 move |lua| {
355 // Set script name before registering bridges (used by log.* for attribution)
356 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
357 if let Some(ref p) = prompt {
358 lua.globals().set("_PROMPT", p.as_str())?;
359 }
360 if let Some(ref c) = context {
361 lua.globals().set("_CONTEXT", c.as_str())?;
362 }
363
364 mlua_batteries::register_all(lua, "std")?;
365
366 // ── package.path ──────────────────────────────────────────────
367 // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
368 let package: mlua::Table = lua.globals().get("package")?;
369 let current_path: String = package.get("path")?;
370 let new_path =
371 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
372 package.set("path", new_path)?;
373
374 // ── package.searchers — embedded fallback ─────────────────────
375 // Register a custom searcher that loads blocks/ modules from the
376 // sources baked in at compile time. This is the lowest-priority
377 // searcher so filesystem copies always win.
378 let embedded: HashMap<&'static str, &'static str> =
379 EMBEDDED_BLOCKS.iter().copied().collect();
380
381 let searchers: mlua::Table = package.get("searchers")?;
382 let loader =
383 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
384 Some(source) => {
385 let chunk = lua
386 .load(*source)
387 .set_name(format!("@embedded:blocks/{name}/init.lua"));
388 let func = chunk.into_function()?;
389 Ok(mlua::Value::Function(func))
390 }
391 None => {
392 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
393 Ok(mlua::Value::String(msg))
394 }
395 })?;
396 // Append as the last searcher so filesystem paths remain preferred.
397 let next_idx = searchers.raw_len() + 1;
398 searchers.raw_set(next_idx, loader)?;
399
400 Ok(())
401 }
402}
403
404/// Spawn the dedicated handler Isle.
405///
406/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
407/// separate OS thread with its own `tokio` current-thread runtime, keeping
408/// CPU-bound handlers from starving the main Isle's grace timers.
409///
410/// Bridge registration is deferred to a follow-up `exec` in `run()` because
411/// `HostContext` is not constructible until both Isles exist (the struct
412/// itself holds `Arc<AsyncIsle>` for both).
413async fn spawn_handler_isle(
414 script_name: String,
415 script_dir: String,
416 blocks_paths: String,
417 prompt: Option<String>,
418 context: Option<String>,
419) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
420 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
421 let (isle, driver) = AsyncIsle::builder()
422 .thread_name("agent-block-handler-isle")
423 .spawn(init)
424 .await
425 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
426 info!(
427 thread_name = "agent-block-handler-isle",
428 "handler Isle spawned"
429 );
430 Ok((Arc::new(isle), driver))
431}
432
/// Decode a 64-character hexadecimal string into a 32-byte array.
///
/// Leading and trailing whitespace is trimmed before validation, so a value
/// read from an environment variable or a file with a trailing newline is
/// accepted. Upper- and lower-case digits are both valid.
///
/// Decoding works on bytes rather than `str` slices. Slicing a `&str` at
/// arbitrary byte offsets panics when an offset falls inside a multi-byte
/// UTF-8 character, which a non-ASCII input whose byte length happens to be
/// 64 would trigger. Such input now yields an ordinary error.
///
/// # Errors
///
/// Returns a message when the trimmed input is not exactly 64 bytes long, or
/// when a byte is not a hex digit (the message carries the byte offset of the
/// first offending digit). Error messages never echo the input, since it is
/// secret key material.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Numeric value of the ASCII hex digit `digit` found at byte offset `pos`.
    fn nibble(digit: u8, pos: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    let digits = s.trim().as_bytes();
    if digits.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", digits.len()));
    }

    let mut out = [0u8; 32];
    // 64 digits split into exactly 32 pairs, one per output byte.
    for (i, (byte, pair)) in out.iter_mut().zip(digits.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
448
449pub async fn run(config: BlockConfig) -> BlockResult<()> {
450 // ── Resolve sources ───────────────────────────────────────────
451 // Convert the `Source` enums on `BlockConfig` to their concrete
452 // payloads before any Isle setup. `File`/`Path`/`Env` variants
453 // read from disk / environment exactly once, here at the start.
454 let (script_source, script_name, script_dir_pathbuf) = match &config.script {
455 ScriptSource::Path(p) => {
456 let source = std::fs::read_to_string(p)
457 .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
458 let name = p
459 .file_name()
460 .map(|n| n.to_string_lossy().to_string())
461 .unwrap_or_else(|| "unknown".to_string());
462 let dir = p
463 .parent()
464 .map(|d| d.to_path_buf())
465 .unwrap_or_else(|| PathBuf::from("."));
466 (source, name, dir)
467 }
468 ScriptSource::Inline { source, name } => {
469 (source.clone(), name.clone(), config.project_root.clone())
470 }
471 ScriptSource::DefaultAgent => (
472 DEFAULT_AGENT_INVOKER.to_string(),
473 "default_agent_invoker.lua".to_string(),
474 config.project_root.clone(),
475 ),
476 };
477
478 let prompt_resolved: Option<String> = match &config.prompt {
479 Some(PromptSource::Inline(s)) => Some(s.clone()),
480 Some(PromptSource::File(p)) => Some(
481 std::fs::read_to_string(p)
482 .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
483 ),
484 None => None,
485 };
486 let context_resolved: Option<String> = match &config.context {
487 Some(PromptSource::Inline(s)) => Some(s.clone()),
488 Some(PromptSource::File(p)) => Some(
489 std::fs::read_to_string(p)
490 .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
491 ),
492 None => None,
493 };
494 let secret_key_resolved: Option<String> = match &config.secret_key {
495 Some(SecretKeySource::Inline(s)) => Some(s.clone()),
496 Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
497 None => None,
498 };
499
500 // NOTE: We previously held entered span guards across awaits for nested
501 // span context. That made the `run()` future `!Send`, which prevents
502 // SDK consumers from `tokio::spawn(run(config))`. Span context is
503 // attached to events via fields on the `info_span!` calls below; the
504 // missing nesting is an acceptable trade-off for `Send` correctness.
505 let _root_span = info_span!("agent_block", script = %script_name);
506
507 // ── .env ──────────────────────────────────────────────────────
508 // Load .env from project_root if present. Variables are merged into
509 // the process environment so Lua's `std.env.get()` picks them up.
510 let env_path = config.project_root.join(".env");
511 match dotenvy::from_path(&env_path) {
512 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
513 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
514 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
515 }
516
517 // ── Init ──────────────────────────────────────────────────────
518 let _init_span = info_span!("init");
519
520 // ── EventBus channel ─────────────────────────────────────────────
521 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
522 // handler can hold a `bus_tx` clone and forward incoming requests
523 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
524 let bus_capacity = crate::bridge::config::bus_capacity();
525 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
526 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
527
528 // ── Pre-install host-side Rust handlers ───────────────────────────
529 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
530 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
531 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
532 // Registered here (before any Lua bridge registers handlers and before
533 // `bus.serve` takes ownership of the bus) so the EventBus already
534 // carries the host handlers when the script starts.
535 // Install host-side Rust handlers: kind-specific entries from
536 // `host_handlers` and, when set, the kind-agnostic `host_handler`
537 // (registered via `on_any` as the fallback for unmatched kinds).
538 // SDK-embed 1-shot callers typically only set `host_handler`.
539 let has_kind_handlers = !config.host_handlers.is_empty();
540 let has_any_handler = config.host_handler.is_some();
541 if has_kind_handlers || has_any_handler {
542 let mut guard = event_bus
543 .lock()
544 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
545 let bus = guard
546 .as_mut()
547 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
548 for (kind, handler) in &config.host_handlers {
549 bus.on(kind.clone(), Arc::clone(handler))
550 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
551 }
552 if let Some(any_handler) = &config.host_handler {
553 bus.on_any(Arc::clone(any_handler))
554 .map_err(|e| BlockError::Bus(format!("host_handler on_any: {e}")))?;
555 }
556 info!(
557 kind_handlers = config.host_handlers.len(),
558 any_handler = has_any_handler,
559 "host handlers pre-installed"
560 );
561 }
562
563 // ── auto-serve: background dispatcher for SDK-embed callers ───────
564 // When `auto_serve_bus` is on and at least one host-side handler
565 // (kind-specific or kind-agnostic) is installed, take the EventBus
566 // out of the Mutex *before* the script runs and spawn the dispatcher
567 // loop on the runtime. This lets `bus.emit(kind, payload)` from the
568 // script reach the host handler without requiring the script to call
569 // `bus.serve()` (which blocks on signals and never returns under
570 // programmatic embedding).
571 let auto_serve = config.auto_serve_bus && (has_kind_handlers || has_any_handler);
572 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
573 let bus = {
574 let mut guard = event_bus
575 .lock()
576 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
577 guard
578 .take()
579 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
580 };
581 let token = CancellationToken::new();
582 let token_for_task = token.clone();
583 let handle = tokio::spawn(async move {
584 let mut bus = bus;
585 if let Err(e) = bus.run(token_for_task).await {
586 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
587 }
588 });
589 info!("auto-serve: dispatcher spawned");
590 Some((handle, token))
591 } else {
592 None
593 };
594
595 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
596 let keypair = match &secret_key_resolved {
597 Some(hex_str) => {
598 let bytes = hex_decode_32(hex_str)
599 .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
600 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
601 }
602 None => agent_mesh_core::identity::AgentKeypair::generate(),
603 };
604 info!(agent_id = %keypair.agent_id(), "mesh identity");
605 let acl = agent_mesh_core::acl::AclPolicy {
606 default_deny: false,
607 rules: vec![],
608 };
609 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
610 Arc::new(BusRelayHandler::new(bus_tx.clone()));
611 let url = relay_url.clone();
612 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
613 .await
614 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
615 info!(relay_url = %relay_url, "mesh connected");
616 Some(Arc::new(agent))
617 } else {
618 None
619 };
620
621 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
622 config.mcp_rpc_timeout,
623 )?));
624
625 // Resolve project_root to absolute path.
626 // canonicalize() can fail if the path doesn't exist; fall back to
627 // joining with current_dir to guarantee an absolute path.
628 let project_root = config
629 .project_root
630 .canonicalize()
631 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
632
633 let http_client = reqwest::Client::new();
634
635 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
636 // All knobs are ENV-driven (see `bridge/config.rs`).
637 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
638 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
639
640 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
641 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
642
643 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
644 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
645
646 // Use the script dir derived from the resolved `ScriptSource` for
647 // `package.path` lookups. For inline / default-agent variants the dir
648 // falls back to `project_root` (set during source resolution above).
649 let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
650
651 // Precompute values captured by the init closure so we don't need to
652 // move the full `HostContext` into it (HostContext now holds
653 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
654 // returns — classic chicken-and-egg). All bridge registrations run in a
655 // second pass via `isle.exec` below.
656 let blocks_paths = build_blocks_path(&project_root);
657 let prompt = prompt_resolved.clone();
658 let context = context_resolved.clone();
659
660 // ── main Isle ─────────────────────────────────────────────────
661 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
662 script_name.clone(),
663 script_dir.clone(),
664 blocks_paths.clone(),
665 prompt.clone(),
666 context.clone(),
667 ))
668 .await
669 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
670 let isle = Arc::new(isle);
671
672 // ── handler Isle (sequential, dependencies are trivial) ────────
673 let (handler_isle, handler_driver) = spawn_handler_isle(
674 script_name.clone(),
675 script_dir.clone(),
676 blocks_paths.clone(),
677 prompt,
678 context,
679 )
680 .await?;
681
682 // Wire both Isles into McpManager so Lua notification callbacks can be
683 // dispatched from the rmcp task thread.
684 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
685 // - main_isle: progress/log notification dispatch (exec on main Isle so
686 // user callback upvalues are preserved — no bytecode dump/reload needed)
687 {
688 let mut mgr = mcp_manager.write().await;
689 mgr.set_handler_isle(Arc::clone(&handler_isle));
690 mgr.set_main_isle(Arc::clone(&isle));
691 }
692
693 // ── HostContext + bridge registration ──────────────────────────────
694 // Wrap the isle in an Arc so `HostContext` can hand it to
695 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
696 // handlers from the EventBus dispatcher task).
697 let ctx = HostContext {
698 project_root,
699 mesh_agent,
700 mcp_manager: Arc::clone(&mcp_manager),
701 http_client,
702 sql_conn,
703 sql_interrupt,
704 kv_conn,
705 kv_interrupt,
706 ts_conn,
707 ts_interrupt,
708 isle: Arc::clone(&isle),
709 handler_isle: Arc::clone(&handler_isle),
710 bus_tx: bus_tx.clone(),
711 event_bus: Arc::clone(&event_bus),
712 };
713
714 {
715 let ctx = ctx.clone();
716 isle.exec(move |lua| {
717 bridge::register_all(lua, &ctx)
718 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
719 Ok(String::new())
720 })
721 .await
722 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
723 }
724
725 {
726 let ctx = ctx.clone();
727 handler_isle
728 .exec(move |lua| {
729 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
730 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
731 })?;
732 Ok(String::new())
733 })
734 .await
735 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
736 }
737
738 drop(_init_span);
739
740 // ── Execute ───────────────────────────────────────────────────
741 // When `shutdown_token` is supplied, race the script future against
742 // the caller's cancellation signal. On cancel, propagate to the Isle
743 // via the AsyncTask's cancel token so the debug hook unwinds the Lua
744 // VM, then continue into the shutdown sequence below (we still want
745 // to release MCP/mesh handles and join the auto-serve dispatcher
746 // before returning).
747 let script_result: Result<(), BlockError> = {
748 let _exec_span = info_span!("execute", script = %script_name);
749
750 let mut task = isle.spawn_coroutine_eval(&script_source);
751 let task_cancel = task.cancel_token().clone();
752 match config.shutdown_token.as_ref() {
753 Some(token) => {
754 tokio::select! {
755 biased;
756 _ = token.cancelled() => {
757 task_cancel.cancel();
758 // Wait for the Isle to unwind so the VM is in a
759 // consistent state before driver shutdown. The
760 // debug hook fires at the next HOOK_INTERVAL.
761 let _ = (&mut task).await;
762 info!("shutdown_token: cancelled by caller");
763 Err(BlockError::Cancelled)
764 }
765 res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
766 }
767 }
768 None => (&mut task)
769 .await
770 .map(|_| ())
771 .map_err(|e| BlockError::Script(format!("{e}"))),
772 }
773 };
774
775 // ── auto-serve drain + cancel ─────────────────────────────────
776 // Let the dispatcher drain events queued by the script, then signal
777 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
778 if let Some((handle, token)) = auto_serve_state {
779 let grace_ms = crate::bridge::config::task_grace_ms();
780 let grace = Duration::from_millis(grace_ms);
781 tokio::time::sleep(grace).await;
782 token.cancel();
783 match tokio::time::timeout(grace, handle).await {
784 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
785 Ok(Err(join_err)) => {
786 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
787 }
788 Err(_) => {
789 tracing::warn!(
790 grace_ms,
791 "auto-serve: dispatcher join timed out after cancel; forcing exit"
792 );
793 }
794 }
795 }
796
797 // ── Shutdown ──────────────────────────────────────────────────
798 {
799 let _shutdown_span = info_span!("shutdown");
800
801 mcp_manager.write().await.disconnect_all().await?;
802
803 driver
804 .shutdown()
805 .await
806 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
807
808 // Handler Isle shutdown is independent of main shutdown: a failure
809 // here (e.g. ThreadPanic on the handler thread) is logged but does
810 // not poison the main process exit. The main Isle has already
811 // been stopped cleanly above.
812 match handler_driver.shutdown().await {
813 Ok(()) => info!(
814 thread_name = "agent-block-handler-isle",
815 "handler Isle shut down"
816 ),
817 Err(e) => tracing::error!(
818 error = %e,
819 thread_name = "agent-block-handler-isle",
820 "handler Isle shutdown failed"
821 ),
822 }
823 }
824
825 script_result
826}
827
/// mesh → bus source adapter.
///
/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
/// oneshot channel carried inside the event.
///
/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
///
/// | Failure                                | Return value                      |
/// |----------------------------------------|-----------------------------------|
/// | `bus_tx.send` fails (channel closed)   | `{"error": "bus channel closed"}` |
/// | ack sender dropped without a reply     | `{"error": "ack dropped"}`        |
/// | Lua handler `BlockError`               | `{"error": "<handler error>"}`    |
/// | No ack within [`BUS_ACK_TIMEOUT`] (30s) | `{"error": "handler timeout"}`   |
///
/// `send` on the bounded channel only returns an error once the receiving
/// side is closed; a full channel does not by itself fail the send.
///
/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
/// (see `src/bridge/mesh.rs`).
struct BusRelayHandler {
    /// Sending half of the bus channel that mesh events are pushed onto.
    tx: mpsc::Sender<Event>,
}
849
850impl BusRelayHandler {
851 fn new(tx: mpsc::Sender<Event>) -> Self {
852 Self { tx }
853 }
854}
855
/// Bound used for both the mesh-adapter ack wait and other source timeouts.
///
/// Set to 30 seconds. `BusRelayHandler::handle` applies it while waiting for
/// the Lua handler's ack and reports `{"error": "handler timeout"}` when it
/// elapses.
// NOTE(review): only the mesh-adapter use is visible next to this constant;
// confirm which other sources rely on it before changing the value.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
858
859#[async_trait::async_trait]
860impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
861 async fn handle(
862 &self,
863 from: &agent_mesh_core::identity::AgentId,
864 payload: &serde_json::Value,
865 _cancel: agent_mesh_sdk::CancelToken,
866 ) -> serde_json::Value {
867 let id = uuid::Uuid::new_v4().to_string();
868 let meta = serde_json::json!({"from": from.to_string()});
869 let (ack_tx, ack_rx) = oneshot::channel();
870 let event = Event {
871 kind: "mesh".into(),
872 id: id.clone(),
873 payload: payload.clone(),
874 meta,
875 ack_tx: Some(ack_tx),
876 };
877
878 if let Err(e) = self.tx.send(event).await {
879 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
880 return serde_json::json!({"error": "bus channel closed"});
881 }
882
883 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
884 Ok(Ok(Ok(v))) => v,
885 Ok(Ok(Err(e))) => {
886 tracing::error!(id = %id, error = %e, "mesh handler returned error");
887 serde_json::json!({"error": e.to_string()})
888 }
889 Ok(Err(e)) => {
890 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
891 serde_json::json!({"error": "ack dropped"})
892 }
893 Err(_) => {
894 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
895 serde_json::json!({"error": "handler timeout"})
896 }
897 }
898 }
899}