Skip to main content

agent_block_core/
host.rs

1//! Host — the thin Rust shell that wires up Lua VM, Mesh, HTTP, and MCP.
2//!
3//! # Responsibilities
4//!
5//! 1. Spawn an mlua-isle `AsyncIsle` (dedicated Lua VM thread with coroutine support)
6//! 2. Optionally connect to agent-mesh relay
7//! 3. Initialize the MCP manager for stdio-based MCP server connections
8//! 4. Inject all Lua stdlib bridges (`mesh.*`, `http.*`, `sh.*`, `tool.*`, `log.*`, `mcp.*`)
9//! 5. Execute the user-provided Lua script via `coroutine_eval` (async-aware)
10//! 6. Graceful shutdown (Isle + MCP servers + mesh)
11
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::sync::{mpsc, oneshot, RwLock};
17
18use mlua_isle::{AsyncIsle, AsyncIsleDriver};
19use tracing::{info, info_span, warn};
20
21use crate::bridge;
22use crate::bus::{Event, EventBus};
23use agent_block_mcp::McpManager;
24use agent_block_types::error::{BlockError, BlockResult};
25
26/// Embedded Lua sources for blocks/ StdPkg modules.
27/// These are baked into the binary at compile time so `cargo install` works
28/// without any extra file distribution.
29const EMBEDDED_BLOCKS: &[(&str, &str)] = &[
30    ("agent", include_str!("../blocks/agent/init.lua")),
31    ("session", include_str!("../blocks/session/init.lua")),
32];
33
34/// Build the `blocks/` portion of `package.path` from filesystem locations.
35///
36/// Priority (highest first):
37/// 1. `project_root/blocks/` — user-customisable, overrides embedded StdPkg
38/// 2. `exe_dir/blocks/`      — development hot-reload (next to the binary)
39///
40/// Returns a semicolon-terminated string ready to prepend to `package.path`,
41/// or an empty string when no `blocks/` directories are found.
42fn build_blocks_path(project_root: &Path) -> String {
43    let mut out = String::new();
44
45    // 1. project_root/blocks/
46    let project_blocks = project_root.join("blocks");
47    if project_blocks.is_dir() {
48        let pb = project_blocks.to_string_lossy();
49        out.push_str(&format!("{pb}/?.lua;{pb}/?/init.lua;"));
50    }
51
52    // 2. exe_dir/blocks/
53    match std::env::current_exe() {
54        Ok(exe) => {
55            if let Some(exe_dir) = exe.parent() {
56                let exe_blocks = exe_dir.join("blocks");
57                if exe_blocks.is_dir() {
58                    let eb = exe_blocks.to_string_lossy();
59                    out.push_str(&format!("{eb}/?.lua;{eb}/?/init.lua;"));
60                }
61            }
62        }
63        Err(e) => {
64            warn!(error = %e, "current_exe() failed; skipping exe_dir/blocks/ from package.path");
65        }
66    }
67
68    out
69}
70
71pub struct BlockConfig {
72    pub script_path: PathBuf,
73    pub project_root: PathBuf,
74    pub relay_url: Option<String>,
75    /// Ed25519 secret key (64 hex chars). If `None`, a random keypair is
76    /// generated. Required to talk to registry/ACL-gated hosted meshes.
77    pub secret_key: Option<String>,
78    /// Per-RPC timeout for every MCP round-trip (connect / list / call).
79    /// Defaults to [`agent_block_mcp::DEFAULT_RPC_TIMEOUT`].
80    pub mcp_rpc_timeout: Duration,
81    /// Prompt string injected as `_PROMPT` Lua global. `None` = global not set.
82    pub prompt: Option<String>,
83    /// Context string injected as `_CONTEXT` Lua global. `None` = global not set.
84    pub context: Option<String>,
85}
86
87/// Shared context passed into Lua bridge functions.
88#[derive(Clone)]
89pub struct HostContext {
90    pub project_root: PathBuf,
91    pub mesh_agent: Option<Arc<agent_mesh_sdk::MeshAgent>>,
92    pub mcp_manager: Arc<RwLock<McpManager>>,
93    /// Shared async HTTP client for `http.*` bridge.
94    pub http_client: reqwest::Client,
95    /// Shared SQLite connection for `sql.*` bridge (user tables).
96    pub sql_conn: Arc<Mutex<rusqlite::Connection>>,
97    /// Interrupt handle for the sql connection.
98    /// Used to cancel in-flight queries on timeout (see `bridge/sql.rs`).
99    pub sql_interrupt: Arc<rusqlite::InterruptHandle>,
100    /// Shared SQLite connection for `kv.*` bridge (`__kv` table only).
101    /// Separate from sql_conn so KV scratch state and user SQL data don't
102    /// share WAL, page cache, or backup lifecycle.
103    pub kv_conn: Arc<Mutex<rusqlite::Connection>>,
104    /// Interrupt handle for the kv connection.
105    pub kv_interrupt: Arc<rusqlite::InterruptHandle>,
106    /// Shared SQLite connection for `ts.*` bridge (TSDB — time-series table).
107    /// Separate DB file so TSDB WAL does not share page cache with kv/sql.
108    pub ts_conn: Arc<Mutex<rusqlite::Connection>>,
109    /// Interrupt handle for the ts connection.
110    /// Used by `bridge::ts` to cancel in-flight queries on timeout (Subtask 2).
111    #[allow(dead_code)]
112    pub ts_interrupt: Arc<rusqlite::InterruptHandle>,
113    /// Async handle to the main Isle Lua VM that runs the user script via
114    /// `coroutine_eval`. After Subtask 2, `bridge::bus` no longer dispatches
115    /// handlers against this Isle; handlers live on `handler_isle` instead.
116    /// The field is retained because bridge code still keyed to the main
117    /// Isle (future `coroutine_call` back-edges, introspection APIs) may
118    /// need it, and removing it would force another HostContext reshape.
119    #[allow(dead_code)]
120    pub isle: Arc<AsyncIsle>,
121    /// Dedicated Isle for EventBus handler execution. Lua handlers
122    /// registered via `bus.on` / `bus.on_any` run here so that CPU-bound
123    /// handler code does not occupy the main Isle's LocalSet and block
124    /// grace timers / shutdown wakers on the main VM side.
125    ///
126    /// Used by `bridge::bus` to forward handler bytecode
127    /// (`Function::dump(true)` → `handler_isle.exec(...)`) and by
128    /// [`LuaHandler::call`](crate::bridge::bus) to dispatch via
129    /// `coroutine_call("__bus_dispatch", ...)`.
130    pub handler_isle: Arc<AsyncIsle>,
131    /// Ingress sender for the EventBus. Adapters (mesh / webhook / …)
132    /// clone this and push `Event`s. The ST3 mesh adapter captures its own
133    /// clone at `MeshAgent::connect` time, so the field itself is not read
134    /// elsewhere in the ST3 cut — kept `pub` for ST4+ adapter wiring.
135    #[allow(dead_code)]
136    pub bus_tx: mpsc::Sender<Event>,
137    /// Mutex-wrapped `Option<EventBus>` so `bus.on` / `bus.on_any` can lock
138    /// briefly from sync Lua context, and `bus.serve` can `Option::take`
139    /// ownership before entering the long-lived `run()` await (avoiding the
140    /// await-holding-lock anti-pattern on a `std::sync::Mutex`).
141    pub event_bus: Arc<Mutex<Option<EventBus>>>,
142}
143
144/// Open a SQLite connection at `path` (or `:memory:`) and apply the shared
145/// pragmas driven by ENV (`journal_mode`, `busy_timeout`). Returns the
146/// connection wrapped in Arc<Mutex<_>> together with its interrupt handle.
147///
148/// `label` is used only for the init log line (`sql` / `kv`) so that the two
149/// databases are distinguishable in tracing output.
150fn open_sqlite(
151    path: &Path,
152    label: &'static str,
153) -> BlockResult<(
154    Arc<Mutex<rusqlite::Connection>>,
155    Arc<rusqlite::InterruptHandle>,
156)> {
157    let is_memory = crate::bridge::config::is_memory_sql(path);
158    if !is_memory {
159        if let Some(parent) = path.parent() {
160            std::fs::create_dir_all(parent)
161                .map_err(|e| BlockError::Runtime(format!("{label} dir create: {e}")))?;
162        }
163    }
164    let conn = rusqlite::Connection::open(path)
165        .map_err(|e| BlockError::Runtime(format!("sqlite open {}: {e}", path.display())))?;
166    if !is_memory {
167        let journal = crate::bridge::config::sql_journal_mode();
168        conn.pragma_update(None, "journal_mode", &journal)
169            .map_err(|e| BlockError::Runtime(format!("journal_mode={journal}: {e}")))?;
170    }
171    let busy_ms = crate::bridge::config::sql_busy_timeout().as_millis() as i64;
172    conn.pragma_update(None, "busy_timeout", busy_ms)
173        .map_err(|e| BlockError::Runtime(format!("busy_timeout pragma: {e}")))?;
174    info!(label, path = %path.display(), busy_ms, "sqlite initialized");
175    let interrupt = Arc::new(conn.get_interrupt_handle());
176    let conn = Arc::new(Mutex::new(conn));
177    Ok((conn, interrupt))
178}
179
180/// Build the init closure shared between the main Isle and the handler
181/// Isle.  Sets `_SCRIPT_NAME`, registers `mlua-batteries` `std.*`, and
182/// configures `package.path` / `package.searchers` so `require "agent"`
183/// (and any `blocks/` module) works inside the Lua VM.
184///
185/// Returns an `FnOnce` so each call produces a fresh closure; this lets
186/// both Isles be spawned from the same config without `Clone` bounds on
187/// the captured `HashMap`.
188fn build_isle_init(
189    script_name: String,
190    script_dir: String,
191    blocks_paths: String,
192    prompt: Option<String>,
193    context: Option<String>,
194) -> impl FnOnce(&mlua::Lua) -> mlua::Result<()> + Send + 'static {
195    move |lua| {
196        // Set script name before registering bridges (used by log.* for attribution)
197        lua.globals().set("_SCRIPT_NAME", script_name.as_str())?;
198        if let Some(ref p) = prompt {
199            lua.globals().set("_PROMPT", p.as_str())?;
200        }
201        if let Some(ref c) = context {
202            lua.globals().set("_CONTEXT", c.as_str())?;
203        }
204
205        mlua_batteries::register_all(lua, "std")?;
206
207        // ── package.path ──────────────────────────────────────────────
208        // Priority: script_dir > project_root/blocks/ > exe_dir/blocks/ > default
209        let package: mlua::Table = lua.globals().get("package")?;
210        let current_path: String = package.get("path")?;
211        let new_path =
212            format!("{script_dir}/?.lua;{script_dir}/?/init.lua;{blocks_paths}{current_path}");
213        package.set("path", new_path)?;
214
215        // ── package.searchers — embedded fallback ─────────────────────
216        // Register a custom searcher that loads blocks/ modules from the
217        // sources baked in at compile time.  This is the lowest-priority
218        // searcher so filesystem copies always win.
219        let embedded: HashMap<&'static str, &'static str> =
220            EMBEDDED_BLOCKS.iter().copied().collect();
221
222        let searchers: mlua::Table = package.get("searchers")?;
223        let loader =
224            lua.create_function(move |lua, name: String| match embedded.get(name.as_str()) {
225                Some(source) => {
226                    let chunk = lua
227                        .load(*source)
228                        .set_name(format!("@embedded:blocks/{name}/init.lua"));
229                    let func = chunk.into_function()?;
230                    Ok(mlua::Value::Function(func))
231                }
232                None => {
233                    let msg = lua.create_string(format!("\n\tno embedded block '{name}'"))?;
234                    Ok(mlua::Value::String(msg))
235                }
236            })?;
237        // Append as the last searcher so filesystem paths remain preferred.
238        let next_idx = searchers.raw_len() + 1;
239        searchers.raw_set(next_idx, loader)?;
240
241        Ok(())
242    }
243}
244
245/// Spawn the dedicated handler Isle.
246///
247/// The handler Isle runs Lua bus handlers (`bus.on` / `bus.on_any`) on a
248/// separate OS thread with its own `tokio` current-thread runtime, keeping
249/// CPU-bound handlers from starving the main Isle's grace timers.
250///
251/// Bridge registration is deferred to a follow-up `exec` in `run()` because
252/// `HostContext` is not constructible until both Isles exist (the struct
253/// itself holds `Arc<AsyncIsle>` for both).
254async fn spawn_handler_isle(
255    script_name: String,
256    script_dir: String,
257    blocks_paths: String,
258    prompt: Option<String>,
259    context: Option<String>,
260) -> BlockResult<(Arc<AsyncIsle>, AsyncIsleDriver)> {
261    let init = build_isle_init(script_name, script_dir, blocks_paths, prompt, context);
262    let (isle, driver) = AsyncIsle::builder()
263        .thread_name("agent-block-handler-isle")
264        .spawn(init)
265        .await
266        .map_err(|e| BlockError::Runtime(format!("handler isle spawn failed: {e}")))?;
267    info!(
268        thread_name = "agent-block-handler-isle",
269        "handler Isle spawned"
270    );
271    Ok((Arc::new(isle), driver))
272}
273
274fn hex_decode_32(s: &str) -> Result<[u8; 32], String> {
275    let s = s.trim();
276    if s.len() != 64 {
277        return Err(format!("expected 64 hex chars, got {}", s.len()));
278    }
279    let mut out = [0u8; 32];
280    for (i, byte) in out.iter_mut().enumerate() {
281        let hi = u8::from_str_radix(&s[2 * i..2 * i + 1], 16)
282            .map_err(|e| format!("invalid hex at position {}: {e}", 2 * i))?;
283        let lo = u8::from_str_radix(&s[2 * i + 1..2 * i + 2], 16)
284            .map_err(|e| format!("invalid hex at position {}: {e}", 2 * i + 1))?;
285        *byte = (hi << 4) | lo;
286    }
287    Ok(out)
288}
289
290pub async fn run(config: BlockConfig) -> BlockResult<()> {
291    let script_name = config
292        .script_path
293        .file_name()
294        .map(|n| n.to_string_lossy().to_string())
295        .unwrap_or_else(|| "unknown".to_string());
296
297    let root_span = info_span!("agent_block", script = %script_name);
298    let _root_guard = root_span.enter();
299
300    // ── .env ──────────────────────────────────────────────────────
301    // Load .env from project_root if present. Variables are merged into
302    // the process environment so Lua's `std.env.get()` picks them up.
303    let env_path = config.project_root.join(".env");
304    match dotenvy::from_path(&env_path) {
305        Ok(()) => info!(path = %env_path.display(), ".env loaded"),
306        Err(dotenvy::Error::Io(_)) => {} // file not found — fine
307        Err(e) => tracing::warn!(path = %env_path.display(), error = %e, ".env parse error"),
308    }
309
310    // ── Init ──────────────────────────────────────────────────────
311    let _init_guard = info_span!("init").entered();
312
313    // ── EventBus channel ─────────────────────────────────────────────
314    // Construct the bounded mpsc BEFORE MeshAgent::connect so the relay
315    // handler can hold a `bus_tx` clone and forward incoming requests
316    // into the dispatcher. Capacity is ENV-driven (see bridge::config).
317    let bus_capacity = crate::bridge::config::bus_capacity();
318    let (bus_tx, bus_rx) = mpsc::channel::<Event>(bus_capacity);
319    let event_bus = Arc::new(Mutex::new(Some(EventBus::new(bus_rx))));
320
321    let mesh_agent = if let Some(ref relay_url) = config.relay_url {
322        let keypair = match &config.secret_key {
323            Some(hex_str) => {
324                let bytes = hex_decode_32(hex_str)
325                    .map_err(|e| BlockError::Runtime(format!("--secret-key: {e}")))?;
326                agent_mesh_core::identity::AgentKeypair::from_bytes(&bytes)
327            }
328            None => agent_mesh_core::identity::AgentKeypair::generate(),
329        };
330        info!(agent_id = %keypair.agent_id(), "mesh identity");
331        let acl = agent_mesh_core::acl::AclPolicy {
332            default_deny: false,
333            rules: vec![],
334        };
335        let handler: Arc<dyn agent_mesh_sdk::RequestHandler> =
336            Arc::new(BusRelayHandler::new(bus_tx.clone()));
337        let url = relay_url.clone();
338        let agent = agent_mesh_sdk::MeshAgent::connect(keypair, &url, acl, handler)
339            .await
340            .map_err(|e| BlockError::Mesh(format!("connect to {relay_url} failed: {e}")))?;
341        info!(relay_url = %relay_url, "mesh connected");
342        Some(Arc::new(agent))
343    } else {
344        None
345    };
346
347    let mcp_manager = Arc::new(RwLock::new(McpManager::with_rpc_timeout(
348        config.mcp_rpc_timeout,
349    )?));
350
351    // Resolve project_root to absolute path.
352    // canonicalize() can fail if the path doesn't exist; fall back to
353    // joining with current_dir to guarantee an absolute path.
354    let project_root = config
355        .project_root
356        .canonicalize()
357        .or_else(|_| std::env::current_dir().map(|cwd| cwd.join(&config.project_root)))?;
358
359    let http_client = reqwest::Client::new();
360
361    // ── SQLite init (kv + sql get separate DB files) ──────────────────────
362    // All knobs are ENV-driven (see `bridge/config.rs`).
363    let sql_path = crate::bridge::config::sql_path().map_err(BlockError::Runtime)?;
364    let (sql_conn, sql_interrupt) = open_sqlite(&sql_path, "sql")?;
365
366    let kv_path = crate::bridge::config::kv_path().map_err(BlockError::Runtime)?;
367    let (kv_conn, kv_interrupt) = open_sqlite(&kv_path, "kv")?;
368
369    let ts_path = crate::bridge::config::ts_path().map_err(BlockError::Runtime)?;
370    let (ts_conn, ts_interrupt) = open_sqlite(&ts_path, "ts")?;
371
372    let script_path = config.script_path.clone();
373    let script_dir = script_path
374        .parent()
375        .map(|p| p.to_string_lossy().to_string())
376        .unwrap_or_else(|| ".".to_string());
377
378    // Precompute values captured by the init closure so we don't need to
379    // move the full `HostContext` into it (HostContext now holds
380    // `Arc<AsyncIsle>`, which is available only after `AsyncIsle::spawn`
381    // returns — classic chicken-and-egg). All bridge registrations run in a
382    // second pass via `isle.exec` below.
383    let blocks_paths = build_blocks_path(&project_root);
384    let prompt = config.prompt.clone();
385    let context = config.context.clone();
386
387    // ── main Isle ─────────────────────────────────────────────────
388    let (isle, driver) = AsyncIsle::spawn(build_isle_init(
389        script_name.clone(),
390        script_dir.clone(),
391        blocks_paths.clone(),
392        prompt.clone(),
393        context.clone(),
394    ))
395    .await
396    .map_err(|e| BlockError::Runtime(format!("AsyncIsle spawn failed: {e}")))?;
397    let isle = Arc::new(isle);
398
399    // ── handler Isle (sequential, dependencies are trivial) ────────
400    let (handler_isle, handler_driver) = spawn_handler_isle(
401        script_name.clone(),
402        script_dir.clone(),
403        blocks_paths.clone(),
404        prompt,
405        context,
406    )
407    .await?;
408
409    // Wire both Isles into McpManager so Lua notification callbacks can be
410    // dispatched from the rmcp task thread.
411    // - handler_isle: sampling/createMessage dispatch (exec on handler Isle)
412    // - main_isle: progress/log notification dispatch (exec on main Isle so
413    //   user callback upvalues are preserved — no bytecode dump/reload needed)
414    {
415        let mut mgr = mcp_manager.write().await;
416        mgr.set_handler_isle(Arc::clone(&handler_isle));
417        mgr.set_main_isle(Arc::clone(&isle));
418    }
419
420    // ── HostContext + bridge registration ──────────────────────────────
421    // Wrap the isle in an Arc so `HostContext` can hand it to
422    // `bridge::bus` (which uses `AsyncIsle::coroutine_call` to invoke Lua
423    // handlers from the EventBus dispatcher task).
424    let ctx = HostContext {
425        project_root,
426        mesh_agent,
427        mcp_manager: Arc::clone(&mcp_manager),
428        http_client,
429        sql_conn,
430        sql_interrupt,
431        kv_conn,
432        kv_interrupt,
433        ts_conn,
434        ts_interrupt,
435        isle: Arc::clone(&isle),
436        handler_isle: Arc::clone(&handler_isle),
437        bus_tx: bus_tx.clone(),
438        event_bus: Arc::clone(&event_bus),
439    };
440
441    {
442        let ctx = ctx.clone();
443        isle.exec(move |lua| {
444            bridge::register_all(lua, &ctx)
445                .map_err(|e| mlua_isle::IsleError::Lua(format!("bridge register failed: {e}")))?;
446            Ok(String::new())
447        })
448        .await
449        .map_err(|e| BlockError::Runtime(format!("bridge register: {e}")))?;
450    }
451
452    {
453        let ctx = ctx.clone();
454        handler_isle
455            .exec(move |lua| {
456                bridge::register_all_handler_side(lua, &ctx).map_err(|e| {
457                    mlua_isle::IsleError::Lua(format!("handler bridge register failed: {e}"))
458                })?;
459                Ok(String::new())
460            })
461            .await
462            .map_err(|e| BlockError::Runtime(format!("handler bridge register: {e}")))?;
463    }
464
465    drop(_init_guard);
466
467    // ── Execute ───────────────────────────────────────────────────
468    {
469        let _exec_guard = info_span!("execute", script = %script_name).entered();
470
471        let script = std::fs::read_to_string(&script_path)
472            .map_err(|e| BlockError::Script(format!("{}: {e}", script_path.display())))?;
473
474        isle.coroutine_eval(&script)
475            .await
476            .map_err(|e| BlockError::Script(format!("{e}")))?;
477    }
478
479    // ── Shutdown ──────────────────────────────────────────────────
480    {
481        let _shutdown_guard = info_span!("shutdown").entered();
482
483        mcp_manager.write().await.disconnect_all().await?;
484
485        driver
486            .shutdown()
487            .await
488            .map_err(|e| BlockError::Runtime(format!("AsyncIsle shutdown failed: {e}")))?;
489
490        // Handler Isle shutdown is independent of main shutdown: a failure
491        // here (e.g. ThreadPanic on the handler thread) is logged but does
492        // not poison the main process exit. The main Isle has already
493        // been stopped cleanly above.
494        match handler_driver.shutdown().await {
495            Ok(()) => info!(
496                thread_name = "agent-block-handler-isle",
497                "handler Isle shut down"
498            ),
499            Err(e) => tracing::error!(
500                error = %e,
501                thread_name = "agent-block-handler-isle",
502                "handler Isle shutdown failed"
503            ),
504        }
505    }
506
507    Ok(())
508}
509
510/// mesh → bus source adapter.
511///
512/// Implements [`agent_mesh_sdk::RequestHandler`] by packaging every incoming
513/// mesh request into an [`Event`] with `kind = "mesh"`, pushing it onto the
514/// bounded `bus_tx` channel, and awaiting the Lua handler's ack over a
515/// oneshot channel carried inside the event.
516///
517/// Error paths (all `tracing::error!`-logged — silent-err-drop policy):
518///
519/// | Failure                   | Return value                           |
520/// |---------------------------|----------------------------------------|
521/// | `bus_tx.send` closed/full | `{"error": "bus channel closed"}`      |
522/// | ack receiver dropped      | `{"error": "ack dropped"}`             |
523/// | Lua handler `BlockError`  | `{"error": "<handler error>"}`         |
524/// | Handler exceeded 30s      | `{"error": "handler timeout"}`         |
525///
526/// The 30s ack timeout mirrors the client-side timeout on `mesh.request`
527/// (see `src/bridge/mesh.rs`).
528struct BusRelayHandler {
529    tx: mpsc::Sender<Event>,
530}
531
532impl BusRelayHandler {
533    fn new(tx: mpsc::Sender<Event>) -> Self {
534        Self { tx }
535    }
536}
537
538/// Bound used for both the mesh-adapter ack wait and other source timeouts.
539const BUS_ACK_TIMEOUT: Duration = Duration::from_secs(30);
540
541#[async_trait::async_trait]
542impl agent_mesh_sdk::RequestHandler for BusRelayHandler {
543    async fn handle(
544        &self,
545        from: &agent_mesh_core::identity::AgentId,
546        payload: &serde_json::Value,
547        _cancel: agent_mesh_sdk::CancelToken,
548    ) -> serde_json::Value {
549        let id = uuid::Uuid::new_v4().to_string();
550        let meta = serde_json::json!({"from": from.to_string()});
551        let (ack_tx, ack_rx) = oneshot::channel();
552        let event = Event {
553            kind: "mesh".into(),
554            id: id.clone(),
555            payload: payload.clone(),
556            meta,
557            ack_tx: Some(ack_tx),
558        };
559
560        if let Err(e) = self.tx.send(event).await {
561            tracing::error!(error = %e, id = %id, "bus channel closed; rejecting mesh request");
562            return serde_json::json!({"error": "bus channel closed"});
563        }
564
565        match tokio::time::timeout(BUS_ACK_TIMEOUT, ack_rx).await {
566            Ok(Ok(Ok(v))) => v,
567            Ok(Ok(Err(e))) => {
568                tracing::error!(id = %id, error = %e, "mesh handler returned error");
569                serde_json::json!({"error": e.to_string()})
570            }
571            Ok(Err(e)) => {
572                tracing::error!(id = %id, error = %e, "mesh ack receiver dropped");
573                serde_json::json!({"error": "ack dropped"})
574            }
575            Err(_) => {
576                tracing::error!(id = %id, timeout_secs = BUS_ACK_TIMEOUT.as_secs(), "mesh handler timeout");
577                serde_json::json!({"error": "handler timeout"})
578            }
579        }
580    }
581}