Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus, Handler};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25
/// Embedded Lua sources for `blocks/` StdPkg modules.
///
/// Each entry is `(module_name, lua_source)`. The sources are baked into the
/// binary at compile time via `include_str!` so `cargo install` works
/// without any extra file distribution.
///
/// The table is consumed by the custom `package.searchers` entry installed
/// in `build_isle_init`, which looks a module up by the name passed to
/// `require`.
const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
    ("agent", include_str!("../blocks/agent/init.lua")),
    ("session", include_str!("../blocks/session/init.lua")),
];
33
34/// Build the `blocks/` portion of `package.path` from filesystem locations.
35///
36/// Priority (highest first):
37/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
38/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
39///
40/// Returns a semicolon-terminated string ready to prepend to `package.path`,
41/// or an empty string when no `blocks/` directories are found.
42fn build_blocks_path(project_root: &Path) -> String {
43    let mut out = String::new();
44
45    // 1. project_root/blocks/
46    let project_blocks = project_root.join("blocks");
47    if project_blocks.is_dir() {
48        let pb = project_blocks.to_string_lossy();
49        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
50    }
51
52    // 2. exe_dir/blocks/
53    match std::env::current_exe() {
54        Ok(exe) => {
55            if let Some(exe_dir) = exe.parent() {
56                let exe_blocks = exe_dir.join("blocks");
57                if exe_blocks.is_dir() {
58                    let eb = exe_blocks.to_string_lossy();
59                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
60                }
61            }
62        }
63        Err(e) => {
64            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
65        }
66    }
67
68    out
69}
70
/// Everything [`run`] needs to boot a host: which script to execute, where
/// the project lives, and the optional mesh / MCP / EventBus wiring.
pub struct BlockConfig {
    /// Path of the Lua script to execute. `run` reads the file, exposes its
    /// file name to Lua as `_SCRIPT_NAME`, and puts its parent directory at
    /// the front of `package.path`.
    pub script_path: PathBuf,
    /// Project root directory. `run` loads `<project_root>/.env` from here,
    /// adds `<project_root>/blocks/` to `package.path` when it exists, and
    /// stores the resolved absolute path in [`HostContext::project_root`].
    pub project_root: PathBuf,
    /// URL of the agent-mesh relay. `Some` makes `run` connect a mesh agent
    /// before the script starts; `None` leaves the mesh disabled
    /// ([`HostContext::mesh_agent`] is `None`).
    pub relay_url: Option<String>,
    /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
    /// generated. Required to talk to registry/ACL-gated hosted meshes.
    /// Only consulted when `relay_url` is `Some`.
    pub secret_key: Option<String>,
    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
    // NOTE(review): no `Default` impl for `BlockConfig` is visible in this
    // file — confirm where this default is actually applied.
    pub mcp_rpc_timeout: Duration,
    /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
    pub prompt: Option<String>,
    /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
    pub context: Option<String>,
    /// Host-side Rust handlers pre-installed on the EventBus before the user
    /// script starts. Each entry registers `handler` against `kind` via
    /// [`EventBus::on`], so a script-side `bus.emit(kind, payload)` is
    /// captured by the Rust handler rather than dispatched to a Lua function.
    ///
    /// Intended for SDK consumers that embed `agent-block-core` and need to
    /// receive script output programmatically (e.g. a Spawner adapter that
    /// turns LLM script output into a typed `WorkerResult`). Lua-side
    /// `bus.on(kind, fn)` registrations layered on top of the handler Isle
    /// are still possible, but the EventBus dispatches a single handler per
    /// `kind` (last-write-wins), so host-side and Lua-side registrations on
    /// the same `kind` collide; choose one side per routing key.
    ///
    /// Defaults to an empty map (no host handlers).
    // NOTE(review): as above, the "default" is presumably supplied by the
    // caller constructing this struct — verify.
    pub host_handlers: HashMap<String, Arc<dyn Handler>>,
}
101
/// Shared context passed into Lua bridge functions.
///
/// Built once in [`run`] after both Isles have been spawned, then cloned
/// into the bridge-registration closures executed on the main Isle and on
/// the handler Isle.
#[derive(Clone)]
pub struct HostContext {
    /// Absolute project root (canonicalized by `run`, or joined onto the
    /// current directory when canonicalization fails).
    pub project_root: PathBuf,
    /// Connected mesh agent, or `None` when no relay URL was configured.
    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
    /// MCP manager shared with `run`, which disconnects all servers through
    /// it during shutdown.
    pub mcp_manager: Arc<RwLock<McpManager>>,
    /// Shared async HTTP client for `http.*` bridge.
    pub http_client: reqwest::Client,
    /// Shared SQLite connection for `sql.*` bridge (user tables).
    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the sql connection.
    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
    /// Separate from sql_conn so KV scratch state and user SQL data don't
    /// share WAL, page cache, or backup lifecycle.
    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the kv connection.
    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
    /// Interrupt handle for the ts connection.
    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
    // Not read yet in this file, hence the lint allowance.
    #[allow(dead_code)]
    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
    /// Async handle to the main Isle Lua VM that runs the user script via
    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
    /// handlers against this Isle; handlers live on `handler_isle` instead.
    /// The field is retained because bridge code still keyed to the main
    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
    /// need it, and removing it would force another HostContext reshape.
    #[allow(dead_code)]
    pub isle: Arc<AsyncIsle>,
    /// Dedicated Isle for EventBus handler execution. Lua handlers
    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
    /// handler code does not occupy the main Isle's LocalSet and block
    /// grace timers / shutdown wakers on the main VM side.
    ///
    /// Used by `bridge::bus` to forward handler bytecode
    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
    /// `coroutine_call("__bus_dispatch", ...)`.
    pub handler_isle: Arc<AsyncIsle>,
    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
    /// clone at `MeshAgent::connect` time, so the field itself is not read
    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
    #[allow(dead_code)]
    pub bus_tx: mpsc::Sender<Event>,
    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
    /// ownership before entering the long-lived `run()` await (avoiding the
    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
    pub event_bus: Arc<Mutex<Option<EventBus>>>,
}
158
159/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
160/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
161/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
162///
163/// `label` is used only for the init log line (`sql` / `kv`) so that the two
164/// databases are distinguishable in tracing output.
165fn open_sqlite(
166    path: &Path,
167    label: &'static str,
168) -> BlockResult<(
169    Arc<Mutex<rusqlite::Connection>>,
170    Arc<rusqlite::InterruptHandle>,
171)> {
172    let is_memory = crate::bridge::config::is_memory_sql(path);
173    if !is_memory {
174        if let Some(parent) = path.parent() {
175            std::fs::create_dir_all(parent)
176                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
177        }
178    }
179    let conn = rusqlite::Connection::open(path)
180        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
181    if !is_memory {
182        let journal = crate::bridge::config::sql_journal_mode();
183        conn.pragma_update(None, "journal_mode", &journal)
184            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
185    }
186    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
187    conn.pragma_update(None, "busy_timeout", busy_ms)
188        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
189    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
190    let interrupt = Arc::new(conn.get_interrupt_handle());
191    let conn = Arc::new(Mutex::new(conn));
192    Ok((conn, interrupt))
193}
194
195/// Build the init closure shared between the main Isle and the handler
196/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
197/// configures `package.path` / `package.searchers` so `require "agent"`
198/// (and any `blocks/` module) works inside the Lua VM.
199///
200/// Returns an `FnOnce` so each call produces a fresh closure; this lets
201/// both Isles be spawned from the same config without `Clone` bounds on
202/// the captured `HashMap`.
203fn build_isle_init(
204    script_name: String,
205    script_dir: String,
206    blocks_paths: String,
207    prompt: Option<String>,
208    context: Option<String>,
209) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
210    move |lua| {
211        // Set script name before registering bridges (used by log.* for attribution)
212        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
213        if let Some(ref p) = prompt {
214            lua.globals().set("_PROMPT", p.as_str())?;
215        }
216        if let Some(ref c) = context {
217            lua.globals().set("_CONTEXT", c.as_str())?;
218        }
219
220        mlua_batteries::register_all(lua, "std")?;
221
222        // ── package.path ──────────────────────────────────────────────
223        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
224        let package: mlua::Table = lua.globals().get("package")?;
225        let current_path: String = package.get("path")?;
226        let new_path =
227            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
228        package.set("path", new_path)?;
229
230        // ── package.searchers — embedded fallback ─────────────────────
231        // Register a custom searcher that loads blocks/ modules from the
232        // sources baked in at compile time.  This is the lowest-priority
233        // searcher so filesystem copies always win.
234        let embedded: HashMap<&'static str, &'static str> =
235            EMBEDDED_BLOCKS.iter().copied().collect();
236
237        let searchers: mlua::Table = package.get("searchers")?;
238        let loader =
239            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
240                Some(source) => {
241                    let chunk = lua
242                        .load(*source)
243                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
244                    let func = chunk.into_function()?;
245                    Ok(mlua::Value::Function(func))
246                }
247                None => {
248                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
249                    Ok(mlua::Value::String(msg))
250                }
251            })?;
252        // Append as the last searcher so filesystem paths remain preferred.
253        let next_idx = searchers.raw_len() + 1;
254        searchers.raw_set(next_idx, loader)?;
255
256        Ok(())
257    }
258}
259
260/// Spawn the dedicated handler Isle.
261///
262/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
263/// separate OS thread with its own `tokio` current-thread runtime, keeping
264/// CPU-bound handlers from starving the main Isle's grace timers.
265///
266/// Bridge registration is deferred to a follow-up `exec` in `run()` because
267/// `HostContext` is not constructible until both Isles exist (the struct
268/// itself holds `Arc<AsyncIsle>` for both).
269async fn spawn_handler_isle(
270    script_name: String,
271    script_dir: String,
272    blocks_paths: String,
273    prompt: Option<String>,
274    context: Option<String>,
275) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
276    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
277    let (isle, driver) = AsyncIsle::builder()
278        .thread_name("agent-block-handler-isle")
279        .spawn(init)
280        .await
281        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
282    info!(
283        thread_name = "agent-block-handler-isle",
284        "handler Isle spawned"
285    );
286    Ok((Arc::new(isle), driver))
287}
288
/// Decode a 64-character hex string into 32 bytes.
///
/// Surrounding whitespace is ignored and both upper- and lower-case digits
/// are accepted.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed input is not exactly
/// 64 bytes long or contains a byte that is not a hex digit. Non-ASCII
/// input is rejected with an error rather than panicking.
fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
    /// Value of one hex digit, or an error naming its position.
    fn nibble(digit: u8, pos: usize) -> Result<u8, String> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(format!(
                "invalid hex at position {pos}: invalid digit found in string"
            )),
        }
    }

    // Decode bytes rather than `&str` slices: slicing a `str` at an
    // arbitrary byte offset panics when the offset falls inside a
    // multi-byte UTF-8 character, and this input comes from the user.
    let raw = s.trim().as_bytes();
    if raw.len() != 64 {
        return Err(format!("expected 64 hex chars, got {}", raw.len()));
    }

    let mut out = [0u8; 32];
    for (i, (slot, pair)) in out.iter_mut().zip(raw.chunks_exact(2)).enumerate() {
        let hi = nibble(pair[0], 2 * i)?;
        let lo = nibble(pair[1], 2 * i + 1)?;
        *slot = (hi << 4) | lo;
    }
    Ok(out)
}
304
305pub async fn run(config: BlockConfig) -> BlockResult<()> {
306    let script_name = config
307        .script_path
308        .file_name()
309        .map(|n| n.to_string_lossy().to_string())
310        .unwrap_or_else(|| "unknown".to_string());
311
312    let root_span = info_span!("agent_block", script = %script_name);
313    let _root_guard = root_span.enter();
314
315    // ── .env ──────────────────────────────────────────────────────
316    // Load .env from project_root if present. Variables are merged into
317    // the process environment so Lua's `std.env.get()` picks them up.
318    let env_path = config.project_root.join(".env");
319    match dotenvy::from_path(&env_path) {
320        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
321        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
322        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
323    }
324
325    // ── Init ──────────────────────────────────────────────────────
326    let _init_guard = info_span!("init").entered();
327
328    // ── EventBus channel ─────────────────────────────────────────────
329    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
330    // handler can hold a `bus_tx` clone and forward incoming requests
331    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
332    let bus_capacity = crate::bridge::config::bus_capacity();
333    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
334    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
335
336    // ── Pre-install host-side Rust handlers ───────────────────────────
337    // SDK consumers attach Rust handlers via `BlockConfig.host_handlers`
338    // so that script-side `bus.emit(kind, payload)` is captured by a Rust
339    // `Arc<dyn Handler>` instead of being dispatched to a Lua function.
340    // Registered here (before any Lua bridge registers handlers and before
341    // `bus.serve` takes ownership of the bus) so the EventBus already
342    // carries the host handlers when the script starts.
343    if !config.host_handlers.is_empty() {
344        let mut guard = event_bus
345            .lock()
346            .map_err(|_| BlockError::Bus("event_bus mutex poisoned".into()))?;
347        let bus = guard
348            .as_mut()
349            .ok_or_else(|| BlockError::Bus("event_bus already taken".into()))?;
350        for (kind, handler) in &config.host_handlers {
351            bus.on(kind.clone(), Arc::clone(handler))
352                .map_err(|e| BlockError::Bus(format!("host_handlers on({kind}): {e}")))?;
353        }
354        info!(count = config.host_handlers.len(), "host handlers pre-installed");
355    }
356
357    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
358        let keypair = match &config.secret_key {
359            Some(hex_str) => {
360                let bytes = hex_decode_32(hex_str)
361                    .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
362                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
363            }
364            None => agent_mesh_core::identity::AgentKeypair::generate(),
365        };
366        info!(agent_id = %keypair.agent_id(), "mesh identity");
367        let acl = agent_mesh_core::acl::AclPolicy {
368            default_deny: false,
369            rules: vec![],
370        };
371        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
372            Arc::new(BusRelayHandler::new(bus_tx.clone()));
373        let url = relay_url.clone();
374        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
375            .await
376            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
377        info!(relay_url = %relay_url, "mesh connected");
378        Some(Arc::new(agent))
379    } else {
380        None
381    };
382
383    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
384        config.mcp_rpc_timeout,
385    )?));
386
387    // Resolve project_root to absolute path.
388    // canonicalize() can fail if the path doesn't exist; fall back to
389    // joining with current_dir to guarantee an absolute path.
390    let project_root = config
391        .project_root
392        .canonicalize()
393        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
394
395    let http_client = reqwest::Client::new();
396
397    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
398    // All knobs are ENV-driven (see `bridge/config.rs`).
399    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
400    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
401
402    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
403    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
404
405    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
406    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
407
408    let script_path = config.script_path.clone();
409    let script_dir = script_path
410        .parent()
411        .map(|p| p.to_string_lossy().to_string())
412        .unwrap_or_else(|| ".".to_string());
413
414    // Precompute values captured by the init closure so we don't need to
415    // move the full `HostContext` into it (HostContext now holds
416    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
417    // returns — classic chicken-and-egg). All bridge registrations run in a
418    // second pass via `isle.exec` below.
419    let blocks_paths = build_blocks_path(&project_root);
420    let prompt = config.prompt.clone();
421    let context = config.context.clone();
422
423    // ── main Isle ─────────────────────────────────────────────────
424    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
425        script_name.clone(),
426        script_dir.clone(),
427        blocks_paths.clone(),
428        prompt.clone(),
429        context.clone(),
430    ))
431    .await
432    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
433    let isle = Arc::new(isle);
434
435    // ── handler Isle (sequential, dependencies are trivial) ────────
436    let (handler_isle, handler_driver) = spawn_handler_isle(
437        script_name.clone(),
438        script_dir.clone(),
439        blocks_paths.clone(),
440        prompt,
441        context,
442    )
443    .await?;
444
445    // Wire both Isles into McpManager so Lua notification callbacks can be
446    // dispatched from the rmcp task thread.
447    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
448    // - main_isle: progress/log notification dispatch (exec on main Isle so
449    //   user callback upvalues are preserved — no bytecode dump/reload needed)
450    {
451        let mut mgr = mcp_manager.write().await;
452        mgr.set_handler_isle(Arc::clone(&handler_isle));
453        mgr.set_main_isle(Arc::clone(&isle));
454    }
455
456    // ── HostContext + bridge registration ──────────────────────────────
457    // Wrap the isle in an Arc so `HostContext` can hand it to
458    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
459    // handlers from the EventBus dispatcher task).
460    let ctx = HostContext {
461        project_root,
462        mesh_agent,
463        mcp_manager: Arc::clone(&mcp_manager),
464        http_client,
465        sql_conn,
466        sql_interrupt,
467        kv_conn,
468        kv_interrupt,
469        ts_conn,
470        ts_interrupt,
471        isle: Arc::clone(&isle),
472        handler_isle: Arc::clone(&handler_isle),
473        bus_tx: bus_tx.clone(),
474        event_bus: Arc::clone(&event_bus),
475    };
476
477    {
478        let ctx = ctx.clone();
479        isle.exec(move |lua| {
480            bridge::register_all(lua, &ctx)
481                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
482            Ok(String::new())
483        })
484        .await
485        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
486    }
487
488    {
489        let ctx = ctx.clone();
490        handler_isle
491            .exec(move |lua| {
492                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
493                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
494                })?;
495                Ok(String::new())
496            })
497            .await
498            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
499    }
500
501    drop(_init_guard);
502
503    // ── Execute ───────────────────────────────────────────────────
504    {
505        let _exec_guard = info_span!("execute", script = %script_name).entered();
506
507        let script = std::fs::read_to_string(&script_path)
508            .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
509
510        isle.coroutine_eval(&script)
511            .await
512            .map_err(|e| BlockError::Script(format!("{e}")))?;
513    }
514
515    // ── Shutdown ──────────────────────────────────────────────────
516    {
517        let _shutdown_guard = info_span!("shutdown").entered();
518
519        mcp_manager.write().await.disconnect_all().await?;
520
521        driver
522            .shutdown()
523            .await
524            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
525
526        // Handler Isle shutdown is independent of main shutdown: a failure
527        // here (e.g. ThreadPanic on the handler thread) is logged but does
528        // not poison the main process exit. The main Isle has already
529        // been stopped cleanly above.
530        match handler_driver.shutdown().await {
531            Ok(()) => info!(
532                thread_name = "agent-block-handler-isle",
533                "handler Isle shut down"
534            ),
535            Err(e) => tracing::error!(
536                error = %e,
537                thread_name = "agent-block-handler-isle",
538                "handler Isle shutdown failed"
539            ),
540        }
541    }
542
543    Ok(())
544}
545
546/// mesh → bus source adapter.
547///
548/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
549/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
550/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
551/// oneshot channel carried inside the event.
552///
553/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
554///
555/// | Failure                   | Return value                           |
556/// |---------------------------|----------------------------------------|
557/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
558/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
559/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
560/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
561///
562/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
563/// (see `src/bridge/mesh.rs`).
564struct BusRelayHandler {
565    tx: mpsc::Sender<Event>,
566}
567
568impl BusRelayHandler {
569    fn new(tx: mpsc::Sender<Event>) -> Self {
570        Self { tx }
571    }
572}
573
574/// Bound used for both the mesh-adapter ack wait and other source timeouts.
575const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
576
577#[async_trait::async_trait]
578impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
579    async fn handle(
580        &self,
581        from: &agent_mesh_core::identity::AgentId,
582        payload: &serde_json::Value,
583        _cancel: agent_mesh_sdk::CancelToken,
584    ) -> serde_json::Value {
585        let id = uuid::Uuid::new_v4().to_string();
586        let meta = serde_json::json!({"from": from.to_string()});
587        let (ack_tx, ack_rx) = oneshot::channel();
588        let event = Event {
589            kind: "mesh".into(),
590            id: id.clone(),
591            payload: payload.clone(),
592            meta,
593            ack_tx: Some(ack_tx),
594        };
595
596        if let Err(e) = self.tx.send(event).await {
597            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
598            return serde_json::json!({"error": "bus channel closed"});
599        }
600
601        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
602            Ok(Ok(Ok(v))) => v,
603            Ok(Ok(Err(e))) => {
604                tracing::error!(id = %id, error = %e, "mesh handler returned error");
605                serde_json::json!({"error": e.to_string()})
606            }
607            Ok(Err(e)) => {
608                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
609                serde_json::json!({"error": "ack dropped"})
610            }
611            Err(_) => {
612                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
613                serde_json::json!({"error": "handler timeout"})
614            }
615        }
616    }
617}