agent_block_core/bridge/
mesh.rs1use mlua::prelude::*;
7use serde_json::Map;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::host::HostContext;
12use agent_block_types::obs;
13
14pub fn register(lua: &Lua, ctx: &HostContext, is_handler_side: bool) -> LuaResult<()> {
28 let mesh_tbl = lua.create_table()?;
29 let script_name: String = lua
30 .globals()
31 .get::<Option<String>>("_SCRIPT_NAME")?
32 .unwrap_or_else(|| "unknown".to_string());
33
34 match &ctx.mesh_agent {
35 None => {
36 let names: &[&str] = if is_handler_side {
39 &["send", "request", "agent_id"]
40 } else {
41 &["send", "request", "on", "agent_id"]
42 };
43 for name in names {
44 let n = name.to_string();
45 mesh_tbl.set(
46 *name,
47 lua.create_function(move |_, _: LuaValue| {
48 Err::<LuaValue, _>(LuaError::external(format!(
49 "mesh.{n}: mesh not connected (no --relay specified)"
50 )))
51 })?,
52 )?;
53 }
54 }
55 Some(agent) => {
56 let agent_send = Arc::clone(agent);
57 let script_name_send = script_name.clone();
58 mesh_tbl.set(
59 "send",
60 lua.create_async_function(
61 move |lua, (agent_id_str, payload): (String, LuaValue)| {
62 let agent = Arc::clone(&agent_send);
63 let script_name = script_name_send.clone();
64 async move {
65 use crate::bridge::lua_to_json;
66 let mut payload_json = lua_to_json(&lua, payload)?;
67 let local_agent_id = agent.agent_id().to_string();
68 inject_obs_context(&mut payload_json, Some(local_agent_id.clone()));
69 tracing::info!(
70 target: "lua",
71 script = %script_name,
72 "{}",
73 obs::obs_line(
74 "mesh",
75 "mesh_send",
76 &obs::obs_context(Some(local_agent_id.as_str())),
77 &[("target", agent_id_str.as_str())],
78 )
79 );
80 let target = agent_mesh_core::identity::AgentId::from_raw(agent_id_str);
81 agent
82 .request(&target, payload_json, Duration::from_secs(10))
83 .await
84 .map_err(LuaError::external)?;
85 Ok(())
86 }
87 },
88 )?,
89 )?;
90
91 let agent_req = Arc::clone(agent);
92 let script_name_req = script_name.clone();
93 mesh_tbl.set(
94 "request",
95 lua.create_async_function(
96 move |lua, (agent_id_str, payload): (String, LuaValue)| {
97 let agent = Arc::clone(&agent_req);
98 let script_name = script_name_req.clone();
99 async move {
100 use crate::bridge::{json_to_lua, lua_to_json};
101 let mut payload_json = lua_to_json(&lua, payload)?;
102 let local_agent_id = agent.agent_id().to_string();
103 inject_obs_context(&mut payload_json, Some(local_agent_id.clone()));
104 tracing::info!(
105 target: "lua",
106 script = %script_name,
107 "{}",
108 obs::obs_line(
109 "mesh",
110 "mesh_request",
111 &obs::obs_context(Some(local_agent_id.as_str())),
112 &[("target", agent_id_str.as_str())],
113 )
114 );
115 let target = agent_mesh_core::identity::AgentId::from_raw(agent_id_str);
116 let resp = agent
117 .request(&target, payload_json, Duration::from_secs(30))
118 .await
119 .map_err(LuaError::external)?;
120 json_to_lua(&lua, resp)
121 }
122 },
123 )?,
124 )?;
125
126 let agent_id_str = agent.agent_id().to_string();
127 mesh_tbl.set(
128 "agent_id",
129 lua.create_function(move |_, ()| Ok(agent_id_str.clone()))?,
130 )?;
131
132 if !is_handler_side {
133 let bus_tbl: LuaTable = lua.globals().get("bus")?;
144 let bus_on: LuaFunction = bus_tbl.get("on")?;
145 mesh_tbl.set(
146 "on",
147 lua.create_async_function(move |_, func: LuaFunction| {
148 let bus_on = bus_on.clone();
149 async move { bus_on.call_async::<()>(("mesh", func)).await }
150 })?,
151 )?;
152 }
153 }
154 }
155
156 lua.globals().set("mesh", mesh_tbl)?;
157 Ok(())
158}
159
160fn inject_obs_context(payload_json: &mut serde_json::Value, fallback_agent_id: Option<String>) {
161 fn insert_obs(obj: &mut Map<String, serde_json::Value>, fallback_agent_id: Option<String>) {
162 if obj.contains_key("__ab_obs") {
163 return;
164 }
165 let mut obs = Map::<String, serde_json::Value>::new();
166 if let Ok(v) = std::env::var("AGENT_BLOCK_TRACE_ID") {
167 if !v.is_empty() {
168 obs.insert("trace_id".to_string(), serde_json::Value::String(v));
169 }
170 }
171 if let Ok(v) = std::env::var("AGENT_BLOCK_RUN_ID") {
172 if !v.is_empty() {
173 obs.insert("run_id".to_string(), serde_json::Value::String(v));
174 }
175 }
176 let agent_id = std::env::var("AGENT_BLOCK_AGENT_ID")
177 .ok()
178 .filter(|s| !s.is_empty())
179 .or(fallback_agent_id);
180 if let Some(v) = agent_id {
181 obs.insert("agent_id".to_string(), serde_json::Value::String(v));
182 }
183 if let Ok(v) = std::env::var("AGENT_BLOCK_AGENT_NAME") {
184 if !v.is_empty() {
185 obs.insert("agent_name".to_string(), serde_json::Value::String(v));
186 }
187 }
188 if !obs.is_empty() {
189 obj.insert("__ab_obs".to_string(), serde_json::Value::Object(obs));
190 }
191 }
192
193 match payload_json {
194 serde_json::Value::Object(obj) => insert_obs(obj, fallback_agent_id),
195 serde_json::Value::Null => {
196 let mut obj = Map::<String, serde_json::Value>::new();
197 insert_obs(&mut obj, fallback_agent_id);
198 if !obj.is_empty() {
199 *payload_json = serde_json::Value::Object(obj);
200 }
201 }
202 _ => {}
203 }
204}