agent_block_core/host.rs
1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25use tokio_util::sync::CancellationToken;
26
/// Embedded Lua sources for the `blocks/` StdPkg modules, stored as
/// `(module name, Lua source)` pairs.
///
/// The sources are baked into the binary at compile time via `include_str!`
/// so `cargo install` works without any extra file distribution.
/// `build_isle_init` collects this table into a map and registers a
/// `package.searchers` entry that resolves `require "<module name>"` from it
/// when no earlier searcher found the module.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
    (
        "compile_loop",
        include_str!("../blocks/compile_loop/init.lua"),
    ),
];
38
/// Embedded default agent invoker used by [`ScriptSource::DefaultAgent`].
///
/// Runs the StdPkg `agent` module with the `_PROMPT` / `_CONTEXT` Lua
/// globals as `prompt` / `system`, then emits the result on the EventBus
/// under kind `"agent_result"` so SDK consumers can wire a host handler
/// without bundling any Lua file.
///
/// `_PROMPT` / `_CONTEXT` are only set when `BlockConfig::prompt` /
/// `BlockConfig::context` are `Some` (see `build_isle_init`); otherwise the
/// script reads them as `nil`.
const DEFAULT_AGENT_INVOKER: &str = r#"
local agent = require("agent")
local r = agent.run({
    prompt = _PROMPT,
    system = _CONTEXT,
})
bus.emit("agent_result", r)
"#;
52
/// How the Lua script source for `run()` is supplied.
///
/// `Path` matches the CLI form (`agent-block -s <path>`), reading from
/// the filesystem at start. `Inline` lets SDK consumers pass a script
/// they hold in memory (compile-time `include_str!`, dynamically built
/// string, etc.) without writing it to a tempfile. `DefaultAgent` uses
/// an embedded invoker that runs the StdPkg `agent` module with the
/// caller-supplied prompt/context and emits the result via
/// `bus.emit("agent_result", ...)`.
///
/// The variant also decides which directory is put first on Lua's
/// `package.path`: the script's parent directory for `Path`, and
/// `BlockConfig::project_root` for `Inline` / `DefaultAgent`.
#[derive(Debug, Clone)]
pub enum ScriptSource {
    /// Read the script from a filesystem path at start. A read failure
    /// makes `run()` return `BlockError::Script`.
    Path(PathBuf),
    /// Use the supplied source code directly.
    Inline {
        /// Lua source code.
        source: String,
        /// Display name used in tracing, error messages, and the Lua
        /// `_SCRIPT_NAME` global (e.g. `"agent_invoker.lua"`).
        name: String,
    },
    /// Use the embedded default agent invoker. The variant itself carries
    /// no data: `BlockConfig::prompt` / `BlockConfig::context` are
    /// forwarded as the `_PROMPT` / `_CONTEXT` Lua globals and the
    /// result is emitted on the EventBus under kind `"agent_result"`.
    /// SDK consumers typically pair this with `host_handlers` keyed on
    /// `"agent_result"` and `auto_serve_bus = true`.
    DefaultAgent,
}
81
/// How a string payload (prompt / system context) is supplied.
///
/// `Inline` is the literal string variant (CLI `--prompt` / `--context`).
/// `File` reads the contents from disk at `run()` start (CLI
/// `--prompt-file` / `--context-file`).
///
/// Used for both `BlockConfig::prompt` and `BlockConfig::context`.
#[derive(Debug, Clone)]
pub enum PromptSource {
    /// Literal string, used as-is.
    Inline(String),
    /// Filesystem path; contents are read at `run()` start. A read
    /// failure makes `run()` return `BlockError::Script`.
    File(PathBuf),
}
94
/// How the Ed25519 mesh identity secret key is supplied.
///
/// `Inline` is a 64-hex literal. `Env` reads the named environment
/// variable at `run()` start (CLI default uses
/// `AGENT_BLOCK_MESH_SECRET_KEY`). Absence of any `SecretKeySource`
/// (i.e. `BlockConfig.secret_key = None`) causes a random keypair to
/// be generated, matching the prior behavior.
///
/// The key is only consulted when `BlockConfig::relay_url` is set; without
/// a relay no mesh identity is created at all.
#[derive(Debug, Clone)]
pub enum SecretKeySource {
    /// 64-character hex literal. A value that does not decode to 32 bytes
    /// makes `run()` fail with `BlockError::Runtime`.
    Inline(String),
    /// Environment variable name to read at start. If the variable cannot
    /// be read (unset, or not valid Unicode) `run()` silently behaves as if
    /// no key had been supplied and generates a random keypair.
    Env(String),
}
109
110/// Build the `blocks/` portion of `package.path` from filesystem locations.
111///
112/// Priority (highest first):
113/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
114/// 2. `exe_dir/blocks/` — development hot-reload (next to the binary)
115///
116/// Returns a semicolon-terminated string ready to prepend to `package.path`,
117/// or an empty string when no `blocks/` directories are found.
118fn build_blocks_path(project_root: &Path) -> String {
119 let mut out = String::new();
120
121 // 1. project_root/blocks/
122 let project_blocks = project_root.join("blocks");
123 if project_blocks.is_dir() {
124 let pb = project_blocks.to_string_lossy();
125 out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
126 }
127
128 // 2. exe_dir/blocks/
129 match std::env::current_exe() {
130 Ok(exe) => {
131 if let Some(exe_dir) = exe.parent() {
132 let exe_blocks = exe_dir.join("blocks");
133 if exe_blocks.is_dir() {
134 let eb = exe_blocks.to_string_lossy();
135 out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
136 }
137 }
138 }
139 Err(e) => {
140 warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
141 }
142 }
143
144 out
145}
146
/// Everything [`run`] needs to execute one script: where the script,
/// prompt, context and mesh key come from, plus the optional SDK-embedding
/// hooks (host handlers, background bus dispatcher, cancellation token).
pub struct BlockConfig {
    /// Lua script to execute. See [`ScriptSource`] for the supported
    /// shapes (filesystem path / inline source / embedded default
    /// agent invoker).
    pub script: ScriptSource,
    /// Project root directory. `run()` loads `<project_root>/.env` from it,
    /// looks for `<project_root>/blocks/`, and uses it as the script
    /// directory for inline / default-agent scripts.
    pub project_root: PathBuf,
    /// URL of the agent-mesh relay to connect to. `None` skips the mesh
    /// connection entirely.
    pub relay_url: Option<String>,
    /// Ed25519 secret key for mesh identity. See [`SecretKeySource`]
    /// for the supported shapes (inline 64-hex / environment variable).
    /// `None` generates a random keypair. Required to talk to
    /// registry/ACL-gated hosted meshes.
    pub secret_key: Option<SecretKeySource>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    pub mcp_rpc_timeout: Duration,
    /// Prompt payload injected as `_PROMPT` Lua global. See
    /// [`PromptSource`] for the supported shapes. `None` leaves the
    /// global unset.
    pub prompt: Option<PromptSource>,
    /// Context payload injected as `_CONTEXT` Lua global (typically
    /// the system prompt). Same shape rules as [`Self::prompt`].
    pub context: Option<PromptSource>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
    /// When `true`, the EventBus dispatcher loop is driven in the background
    /// for the duration of the script and shut down gracefully after the
    /// script completes. Required for SDK-embed callers that supply
    /// [`Self::host_handlers`] and need `bus.emit(kind, payload)` events
    /// emitted from the script to actually reach those handlers without
    /// requiring the script to call `bus.serve()` (which blocks on
    /// SIGTERM / Ctrl+C and never returns under programmatic embedding).
    ///
    /// The flag only takes effect when [`Self::host_handlers`] is non-empty;
    /// with an empty map `run()` does not spawn the background dispatcher.
    ///
    /// After the script finishes, the dispatcher is given a grace window
    /// (`AGENT_BLOCK_TASK_GRACE_MS`, default 1000ms) to drain queued events
    /// and finish any in-flight handler, then is cancelled.
    ///
    /// Mutually exclusive with Lua-side `bus.serve()`: enabling this flag
    /// takes ownership of the EventBus before the script runs, so a script
    /// that calls `bus.on(...)` followed by `bus.serve()` will error
    /// ("bus.serve() has already taken ownership"). Use this flag when the
    /// script's sole purpose is to push events to host handlers.
    ///
    /// Defaults to `false` (legacy behavior: dispatcher only runs when the
    /// script calls `bus.serve()`).
    pub auto_serve_bus: bool,
    /// Optional caller-supplied cancellation token. When cancelled, the
    /// in-flight script is interrupted via the Isle's debug-hook cancel
    /// path, the auto-serve dispatcher (if any) is shut down, and `run()`
    /// returns `Err(BlockError::Cancelled)`.
    ///
    /// Intended for SDK consumers that spawn `run()` as a tokio task and
    /// need an out-of-band abort signal (timeouts, parent-task cancellation
    /// propagation, user-driven stop). The token is observed across the
    /// `coroutine_eval` await; once cancellation propagates, the shutdown
    /// sequence (MCP disconnect, Isle drivers, auto-serve dispatcher)
    /// still runs so file descriptors and remote handles are released.
    ///
    /// Defaults to `None` (legacy behavior: `run()` only completes when
    /// the script returns naturally).
    pub shutdown_token: Option<CancellationToken>,
}
221
/// Shared context passed into Lua bridge functions.
///
/// Built once by `run()` after both Isles exist and cloned into each bridge
/// registration pass; every field is either a plain value or an `Arc`, so
/// clones share the same connections, Isles and bus.
#[derive(Clone)]
pub struct HostContext {
    /// Absolute project root: `BlockConfig::project_root` canonicalized by
    /// `run()`, or joined onto the current directory when canonicalization
    /// fails.
    pub project_root: PathBuf,
    /// Connected mesh agent; `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP manager shared between the bridges and `run()`, which
    /// disconnects all servers through it during shutdown.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection.
    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
    // Not read anywhere in this file; the allow silences the unused-field lint.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
    /// handlers against this Isle; handlers live on `handler_isle` instead.
    /// The field is retained because bridge code still keyed to the main
    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
    /// need it, and removing it would force another HostContext reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
    ///
    /// The slot is already `None` when `run()` has moved the bus into its
    /// own background dispatcher (auto-serve).
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
278
279/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
280/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
281/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
282///
283/// `label` is used only for the init log line (`sql` / `kv`) so that the two
284/// databases are distinguishable in tracing output.
285fn open_sqlite(
286 path: &Path,
287 label: &'static str,
288) -> BlockResult<(
289 Arc<Mutex<rusqlite::Connection>>,
290 Arc<rusqlite::InterruptHandle>,
291)> {
292 let is_memory = crate::bridge::config::is_memory_sql(path);
293 if !is_memory {
294 if let Some(parent) = path.parent() {
295 std::fs::create_dir_all(parent)
296 .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
297 }
298 }
299 let conn = rusqlite::Connection::open(path)
300 .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
301 if !is_memory {
302 let journal = crate::bridge::config::sql_journal_mode();
303 conn.pragma_update(None, "journal_mode", &journal)
304 .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
305 }
306 let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
307 conn.pragma_update(None, "busy_timeout", busy_ms)
308 .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
309 info!(label, path = %path.display(), busy_ms, "sqlite initialized");
310 let interrupt = Arc::new(conn.get_interrupt_handle());
311 let conn = Arc::new(Mutex::new(conn));
312 Ok((conn, interrupt))
313}
314
315/// Build the init closure shared between the main Isle and the handler
316/// Isle. Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
317/// configures `package.path` / `package.searchers` so `require "agent"`
318/// (and any `blocks/` module) works inside the Lua VM.
319///
320/// Returns an `FnOnce` so each call produces a fresh closure; this lets
321/// both Isles be spawned from the same config without `Clone` bounds on
322/// the captured `HashMap`.
323fn build_isle_init(
324 script_name: String,
325 script_dir: String,
326 blocks_paths: String,
327 prompt: Option<String>,
328 context: Option<String>,
329) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
330 move |lua| {
331 // Set script name before registering bridges (used by log.* for attribution)
332 lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
333 if let Some(ref p) = prompt {
334 lua.globals().set("_PROMPT", p.as_str())?;
335 }
336 if let Some(ref c) = context {
337 lua.globals().set("_CONTEXT", c.as_str())?;
338 }
339
340 mlua_batteries::register_all(lua, "std")?;
341
342 // ── package.path ──────────────────────────────────────────────
343 // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
344 let package: mlua::Table = lua.globals().get("package")?;
345 let current_path: String = package.get("path")?;
346 let new_path =
347 format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
348 package.set("path", new_path)?;
349
350 // ── package.searchers — embedded fallback ─────────────────────
351 // Register a custom searcher that loads blocks/ modules from the
352 // sources baked in at compile time. This is the lowest-priority
353 // searcher so filesystem copies always win.
354 let embedded: HashMap<&'static str, &'static str> =
355 EMBEDDED_BLOCKS.iter().copied().collect();
356
357 let searchers: mlua::Table = package.get("searchers")?;
358 let loader =
359 lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
360 Some(source) => {
361 let chunk = lua
362 .load(*source)
363 .set_name(format!("@embedded:blocks/{name}/init.lua"));
364 let func = chunk.into_function()?;
365 Ok(mlua::Value::Function(func))
366 }
367 None => {
368 let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
369 Ok(mlua::Value::String(msg))
370 }
371 })?;
372 // Append as the last searcher so filesystem paths remain preferred.
373 let next_idx = searchers.raw_len() + 1;
374 searchers.raw_set(next_idx, loader)?;
375
376 Ok(())
377 }
378}
379
380/// Spawn the dedicated handler Isle.
381///
382/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
383/// separate OS thread with its own `tokio` current-thread runtime, keeping
384/// CPU-bound handlers from starving the main Isle's grace timers.
385///
386/// Bridge registration is deferred to a follow-up `exec` in `run()` because
387/// `HostContext` is not constructible until both Isles exist (the struct
388/// itself holds `Arc<AsyncIsle>` for both).
389async fn spawn_handler_isle(
390 script_name: String,
391 script_dir: String,
392 blocks_paths: String,
393 prompt: Option<String>,
394 context: Option<String>,
395) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
396 let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
397 let (isle, driver) = AsyncIsle::builder()
398 .thread_name("agent-block-handler-isle")
399 .spawn(init)
400 .await
401 .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
402 info!(
403 thread_name = "agent-block-handler-isle",
404 "handler Isle spawned"
405 );
406 Ok((Arc::new(isle), driver))
407}
408
/// Decode a 64-character hexadecimal string into 32 bytes.
///
/// Leading and trailing whitespace is trimmed first. Both upper- and
/// lower-case digits are accepted.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed input is not exactly
/// 64 bytes long, or when any byte is not an ASCII hex digit (the message
/// names the offending byte position).
///
/// Decoding works on bytes rather than on `str` slices: slicing a `str` at
/// a fixed byte offset panics when the offset falls inside a multi-byte
/// UTF-8 character, which a 64-byte non-ASCII input would trigger.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    let hex = s.trim().as_bytes();
    if hex.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", hex.len()));
    }

    // Value of the hex digit at byte `pos`, or an error naming the position.
    // Non-ASCII bytes map to chars that are never hex digits.
    let nibble = |pos: usize| -> Result<u8, String> {
        char::from(hex[pos])
            .to_digit(16)
            .map(|digit| digit as u8) // digit < 16, so the cast is lossless
            .ok_or_else(|| {
                format!("invalid hex at position {pos}: invalid digit found in string")
            })
    };

    let mut out = [0u8; 32];
    for (i, byte) in out.iter_mut().enumerate() {
        let hi = nibble(2 * i)?;
        let lo = nibble(2 * i + 1)?;
        *byte = (hi << 4) | lo;
    }
    Ok(out)
}
424
425pub async fn run(config: BlockConfig) -> BlockResult<()> {
426 // ── Resolve sources ───────────────────────────────────────────
427 // Convert the `Source` enums on `BlockConfig` to their concrete
428 // payloads before any Isle setup. `File`/`Path`/`Env` variants
429 // read from disk / environment exactly once, here at the start.
430 let (script_source, script_name, script_dir_pathbuf) = match &config.script {
431 ScriptSource::Path(p) => {
432 let source = std::fs::read_to_string(p)
433 .map_err(|e| BlockError::Script(format!("{}: {e}", p.display())))?;
434 let name = p
435 .file_name()
436 .map(|n| n.to_string_lossy().to_string())
437 .unwrap_or_else(|| "unknown".to_string());
438 let dir = p
439 .parent()
440 .map(|d| d.to_path_buf())
441 .unwrap_or_else(|| PathBuf::from("."));
442 (source, name, dir)
443 }
444 ScriptSource::Inline { source, name } => {
445 (source.clone(), name.clone(), config.project_root.clone())
446 }
447 ScriptSource::DefaultAgent => (
448 DEFAULT_AGENT_INVOKER.to_string(),
449 "default_agent_invoker.lua".to_string(),
450 config.project_root.clone(),
451 ),
452 };
453
454 let prompt_resolved: Option<String> = match &config.prompt {
455 Some(PromptSource::Inline(s)) => Some(s.clone()),
456 Some(PromptSource::File(p)) => Some(
457 std::fs::read_to_string(p)
458 .map_err(|e| BlockError::Script(format!("prompt file {}: {e}", p.display())))?,
459 ),
460 None => None,
461 };
462 let context_resolved: Option<String> = match &config.context {
463 Some(PromptSource::Inline(s)) => Some(s.clone()),
464 Some(PromptSource::File(p)) => Some(
465 std::fs::read_to_string(p)
466 .map_err(|e| BlockError::Script(format!("context file {}: {e}", p.display())))?,
467 ),
468 None => None,
469 };
470 let secret_key_resolved: Option<String> = match &config.secret_key {
471 Some(SecretKeySource::Inline(s)) => Some(s.clone()),
472 Some(SecretKeySource::Env(var)) => std::env::var(var).ok(),
473 None => None,
474 };
475
476 // NOTE: We previously held entered span guards across awaits for nested
477 // span context. That made the `run()` future `!Send`, which prevents
478 // SDK consumers from `tokio::spawn(run(config))`. Span context is
479 // attached to events via fields on the `info_span!` calls below; the
480 // missing nesting is an acceptable trade-off for `Send` correctness.
481 let _root_span = info_span!("agent_block", script = %script_name);
482
483 // ── .env ──────────────────────────────────────────────────────
484 // Load .env from project_root if present. Variables are merged into
485 // the process environment so Lua's `std.env.get()` picks them up.
486 let env_path = config.project_root.join(".env");
487 match dotenvy::from_path(&env_path) {
488 Ok(()) => info!(path = %env_path.display(), ".env loaded"),
489 Err(dotenvy::Error::Io(_)) => {} // file not found — fine
490 Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
491 }
492
493 // ── Init ──────────────────────────────────────────────────────
494 let _init_span = info_span!("init");
495
496 // ── EventBus channel ─────────────────────────────────────────────
497 // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
498 // handler can hold a `bus_tx` clone and forward incoming requests
499 // into the dispatcher. Capacity is ENV-driven (see bridge::config).
500 let bus_capacity = crate::bridge::config::bus_capacity();
501 let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
502 let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
503
504 // ── Pre-install host-side Rust handlers ───────────────────────────
505 // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
506 // so that script-side `bus.emit(kind, payload)` is captured by a Rust
507 // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
508 // Registered here (before any Lua bridge registers handlers and before
509 // `bus.serve` takes ownership of the bus) so the EventBus already
510 // carries the host handlers when the script starts.
511 if !config.host_handlers.is_empty() {
512 let mut guard = event_bus
513 .lock()
514 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
515 let bus = guard
516 .as_mut()
517 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
518 for (kind, handler) in &config.host_handlers {
519 bus.on(kind.clone(), Arc::clone(handler))
520 .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
521 }
522 info!(
523 count = config.host_handlers.len(),
524 "host handlers pre-installed"
525 );
526 }
527
528 // ── auto-serve: background dispatcher for SDK-embed callers ───────
529 // When `auto_serve_bus` is on and host handlers are installed, take the
530 // EventBus out of the Mutex *before* the script runs and spawn the
531 // dispatcher loop on the runtime. This lets `bus.emit(kind, payload)`
532 // from the script reach the host handler without requiring the script
533 // to call `bus.serve()` (which blocks on signals and never returns
534 // under programmatic embedding).
535 let auto_serve = config.auto_serve_bus && !config.host_handlers.is_empty();
536 let auto_serve_state: Option<(tokio::task::JoinHandle<()>, CancellationToken)> = if auto_serve {
537 let bus = {
538 let mut guard = event_bus
539 .lock()
540 .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
541 guard
542 .take()
543 .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?
544 };
545 let token = CancellationToken::new();
546 let token_for_task = token.clone();
547 let handle = tokio::spawn(async move {
548 let mut bus = bus;
549 if let Err(e) = bus.run(token_for_task).await {
550 tracing::error!(error = %e, "auto-serve: dispatcher loop returned error");
551 }
552 });
553 info!("auto-serve: dispatcher spawned");
554 Some((handle, token))
555 } else {
556 None
557 };
558
559 let mesh_agent = if let Some(ref relay_url) = config.relay_url {
560 let keypair = match &secret_key_resolved {
561 Some(hex_str) => {
562 let bytes = hex_decode_32(hex_str)
563 .map_err(|e| BlockError::Runtime(format!("secret-key: {e}")))?;
564 agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
565 }
566 None => agent_mesh_core::identity::AgentKeypair::generate(),
567 };
568 info!(agent_id = %keypair.agent_id(), "mesh identity");
569 let acl = agent_mesh_core::acl::AclPolicy {
570 default_deny: false,
571 rules: vec![],
572 };
573 let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
574 Arc::new(BusRelayHandler::new(bus_tx.clone()));
575 let url = relay_url.clone();
576 let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
577 .await
578 .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
579 info!(relay_url = %relay_url, "mesh connected");
580 Some(Arc::new(agent))
581 } else {
582 None
583 };
584
585 let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
586 config.mcp_rpc_timeout,
587 )?));
588
589 // Resolve project_root to absolute path.
590 // canonicalize() can fail if the path doesn't exist; fall back to
591 // joining with current_dir to guarantee an absolute path.
592 let project_root = config
593 .project_root
594 .canonicalize()
595 .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
596
597 let http_client = reqwest::Client::new();
598
599 // ── SQLite init (kv + sql get separate DB files) ──────────────────────
600 // All knobs are ENV-driven (see `bridge/config.rs`).
601 let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
602 let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
603
604 let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
605 let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
606
607 let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
608 let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
609
610 // Use the script dir derived from the resolved `ScriptSource` for
611 // `package.path` lookups. For inline / default-agent variants the dir
612 // falls back to `project_root` (set during source resolution above).
613 let script_dir = script_dir_pathbuf.to_string_lossy().to_string();
614
615 // Precompute values captured by the init closure so we don't need to
616 // move the full `HostContext` into it (HostContext now holds
617 // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
618 // returns — classic chicken-and-egg). All bridge registrations run in a
619 // second pass via `isle.exec` below.
620 let blocks_paths = build_blocks_path(&project_root);
621 let prompt = prompt_resolved.clone();
622 let context = context_resolved.clone();
623
624 // ── main Isle ─────────────────────────────────────────────────
625 let (isle, driver) = AsyncIsle::spawn(build_isle_init(
626 script_name.clone(),
627 script_dir.clone(),
628 blocks_paths.clone(),
629 prompt.clone(),
630 context.clone(),
631 ))
632 .await
633 .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
634 let isle = Arc::new(isle);
635
636 // ── handler Isle (sequential, dependencies are trivial) ────────
637 let (handler_isle, handler_driver) = spawn_handler_isle(
638 script_name.clone(),
639 script_dir.clone(),
640 blocks_paths.clone(),
641 prompt,
642 context,
643 )
644 .await?;
645
646 // Wire both Isles into McpManager so Lua notification callbacks can be
647 // dispatched from the rmcp task thread.
648 // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
649 // - main_isle: progress/log notification dispatch (exec on main Isle so
650 // user callback upvalues are preserved — no bytecode dump/reload needed)
651 {
652 let mut mgr = mcp_manager.write().await;
653 mgr.set_handler_isle(Arc::clone(&handler_isle));
654 mgr.set_main_isle(Arc::clone(&isle));
655 }
656
657 // ── HostContext + bridge registration ──────────────────────────────
658 // Wrap the isle in an Arc so `HostContext` can hand it to
659 // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
660 // handlers from the EventBus dispatcher task).
661 let ctx = HostContext {
662 project_root,
663 mesh_agent,
664 mcp_manager: Arc::clone(&mcp_manager),
665 http_client,
666 sql_conn,
667 sql_interrupt,
668 kv_conn,
669 kv_interrupt,
670 ts_conn,
671 ts_interrupt,
672 isle: Arc::clone(&isle),
673 handler_isle: Arc::clone(&handler_isle),
674 bus_tx: bus_tx.clone(),
675 event_bus: Arc::clone(&event_bus),
676 };
677
678 {
679 let ctx = ctx.clone();
680 isle.exec(move |lua| {
681 bridge::register_all(lua, &ctx)
682 .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
683 Ok(String::new())
684 })
685 .await
686 .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
687 }
688
689 {
690 let ctx = ctx.clone();
691 handler_isle
692 .exec(move |lua| {
693 bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
694 mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
695 })?;
696 Ok(String::new())
697 })
698 .await
699 .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
700 }
701
702 drop(_init_span);
703
704 // ── Execute ───────────────────────────────────────────────────
705 // When `shutdown_token` is supplied, race the script future against
706 // the caller's cancellation signal. On cancel, propagate to the Isle
707 // via the AsyncTask's cancel token so the debug hook unwinds the Lua
708 // VM, then continue into the shutdown sequence below (we still want
709 // to release MCP/mesh handles and join the auto-serve dispatcher
710 // before returning).
711 let script_result: Result<(), BlockError> = {
712 let _exec_span = info_span!("execute", script = %script_name);
713
714 let mut task = isle.spawn_coroutine_eval(&script_source);
715 let task_cancel = task.cancel_token().clone();
716 match config.shutdown_token.as_ref() {
717 Some(token) => {
718 tokio::select! {
719 biased;
720 _ = token.cancelled() => {
721 task_cancel.cancel();
722 // Wait for the Isle to unwind so the VM is in a
723 // consistent state before driver shutdown. The
724 // debug hook fires at the next HOOK_INTERVAL.
725 let _ = (&mut task).await;
726 info!("shutdown_token: cancelled by caller");
727 Err(BlockError::Cancelled)
728 }
729 res = &mut task => res.map(|_| ()).map_err(|e| BlockError::Script(format!("{e}"))),
730 }
731 }
732 None => (&mut task)
733 .await
734 .map(|_| ())
735 .map_err(|e| BlockError::Script(format!("{e}"))),
736 }
737 };
738
739 // ── auto-serve drain + cancel ─────────────────────────────────
740 // Let the dispatcher drain events queued by the script, then signal
741 // shutdown and bound the join. Mirrors `bus.serve`'s grace pattern.
742 if let Some((handle, token)) = auto_serve_state {
743 let grace_ms = crate::bridge::config::task_grace_ms();
744 let grace = Duration::from_millis(grace_ms);
745 tokio::time::sleep(grace).await;
746 token.cancel();
747 match tokio::time::timeout(grace, handle).await {
748 Ok(Ok(())) => info!("auto-serve: dispatcher shut down cleanly"),
749 Ok(Err(join_err)) => {
750 tracing::error!(error = %join_err, "auto-serve: dispatcher task join error");
751 }
752 Err(_) => {
753 tracing::warn!(
754 grace_ms,
755 "auto-serve: dispatcher join timed out after cancel; forcing exit"
756 );
757 }
758 }
759 }
760
761 // ── Shutdown ──────────────────────────────────────────────────
762 {
763 let _shutdown_span = info_span!("shutdown");
764
765 mcp_manager.write().await.disconnect_all().await?;
766
767 driver
768 .shutdown()
769 .await
770 .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
771
772 // Handler Isle shutdown is independent of main shutdown: a failure
773 // here (e.g. ThreadPanic on the handler thread) is logged but does
774 // not poison the main process exit. The main Isle has already
775 // been stopped cleanly above.
776 match handler_driver.shutdown().await {
777 Ok(()) => info!(
778 thread_name = "agent-block-handler-isle",
779 "handler Isle shut down"
780 ),
781 Err(e) => tracing::error!(
782 error = %e,
783 thread_name = "agent-block-handler-isle",
784 "handler Isle shutdown failed"
785 ),
786 }
787 }
788
789 script_result
790}
791
/// mesh → bus source adapter.
///
/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
/// oneshot channel carried inside the event.
///
/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
///
/// | Failure                   | Return value                           |
/// |---------------------------|----------------------------------------|
/// | `bus_tx.send` closed      | `{"error": "bus channel closed"}`      |
/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
///
/// A *full* channel is not an error: the awaited `send` on a bounded tokio
/// mpsc channel waits for capacity. The 30s ack timeout only starts once
/// the event has been enqueued, so time spent waiting for capacity is not
/// bounded by it.
///
/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
/// (see `src/bridge/mesh.rs`).
struct BusRelayHandler {
    /// EventBus ingress sender that incoming mesh requests are pushed onto.
    tx: mpsc::Sender<Event>,
}
813
814impl BusRelayHandler {
815 fn new(tx: mpsc::Sender<Event>) -> Self {
816 Self { tx }
817 }
818}
819
/// Upper bound (30 seconds) on how long the mesh adapter waits for a
/// handler's ack after an event has been enqueued on the bus.
///
/// NOTE(review): the original comment says this bound is also used for
/// "other source timeouts"; within this file it is only read by
/// `BusRelayHandler::handle` — confirm against other adapters.
const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
822
823#[async_trait::async_trait]
824impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
825 async fn handle(
826 &self,
827 from: &agent_mesh_core::identity::AgentId,
828 payload: &serde_json::Value,
829 _cancel: agent_mesh_sdk::CancelToken,
830 ) -> serde_json::Value {
831 let id = uuid::Uuid::new_v4().to_string();
832 let meta = serde_json::json!({"from": from.to_string()});
833 let (ack_tx, ack_rx) = oneshot::channel();
834 let event = Event {
835 kind: "mesh".into(),
836 id: id.clone(),
837 payload: payload.clone(),
838 meta,
839 ack_tx: Some(ack_tx),
840 };
841
842 if let Err(e) = self.tx.send(event).await {
843 tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
844 return serde_json::json!({"error": "bus channel closed"});
845 }
846
847 match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
848 Ok(Ok(Ok(v))) => v,
849 Ok(Ok(Err(e))) => {
850 tracing::error!(id = %id, error = %e, "mesh handler returned error");
851 serde_json::json!({"error": e.to_string()})
852 }
853 Ok(Err(e)) => {
854 tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
855 serde_json::json!({"error": "ack dropped"})
856 }
857 Err(_) => {
858 tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
859 serde_json::json!({"error": "handler timeout"})
860 }
861 }
862 }
863}