Skip to main content

agent_block_core/bridge/
mesh.rs

1//! mesh.* — Agent mesh communication bridge (async).
2//!
3//! `mesh.send` and `mesh.request` use `create_async_function` so that
4//! Lua coroutines yield while waiting for mesh I/O.
5
6use mlua::prelude::*;
7use serde_json::Map;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::host::HostContext;
12use agent_block_types::obs;
13
14/// Register the `mesh.*` Lua table.
15///
16/// `is_handler_side` selects the surface exposed by this bridge:
17///
18/// - `false` (main Isle): registers `send`, `request`, `on`, `agent_id`.
19///   `mesh.on` is a thin alias over `bus.on("mesh", fn)` and therefore
20///   depends on `bus.register` having run first on the same VM
21///   (`bridge::register_all` orders them correctly).
22/// - `true` (handler Isle): registers `send`, `request`, `agent_id` only.
23///   The handler Isle does not expose the `bus.*` global, so installing
24///   `mesh.on` would fail with `bus global missing`. Handlers dispatched on
25///   the handler Isle can still call `mesh.send` / `mesh.request` because
26///   the `MeshAgent` Arc is shared across Isles via `HostContext`.
27pub fn register(lua: &Lua, ctx: &HostContext, is_handler_side: bool) -> LuaResult<()> {
28    let mesh_tbl = lua.create_table()?;
29    let script_name: String = lua
30        .globals()
31        .get::<Option<String>>("_SCRIPT_NAME")?
32        .unwrap_or_else(|| "unknown".to_string());
33
34    match &ctx.mesh_agent {
35        None => {
36            // All functions return error when mesh is not connected.
37            // Skip `on` on the handler side because it is not exposed there.
38            let names: &[&str] = if is_handler_side {
39                &["send", "request", "agent_id"]
40            } else {
41                &["send", "request", "on", "agent_id"]
42            };
43            for name in names {
44                let n = name.to_string();
45                mesh_tbl.set(
46                    *name,
47                    lua.create_function(move |_, _: LuaValue| {
48                        Err::<LuaValue, _>(LuaError::external(format!(
49                            "mesh.{n}: mesh not connected (no --relay specified)"
50                        )))
51                    })?,
52                )?;
53            }
54        }
55        Some(agent) => {
56            let agent_send = Arc::clone(agent);
57            let script_name_send = script_name.clone();
58            mesh_tbl.set(
59                "send",
60                lua.create_async_function(
61                    move |lua, (agent_id_str, payload): (String, LuaValue)| {
62                        let agent = Arc::clone(&agent_send);
63                        let script_name = script_name_send.clone();
64                        async move {
65                            use crate::bridge::lua_to_json;
66                            let mut payload_json = lua_to_json(&lua, payload)?;
67                            let local_agent_id = agent.agent_id().to_string();
68                            inject_obs_context(&mut payload_json, Some(local_agent_id.clone()));
69                            tracing::info!(
70                                target: "lua",
71                                script = %script_name,
72                                "{}",
73                                obs::obs_line(
74                                    "mesh",
75                                    "mesh_send",
76                                    &obs::obs_context(Some(local_agent_id.as_str())),
77                                    &[("target", agent_id_str.as_str())],
78                                )
79                            );
80                            let target = agent_mesh_core::identity::AgentId::from_raw(agent_id_str);
81                            agent
82                                .request(&target, payload_json, Duration::from_secs(10))
83                                .await
84                                .map_err(LuaError::external)?;
85                            Ok(())
86                        }
87                    },
88                )?,
89            )?;
90
91            let agent_req = Arc::clone(agent);
92            let script_name_req = script_name.clone();
93            mesh_tbl.set(
94                "request",
95                lua.create_async_function(
96                    move |lua, (agent_id_str, payload): (String, LuaValue)| {
97                        let agent = Arc::clone(&agent_req);
98                        let script_name = script_name_req.clone();
99                        async move {
100                            use crate::bridge::{json_to_lua, lua_to_json};
101                            let mut payload_json = lua_to_json(&lua, payload)?;
102                            let local_agent_id = agent.agent_id().to_string();
103                            inject_obs_context(&mut payload_json, Some(local_agent_id.clone()));
104                            tracing::info!(
105                                target: "lua",
106                                script = %script_name,
107                                "{}",
108                                obs::obs_line(
109                                    "mesh",
110                                    "mesh_request",
111                                    &obs::obs_context(Some(local_agent_id.as_str())),
112                                    &[("target", agent_id_str.as_str())],
113                                )
114                            );
115                            let target = agent_mesh_core::identity::AgentId::from_raw(agent_id_str);
116                            let resp = agent
117                                .request(&target, payload_json, Duration::from_secs(30))
118                                .await
119                                .map_err(LuaError::external)?;
120                            json_to_lua(&lua, resp)
121                        }
122                    },
123                )?,
124            )?;
125
126            let agent_id_str = agent.agent_id().to_string();
127            mesh_tbl.set(
128                "agent_id",
129                lua.create_function(move |_, ()| Ok(agent_id_str.clone()))?,
130            )?;
131
132            if !is_handler_side {
133                // mesh.on is a thin alias over bus.on("mesh", fn). The
134                // EventBus (registered in bridge/bus.rs before this function
135                // runs on the main Isle) owns the actual dispatch. Capture
136                // `bus.on` at registration time so subsequent reassignments
137                // of the `bus` global do not hijack the alias.
138                //
139                // Because `bus.on` is now an async function (it forwards
140                // handler bytecode to the handler Isle), `mesh.on` must be
141                // an async function too and `.call_async().await` the
142                // underlying `bus.on`.
143                let bus_tbl: LuaTable = lua.globals().get("bus")?;
144                let bus_on: LuaFunction = bus_tbl.get("on")?;
145                mesh_tbl.set(
146                    "on",
147                    lua.create_async_function(move |_, func: LuaFunction| {
148                        let bus_on = bus_on.clone();
149                        async move { bus_on.call_async::<()>(("mesh", func)).await }
150                    })?,
151                )?;
152            }
153        }
154    }
155
156    lua.globals().set("mesh", mesh_tbl)?;
157    Ok(())
158}
159
160fn inject_obs_context(payload_json: &mut serde_json::Value, fallback_agent_id: Option<String>) {
161    fn insert_obs(obj: &mut Map<String, serde_json::Value>, fallback_agent_id: Option<String>) {
162        if obj.contains_key("__ab_obs") {
163            return;
164        }
165        let mut obs = Map::<String, serde_json::Value>::new();
166        if let Ok(v) = std::env::var("AGENT_BLOCK_TRACE_ID") {
167            if !v.is_empty() {
168                obs.insert("trace_id".to_string(), serde_json::Value::String(v));
169            }
170        }
171        if let Ok(v) = std::env::var("AGENT_BLOCK_RUN_ID") {
172            if !v.is_empty() {
173                obs.insert("run_id".to_string(), serde_json::Value::String(v));
174            }
175        }
176        let agent_id = std::env::var("AGENT_BLOCK_AGENT_ID")
177            .ok()
178            .filter(|s| !s.is_empty())
179            .or(fallback_agent_id);
180        if let Some(v) = agent_id {
181            obs.insert("agent_id".to_string(), serde_json::Value::String(v));
182        }
183        if let Ok(v) = std::env::var("AGENT_BLOCK_AGENT_NAME") {
184            if !v.is_empty() {
185                obs.insert("agent_name".to_string(), serde_json::Value::String(v));
186            }
187        }
188        if !obs.is_empty() {
189            obj.insert("__ab_obs".to_string(), serde_json::Value::Object(obs));
190        }
191    }
192
193    match payload_json {
194        serde_json::Value::Object(obj) => insert_obs(obj, fallback_agent_id),
195        serde_json::Value::Null => {
196            let mut obj = Map::<String, serde_json::Value>::new();
197            insert_obs(&mut obj, fallback_agent_id);
198            if !obj.is_empty() {
199                *payload_json = serde_json::Value::Object(obj);
200            }
201        }
202        _ => {}
203    }
204}