#[cfg(feature = "cortex")]
pub use net::adapter::net::behavior::fold::capability_aggregation::{TagMatcher, TagMatcherError};
#[cfg(feature = "cortex")]
pub use net::adapter::net::cortex::tool::{
description_metadata_key, streaming_metadata_key, tags_metadata_key, ToolDescriptor, ToolEvent,
ToolListChange, ToolListWatch, ToolMetadataRegistry, ToolMetadataRequest, ToolMetadataResponse,
TOOL_METADATA_FETCH_SERVICE,
};
#[cfg(feature = "cortex")]
use std::sync::Arc;
#[cfg(feature = "cortex")]
use crate::mesh::Mesh;
#[cfg(feature = "cortex")]
use crate::mesh_rpc::{Codec, ServeError, ServeHandle};
#[cfg(feature = "cortex")]
use serde::{de::DeserializeOwned, Serialize};
#[must_use = "ToolMetadataBuilder does nothing until `.build()` is called"]
pub struct ToolMetadataBuilder {
descriptor: ToolDescriptor,
}
impl ToolMetadataBuilder {
pub fn description(mut self, description: impl Into<String>) -> Self {
self.descriptor.description = Some(description.into());
self
}
pub fn version(mut self, version: impl Into<String>) -> Self {
self.descriptor.version = version.into();
self
}
pub fn streaming(mut self, streaming: bool) -> Self {
self.descriptor.streaming = streaming;
self
}
pub fn stateless(mut self, stateless: bool) -> Self {
self.descriptor.stateless = stateless;
self
}
pub fn estimated_time_ms(mut self, ms: u32) -> Self {
self.descriptor.estimated_time_ms = ms;
self
}
pub fn tag(mut self, tag: impl Into<String>) -> Self {
self.descriptor.tags.push(tag.into());
self
}
pub fn tags(mut self, tags: Vec<String>) -> Self {
self.descriptor.tags = tags;
self
}
pub fn requires(mut self, dep: impl Into<String>) -> Self {
self.descriptor.requires.push(dep.into());
self
}
pub fn build(self) -> ToolDescriptor {
self.descriptor
}
}
pub fn metadata_for<Req, Resp>(name: impl Into<String>) -> ToolMetadataBuilder
where
Req: schemars::JsonSchema,
Resp: schemars::JsonSchema,
{
let name = name.into();
let input_schema = schemars::schema_for!(Req);
let output_schema = schemars::schema_for!(Resp);
let input_schema_json =
serde_json::to_string(&input_schema).expect("schemars output is always valid JSON");
let output_schema_json =
serde_json::to_string(&output_schema).expect("schemars output is always valid JSON");
ToolMetadataBuilder {
descriptor: ToolDescriptor {
tool_id: name.clone(),
name,
version: "1.0.0".to_string(),
description: None,
input_schema: Some(input_schema_json),
output_schema: Some(output_schema_json),
requires: Vec::new(),
estimated_time_ms: 0,
stateless: true,
streaming: false,
tags: Vec::new(),
node_count: 0,
},
}
}
#[cfg(feature = "cortex")]
pub struct ToolServeHandle {
#[allow(dead_code)] inner: ServeHandle,
registry: Arc<ToolMetadataRegistry>,
tool_id: String,
}
#[cfg(feature = "cortex")]
impl Drop for ToolServeHandle {
fn drop(&mut self) {
self.registry.remove(&self.tool_id);
}
}
#[cfg(feature = "cortex")]
impl Mesh {
pub fn serve_tool<Req, Resp, F, Fut>(
&self,
descriptor: ToolDescriptor,
handler: F,
) -> std::result::Result<ToolServeHandle, ServeError>
where
Req: DeserializeOwned + Send + Sync + 'static,
Resp: Serialize + Send + Sync + 'static,
F: Fn(Req) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = std::result::Result<Resp, String>> + Send + 'static,
{
let tool_id = descriptor.tool_id.clone();
let registry = self.inner().tool_registry().clone();
let prior = registry.insert(descriptor);
if let Some(prior) = prior {
registry.insert(prior);
return Err(ServeError::AlreadyServing(tool_id));
}
let inner = match self.serve_rpc_typed::<Req, Resp, _, _>(&tool_id, Codec::Json, handler) {
Ok(h) => h,
Err(e) => {
registry.remove(&tool_id);
return Err(e);
}
};
self.ensure_tool_metadata_fetch_installed();
Ok(ToolServeHandle {
inner,
registry,
tool_id,
})
}
pub fn serve_tool_streaming<Req, F, Fut, St>(
&self,
mut descriptor: ToolDescriptor,
handler: F,
) -> std::result::Result<ToolServeHandle, ServeError>
where
Req: DeserializeOwned + Send + Sync + 'static,
F: Fn(Req) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = St> + Send + 'static,
St: futures::Stream<Item = ToolEvent> + Send + 'static,
{
descriptor.streaming = true;
let tool_id = descriptor.tool_id.clone();
let registry = self.inner().tool_registry().clone();
let prior = registry.insert(descriptor);
if let Some(prior) = prior {
registry.insert(prior);
return Err(ServeError::AlreadyServing(tool_id));
}
let handler = Arc::new(handler);
let inner = match self
.serve_rpc_streaming_typed::<Req, ToolEvent, _, _>(&tool_id, Codec::Json, move |req, sink| {
let handler = handler.clone();
async move {
use futures::StreamExt;
let stream = handler(req).await;
futures::pin_mut!(stream);
let mut seen_terminal = false;
while let Some(event) = stream.next().await {
let terminal = event.is_terminal();
sink.send(&event)
.map_err(|e| format!("tool event send: {e}"))?;
if terminal {
seen_terminal = true;
break;
}
}
if !seen_terminal {
let synthesized = ToolEvent::Error {
code: "missing_terminal".to_string(),
message:
"tool handler ended its stream without emitting a terminal Result or Error event"
.to_string(),
details: None,
};
sink.send(&synthesized)
.map_err(|e| format!("synthesized terminal send: {e}"))?;
}
Ok(())
}
}) {
Ok(h) => h,
Err(e) => {
registry.remove(&tool_id);
return Err(e);
}
};
self.ensure_tool_metadata_fetch_installed();
Ok(ToolServeHandle {
inner,
registry,
tool_id,
})
}
pub async fn call_tool<Req, Resp>(
&self,
tool_id: &str,
request: &Req,
) -> std::result::Result<Resp, crate::mesh_rpc::RpcError>
where
Req: serde::Serialize,
Resp: serde::de::DeserializeOwned,
{
self.call_service_typed::<Req, Resp>(
tool_id,
request,
crate::mesh_rpc::CallOptionsTyped {
raw: Default::default(),
codec: Codec::Json,
},
)
.await
}
pub async fn call_tool_streaming<Req>(
&self,
tool_id: &str,
request: &Req,
) -> std::result::Result<crate::mesh_rpc::RpcStreamTyped<ToolEvent>, crate::mesh_rpc::RpcError>
where
Req: serde::Serialize,
{
self.call_service_streaming_typed::<Req, ToolEvent>(
tool_id,
request,
crate::mesh_rpc::CallOptionsTyped {
raw: Default::default(),
codec: Codec::Json,
},
)
.await
}
pub fn list_tools(&self, matcher: Option<&TagMatcher>) -> Vec<ToolDescriptor> {
self.inner().list_tools(matcher)
}
pub fn watch_tools(
&self,
matcher: Option<TagMatcher>,
interval: Option<std::time::Duration>,
) -> ToolListWatch {
self.node_arc().watch_tools(matcher, interval)
}
fn ensure_tool_metadata_fetch_installed(&self) {
let mut slot = self.tool_metadata_fetch.lock();
if slot.is_some() {
return;
}
let registry = self.node().tool_registry().clone();
let handler = move |req: ToolMetadataRequest| {
let registry = registry.clone();
async move {
Ok(match registry.get(&req.name) {
Some(descriptor) => ToolMetadataResponse::Found { descriptor },
None => ToolMetadataResponse::NotFound { name: req.name },
})
}
};
if let Ok(handle) = self.serve_rpc_typed::<ToolMetadataRequest, ToolMetadataResponse, _, _>(
TOOL_METADATA_FETCH_SERVICE,
Codec::Json,
handler,
) {
*slot = Some(handle);
}
}
}
#[cfg(feature = "cortex")]
pub mod formats {
use super::ToolDescriptor;
use serde_json::{json, Value};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolCallSpec {
pub name: String,
pub arguments_json: String,
pub provider_call_id: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolCallParseError {
MissingField(&'static str),
WrongType {
field: &'static str,
expected: &'static str,
},
InvalidArgumentsJson(String),
}
impl std::fmt::Display for ToolCallParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingField(name) => write!(f, "tool-call reply missing field `{name}`"),
Self::WrongType { field, expected } => write!(
f,
"tool-call reply field `{field}` had wrong type (expected {expected})"
),
Self::InvalidArgumentsJson(detail) => {
write!(f, "tool-call arguments were not valid JSON: {detail}")
}
}
}
}
impl std::error::Error for ToolCallParseError {}
fn input_schema_value(desc: &ToolDescriptor) -> Value {
desc.input_schema
.as_deref()
.and_then(|s| serde_json::from_str::<Value>(s).ok())
.unwrap_or_else(|| json!({"type": "object", "properties": {}}))
}
pub mod openai {
use super::*;
pub fn to_openai_tool(desc: &ToolDescriptor) -> Value {
let parameters = input_schema_value(desc);
let strict = desc.input_schema.is_some();
json!({
"type": "function",
"function": {
"name": desc.tool_id,
"description": desc.description.clone().unwrap_or_default(),
"parameters": parameters,
"strict": strict,
}
})
}
pub fn lower_openai_tool_call(call: &Value) -> Result<ToolCallSpec, ToolCallParseError> {
let function = call
.get("function")
.ok_or(ToolCallParseError::MissingField("function"))?;
let name = function
.get("name")
.ok_or(ToolCallParseError::MissingField("function.name"))?
.as_str()
.ok_or(ToolCallParseError::WrongType {
field: "function.name",
expected: "string",
})?
.to_string();
let arguments_json = function
.get("arguments")
.ok_or(ToolCallParseError::MissingField("function.arguments"))?
.as_str()
.ok_or(ToolCallParseError::WrongType {
field: "function.arguments",
expected: "string (JSON-encoded)",
})?
.to_string();
if let Err(e) = serde_json::from_str::<Value>(&arguments_json) {
return Err(ToolCallParseError::InvalidArgumentsJson(format!("{e}")));
}
let provider_call_id = call.get("id").and_then(|v| v.as_str()).map(String::from);
Ok(ToolCallSpec {
name,
arguments_json,
provider_call_id,
})
}
}
pub mod anthropic {
use super::*;
pub fn to_anthropic_tool(desc: &ToolDescriptor) -> Value {
json!({
"name": desc.tool_id,
"description": desc.description.clone().unwrap_or_default(),
"input_schema": input_schema_value(desc),
})
}
pub fn lower_anthropic_tool_use(block: &Value) -> Result<ToolCallSpec, ToolCallParseError> {
let name = block
.get("name")
.ok_or(ToolCallParseError::MissingField("name"))?
.as_str()
.ok_or(ToolCallParseError::WrongType {
field: "name",
expected: "string",
})?
.to_string();
let input = block
.get("input")
.ok_or(ToolCallParseError::MissingField("input"))?;
let arguments_json = serde_json::to_string(input)
.map_err(|e| ToolCallParseError::InvalidArgumentsJson(format!("{e}")))?;
let provider_call_id = block.get("id").and_then(|v| v.as_str()).map(String::from);
Ok(ToolCallSpec {
name,
arguments_json,
provider_call_id,
})
}
}
pub mod mcp {
use super::*;
pub fn to_mcp_tool(desc: &ToolDescriptor) -> Value {
json!({
"name": desc.tool_id,
"description": desc.description.clone().unwrap_or_default(),
"inputSchema": input_schema_value(desc),
})
}
pub fn lower_mcp_tools_call(params: &Value) -> Result<ToolCallSpec, ToolCallParseError> {
let name = params
.get("name")
.ok_or(ToolCallParseError::MissingField("name"))?
.as_str()
.ok_or(ToolCallParseError::WrongType {
field: "name",
expected: "string",
})?
.to_string();
let arguments = params
.get("arguments")
.ok_or(ToolCallParseError::MissingField("arguments"))?;
let arguments_json = serde_json::to_string(arguments)
.map_err(|e| ToolCallParseError::InvalidArgumentsJson(format!("{e}")))?;
Ok(ToolCallSpec {
name,
arguments_json,
provider_call_id: None,
})
}
}
pub mod gemini {
use super::*;
pub fn to_gemini_function_declaration(desc: &ToolDescriptor) -> Value {
json!({
"name": desc.tool_id,
"description": desc.description.clone().unwrap_or_default(),
"parameters": input_schema_value(desc),
})
}
pub fn lower_gemini_function_call(
call: &Value,
) -> Result<ToolCallSpec, ToolCallParseError> {
let name = call
.get("name")
.ok_or(ToolCallParseError::MissingField("name"))?
.as_str()
.ok_or(ToolCallParseError::WrongType {
field: "name",
expected: "string",
})?
.to_string();
let args = call
.get("args")
.ok_or(ToolCallParseError::MissingField("args"))?;
let arguments_json = serde_json::to_string(args)
.map_err(|e| ToolCallParseError::InvalidArgumentsJson(format!("{e}")))?;
Ok(ToolCallSpec {
name,
arguments_json,
provider_call_id: None,
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(JsonSchema, Deserialize, Serialize)]
#[allow(dead_code)]
struct WebSearchReq {
query: String,
max_results: u32,
}
#[derive(JsonSchema, Deserialize, Serialize)]
#[allow(dead_code)]
struct WebSearchResp {
results: Vec<String>,
}
#[test]
fn metadata_for_derives_schemas_and_sets_defaults() {
let descriptor = metadata_for::<WebSearchReq, WebSearchResp>("web_search").build();
assert_eq!(descriptor.tool_id, "web_search");
assert_eq!(descriptor.name, "web_search");
assert_eq!(descriptor.version, "1.0.0");
assert!(descriptor.description.is_none());
assert!(descriptor.stateless);
assert!(!descriptor.streaming);
assert_eq!(descriptor.estimated_time_ms, 0);
assert_eq!(descriptor.node_count, 0);
assert!(descriptor.tags.is_empty());
assert!(descriptor.requires.is_empty());
let input = descriptor
.input_schema
.as_ref()
.expect("input schema present");
let parsed: serde_json::Value =
serde_json::from_str(input).expect("input schema must be valid JSON");
let props = parsed
.get("properties")
.expect("object schema has properties");
assert!(props.get("query").is_some());
assert!(props.get("max_results").is_some());
let output = descriptor
.output_schema
.as_ref()
.expect("output schema present");
let _: serde_json::Value =
serde_json::from_str(output).expect("output schema must be valid JSON");
}
#[test]
fn builder_setters_apply_in_chain() {
let descriptor = metadata_for::<WebSearchReq, WebSearchResp>("web_search")
.description("Search the web.")
.version("2.1.0")
.streaming(true)
.stateless(false)
.estimated_time_ms(500)
.tag("web")
.tag("research")
.requires("api_key:tavily")
.build();
assert_eq!(descriptor.description.as_deref(), Some("Search the web."));
assert_eq!(descriptor.version, "2.1.0");
assert!(descriptor.streaming);
assert!(!descriptor.stateless);
assert_eq!(descriptor.estimated_time_ms, 500);
assert_eq!(descriptor.tags, vec!["web", "research"]);
assert_eq!(descriptor.requires, vec!["api_key:tavily"]);
}
#[test]
fn builder_tags_replaces_wholesale() {
let descriptor = metadata_for::<WebSearchReq, WebSearchResp>("web_search")
.tag("first")
.tags(vec!["replaced".into(), "second".into()])
.build();
assert_eq!(descriptor.tags, vec!["replaced", "second"]);
}
fn sample_descriptor() -> ToolDescriptor {
metadata_for::<WebSearchReq, WebSearchResp>("web_search")
.description("Search the web.")
.build()
}
#[test]
fn openai_tool_has_function_type_and_strict_when_schema_present() {
let desc = sample_descriptor();
let tool = formats::openai::to_openai_tool(&desc);
assert_eq!(tool["type"], "function");
let function = &tool["function"];
assert_eq!(function["name"], "web_search");
assert_eq!(function["description"], "Search the web.");
assert_eq!(function["strict"], true);
let params = &function["parameters"];
assert!(
params["properties"]["query"].is_object(),
"input_schema's `query` property must surface in parameters",
);
}
#[test]
fn anthropic_tool_carries_input_schema_directly() {
let desc = sample_descriptor();
let tool = formats::anthropic::to_anthropic_tool(&desc);
assert_eq!(tool["name"], "web_search");
assert_eq!(tool["description"], "Search the web.");
let schema = &tool["input_schema"];
assert!(schema["properties"]["query"].is_object());
assert!(
tool.get("strict").is_none(),
"Anthropic has no tool-level strict flag"
);
}
#[test]
fn mcp_tool_uses_input_schema_camelcase() {
let desc = sample_descriptor();
let tool = formats::mcp::to_mcp_tool(&desc);
assert_eq!(tool["name"], "web_search");
assert_eq!(tool["description"], "Search the web.");
let schema = &tool["inputSchema"];
assert!(schema["properties"]["query"].is_object());
}
#[test]
fn gemini_function_declaration_uses_parameters_field() {
let desc = sample_descriptor();
let decl = formats::gemini::to_gemini_function_declaration(&desc);
assert_eq!(decl["name"], "web_search");
assert_eq!(decl["description"], "Search the web.");
let params = &decl["parameters"];
assert!(params["properties"]["query"].is_object());
}
#[test]
fn openai_lower_tool_call_extracts_name_and_arguments() {
use formats::openai::lower_openai_tool_call;
let call = serde_json::json!({
"id": "call_abc123",
"type": "function",
"function": {
"name": "web_search",
"arguments": "{\"query\":\"mesh\"}"
}
});
let spec = lower_openai_tool_call(&call).expect("valid call parses");
assert_eq!(spec.name, "web_search");
assert_eq!(spec.arguments_json, "{\"query\":\"mesh\"}");
assert_eq!(spec.provider_call_id.as_deref(), Some("call_abc123"));
}
#[test]
fn openai_lower_tool_call_rejects_invalid_arguments_json() {
use formats::openai::lower_openai_tool_call;
use formats::ToolCallParseError;
let call = serde_json::json!({
"function": {
"name": "x",
"arguments": "not valid json {"
}
});
match lower_openai_tool_call(&call) {
Err(ToolCallParseError::InvalidArgumentsJson(_)) => {}
other => panic!("expected InvalidArgumentsJson, got {other:?}"),
}
}
#[test]
fn anthropic_lower_tool_use_serializes_input_object() {
use formats::anthropic::lower_anthropic_tool_use;
let block = serde_json::json!({
"type": "tool_use",
"id": "toolu_xyz",
"name": "web_search",
"input": { "query": "mesh", "max_results": 5 }
});
let spec = lower_anthropic_tool_use(&block).expect("valid block parses");
assert_eq!(spec.name, "web_search");
let parsed: serde_json::Value =
serde_json::from_str(&spec.arguments_json).expect("arguments round-trip JSON");
assert_eq!(parsed["query"], "mesh");
assert_eq!(parsed["max_results"], 5);
assert_eq!(spec.provider_call_id.as_deref(), Some("toolu_xyz"));
}
#[test]
fn mcp_lower_tools_call_threads_arguments_through() {
use formats::mcp::lower_mcp_tools_call;
let params = serde_json::json!({
"name": "web_search",
"arguments": { "query": "mesh" }
});
let spec = lower_mcp_tools_call(¶ms).expect("valid params parse");
assert_eq!(spec.name, "web_search");
let parsed: serde_json::Value =
serde_json::from_str(&spec.arguments_json).expect("arguments round-trip JSON");
assert_eq!(parsed["query"], "mesh");
assert!(spec.provider_call_id.is_none());
}
#[test]
fn gemini_lower_function_call_handles_args_field() {
use formats::gemini::lower_gemini_function_call;
let call = serde_json::json!({
"name": "web_search",
"args": { "query": "mesh" }
});
let spec = lower_gemini_function_call(&call).expect("valid call parses");
assert_eq!(spec.name, "web_search");
let parsed: serde_json::Value =
serde_json::from_str(&spec.arguments_json).expect("arguments round-trip JSON");
assert_eq!(parsed["query"], "mesh");
assert!(spec.provider_call_id.is_none(), "Gemini has no call_id");
}
#[test]
fn formats_handle_missing_input_schema_with_empty_object() {
let desc = ToolDescriptor {
tool_id: "no_schema_tool".into(),
name: "no_schema_tool".into(),
version: "1.0.0".into(),
description: Some("Bare tool.".into()),
input_schema: None,
output_schema: None,
requires: Vec::new(),
estimated_time_ms: 0,
stateless: true,
streaming: false,
tags: Vec::new(),
node_count: 0,
};
let openai = formats::openai::to_openai_tool(&desc);
assert_eq!(openai["function"]["parameters"]["type"], "object");
assert_eq!(openai["function"]["strict"], false);
let anthropic = formats::anthropic::to_anthropic_tool(&desc);
assert_eq!(anthropic["input_schema"]["type"], "object");
let mcp = formats::mcp::to_mcp_tool(&desc);
assert_eq!(mcp["inputSchema"]["type"], "object");
let gemini = formats::gemini::to_gemini_function_declaration(&desc);
assert_eq!(gemini["parameters"]["type"], "object");
}
}