use std::collections::{BTreeMap, HashMap};
use serde::{Deserialize, Serialize};
use crate::adapter::net::behavior::ToolCapability;
/// Builds the metadata-map key under which a tool's description is stored.
///
/// The key has the shape `tool::<tool_id>::description`; `tool_id` is inserted
/// verbatim, with no escaping or validation.
pub fn description_metadata_key(tool_id: &str) -> String {
    ["tool::", tool_id, "::description"].concat()
}
/// Builds the metadata-map key under which a tool's streaming flag is stored.
///
/// The key has the shape `tool::<tool_id>::streaming`; `tool_id` is inserted
/// verbatim, with no escaping or validation.
pub fn streaming_metadata_key(tool_id: &str) -> String {
    ["tool::", tool_id, "::streaming"].concat()
}
/// Builds the metadata-map key under which a tool's tag list is stored.
///
/// The key has the shape `tool::<tool_id>::tags`; `tool_id` is inserted
/// verbatim, with no escaping or validation.
pub fn tags_metadata_key(tool_id: &str) -> String {
    ["tool::", tool_id, "::tags"].concat()
}
/// Serializable description of a single tool.
///
/// `ToolDescriptor::from_capability` builds one by copying fields from a
/// `ToolCapability` and reading the remaining ones out of a string metadata map.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ToolDescriptor {
/// Tool identifier. Used to derive the `tool::<tool_id>::…` metadata keys and
/// as the map key inside `ToolMetadataRegistry`.
pub tool_id: String,
/// Tool name, copied from the capability.
pub name: String,
/// Version string, copied from the capability.
pub version: String,
/// Description read from the metadata map; `None` when the key is absent.
pub description: Option<String>,
/// Input schema text, copied from the capability.
/// NOTE(review): presumably a JSON Schema document — confirm.
pub input_schema: Option<String>,
/// Output schema text, copied from the capability.
/// NOTE(review): presumably a JSON Schema document — confirm.
pub output_schema: Option<String>,
/// Copied from the capability's `requires` list; the meaning of the entries
/// is not established in this file.
pub requires: Vec<String>,
/// Copied from the capability.
/// NOTE(review): presumably an estimated run time in milliseconds — confirm.
pub estimated_time_ms: u32,
/// Copied from the capability's `stateless` flag.
pub stateless: bool,
/// `true` only when the streaming metadata value is exactly `"1"`.
pub streaming: bool,
/// Tags parsed from the comma-separated tags metadata value (each entry
/// trimmed, empty entries dropped); empty when the key is absent.
pub tags: Vec<String>,
/// Always `0` when built by `from_capability`.
/// NOTE(review): presumably filled in later by whatever aggregates
/// descriptors across nodes — confirm.
pub node_count: u32,
}
impl ToolDescriptor {
    /// Builds a descriptor from a capability plus a string metadata map.
    ///
    /// Fields present on `cap` are cloned/copied as-is. Three fields come from
    /// `metadata`, looked up under the per-tool keys for `cap.tool_id`:
    ///
    /// * `description` — the stored value, or `None` when the key is absent.
    /// * `streaming` — `true` only when the stored value is exactly `"1"`;
    ///   any other value, or a missing key, yields `false`.
    /// * `tags` — the stored value split on `,`, each piece trimmed, empty
    ///   pieces dropped; a missing key yields an empty list.
    ///
    /// `node_count` is always set to `0` here.
    pub fn from_capability(cap: &ToolCapability, metadata: &BTreeMap<String, String>) -> Self {
        let tool_id = cap.tool_id.as_str();

        let description = metadata.get(&description_metadata_key(tool_id)).cloned();

        let streaming = matches!(
            metadata
                .get(&streaming_metadata_key(tool_id))
                .map(String::as_str),
            Some("1")
        );

        let tags: Vec<String> = match metadata.get(&tags_metadata_key(tool_id)) {
            // Trim and filter on borrowed slices; only surviving tags are allocated.
            Some(raw) => raw
                .split(',')
                .map(str::trim)
                .filter(|tag| !tag.is_empty())
                .map(str::to_string)
                .collect(),
            None => Vec::new(),
        };

        Self {
            tool_id: cap.tool_id.clone(),
            name: cap.name.clone(),
            version: cap.version.clone(),
            description,
            input_schema: cap.input_schema.clone(),
            output_schema: cap.output_schema.clone(),
            requires: cap.requires.clone(),
            estimated_time_ms: cap.estimated_time_ms,
            stateless: cap.stateless,
            streaming,
            tags,
            node_count: 0,
        }
    }
}
/// One event emitted during a tool call.
///
/// Serialized in serde's internally tagged form: the variant is written as a
/// `"type"` field holding the snake_case variant name (`"start"`,
/// `"progress"`, `"delta"`, `"result"`, `"error"`). Every `Option` field is
/// omitted from the output when `None` and defaults to `None` when missing
/// from the input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ToolEvent {
/// Non-terminal event carrying the tool id and optional call details.
Start {
/// Identifier of the tool this event refers to.
tool_id: String,
/// Optional numeric call identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
call_id: Option<u64>,
/// Optional free-form JSON metadata.
#[serde(default, skip_serializing_if = "Option::is_none")]
metadata: Option<serde_json::Value>,
},
/// Non-terminal progress update.
Progress {
/// Optional progress figure.
/// NOTE(review): presumably a percentage; the valid range is not
/// established in this file — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pct: Option<f32>,
/// Optional human-readable progress message.
#[serde(default, skip_serializing_if = "Option::is_none")]
message: Option<String>,
},
/// Non-terminal event carrying a JSON payload.
/// NOTE(review): presumably an incremental chunk of streamed output — confirm.
Delta {
/// JSON payload of this event.
data: serde_json::Value,
},
/// Terminal event (see `is_terminal`) carrying a JSON payload.
Result {
/// JSON payload of this event.
data: serde_json::Value,
},
/// Terminal event (see `is_terminal`) describing a failure.
Error {
/// Error code string.
code: String,
/// Error message string.
message: String,
/// Optional free-form JSON details.
#[serde(default, skip_serializing_if = "Option::is_none")]
details: Option<serde_json::Value>,
},
}
impl ToolEvent {
    /// Returns `true` for the variants that count as terminal: `Result` and
    /// `Error`. `Start`, `Progress` and `Delta` return `false`.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Result { .. } | Self::Error { .. } => true,
            Self::Start { .. } | Self::Progress { .. } | Self::Delta { .. } => false,
        }
    }
}
/// Receiving side of a stream of `ToolListChange` values.
///
/// Wraps a `tokio::sync::mpsc::Receiver`; changes can be consumed through the
/// `futures::Stream` impl or through the inherent `recv` / `try_recv` methods.
pub struct ToolListWatch {
/// Channel receiver the changes are read from. Crate-visible so code
/// elsewhere in the crate can construct the watch.
pub(crate) receiver: tokio::sync::mpsc::Receiver<ToolListChange>,
}
/// Lets a [`ToolListWatch`] be consumed as a `futures::Stream` of changes.
impl futures::Stream for ToolListWatch {
    type Item = ToolListChange;

    /// Polls for the next change by forwarding to the receiver's `poll_recv`.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `get_mut` requires `Self: Unpin`, the same bound mutable access
        // through the `Pin` already relied on.
        let watch = self.get_mut();
        watch.receiver.poll_recv(cx)
    }
}
impl ToolListWatch {
    /// Awaits the next change by forwarding to the receiver's `recv`.
    ///
    /// Returns whatever the receiver yields; `None` means no further value
    /// will be produced by the channel.
    pub async fn recv(&mut self) -> Option<ToolListChange> {
        let pending = self.receiver.recv();
        pending.await
    }

    /// Returns a change if one can be taken without waiting.
    ///
    /// Any error from the receiver's `try_recv` is collapsed to `None`, so
    /// the caller cannot tell from this method why no value was returned.
    pub fn try_recv(&mut self) -> Option<ToolListChange> {
        let attempt = self.receiver.try_recv();
        attempt.ok()
    }
}
/// A single change notification delivered through `ToolListWatch`.
#[derive(Debug, Clone, PartialEq)]
pub enum ToolListChange {
/// Carries the descriptor of a tool.
/// NOTE(review): presumably a tool that became available — confirm against the producer.
Added(ToolDescriptor),
/// Carries the descriptor of a tool.
/// NOTE(review): presumably a tool that is no longer available — confirm against the producer.
Removed(ToolDescriptor),
/// Carries a descriptor together with an earlier node count.
NodeCountChanged {
/// Descriptor associated with the change.
/// NOTE(review): presumably already holds the new `node_count` — confirm.
descriptor: ToolDescriptor,
/// Node count before the change, going by the field name.
prev_node_count: u32,
},
}
/// Service name string `"tool.metadata.fetch"`.
/// NOTE(review): presumably the service that answers a `ToolMetadataRequest`
/// with a `ToolMetadataResponse` — confirm against the service registration.
pub const TOOL_METADATA_FETCH_SERVICE: &str = "tool.metadata.fetch";
/// Serializable request naming the tool whose metadata is wanted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ToolMetadataRequest {
/// Name of the requested tool.
/// NOTE(review): `ToolMetadataRegistry` in this file keys entries by
/// `tool_id`; confirm whether this is a tool id or a display name.
pub name: String,
}
/// Serializable outcome of a tool-metadata lookup.
///
/// Serialized in serde's internally tagged form: the variant is written as a
/// `"type"` field holding `"found"` or `"not_found"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ToolMetadataResponse {
/// A descriptor was found.
Found {
/// The descriptor that was found.
descriptor: ToolDescriptor,
},
/// No descriptor was found.
NotFound {
/// A name string.
/// NOTE(review): presumably echoes the requested name — confirm.
name: String,
},
}
/// In-memory collection of `ToolDescriptor`s keyed by `tool_id`.
///
/// All state sits behind a `parking_lot::Mutex`, and every method takes
/// `&self` and locks it for the duration of the call.
#[derive(Debug, Default)]
pub struct ToolMetadataRegistry {
/// Mutex-guarded map plus its cached snapshot.
inner: parking_lot::Mutex<RegistryState>,
}
/// State guarded by the mutex inside `ToolMetadataRegistry`.
#[derive(Debug, Default)]
struct RegistryState {
/// Descriptors keyed by their `tool_id`.
map: HashMap<String, ToolDescriptor>,
/// Cached copy of all descriptors as a shared slice. `None` means it has
/// to be rebuilt on the next `snapshot()` call; `insert` and a successful
/// `remove` reset it to `None`.
snapshot: Option<std::sync::Arc<[ToolDescriptor]>>,
}
impl ToolMetadataRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn insert(&self, descriptor: ToolDescriptor) -> Option<ToolDescriptor> {
let mut guard = self.inner.lock();
guard.snapshot = None;
guard.map.insert(descriptor.tool_id.clone(), descriptor)
}
pub fn get(&self, name: &str) -> Option<ToolDescriptor> {
self.inner.lock().map.get(name).cloned()
}
pub fn remove(&self, name: &str) -> Option<ToolDescriptor> {
let mut guard = self.inner.lock();
let prev = guard.map.remove(name);
if prev.is_some() {
guard.snapshot = None;
}
prev
}
pub fn len(&self) -> usize {
self.inner.lock().map.len()
}
pub fn is_empty(&self) -> bool {
self.inner.lock().map.is_empty()
}
pub fn snapshot(&self) -> std::sync::Arc<[ToolDescriptor]> {
let mut guard = self.inner.lock();
if let Some(s) = &guard.snapshot {
return s.clone();
}
let snap: std::sync::Arc<[ToolDescriptor]> =
guard.map.values().cloned().collect::<Vec<_>>().into();
guard.snapshot = Some(snap.clone());
snap
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the metadata-key helpers, `ToolDescriptor` construction,
    //! `ToolEvent` / `ToolMetadataResponse` serde behaviour, and
    //! `ToolMetadataRegistry` bookkeeping.

    use super::*;

    /// Builds a capability with a fixed version and input schema for `tool_id`.
    fn cap(tool_id: &str) -> ToolCapability {
        ToolCapability::new(tool_id, format!("Name for {tool_id}"))
            .with_version("1.2.3")
            .with_input_schema(r#"{"type":"object"}"#)
    }

    #[test]
    fn metadata_keys_follow_existing_convention() {
        assert_eq!(
            description_metadata_key("web_search"),
            "tool::web_search::description"
        );
        assert_eq!(
            streaming_metadata_key("web_search"),
            "tool::web_search::streaming"
        );
        assert_eq!(tags_metadata_key("web_search"), "tool::web_search::tags");
    }

    #[test]
    fn descriptor_from_capability_picks_up_metadata_fields() {
        let cap = cap("web_search");
        let mut meta = BTreeMap::new();
        meta.insert(
            description_metadata_key("web_search"),
            "Search the web.".to_string(),
        );
        meta.insert(streaming_metadata_key("web_search"), "1".to_string());
        meta.insert(
            tags_metadata_key("web_search"),
            "web,research,external".to_string(),
        );
        let desc = ToolDescriptor::from_capability(&cap, &meta);
        assert_eq!(desc.tool_id, "web_search");
        assert_eq!(desc.version, "1.2.3");
        assert_eq!(desc.description.as_deref(), Some("Search the web."));
        assert!(desc.streaming);
        assert_eq!(desc.tags, vec!["web", "research", "external"]);
        assert_eq!(desc.input_schema.as_deref(), Some(r#"{"type":"object"}"#));
        assert_eq!(
            desc.node_count, 0,
            "node_count is filled by the aggregator, not here"
        );
    }

    #[test]
    fn descriptor_from_capability_handles_missing_metadata() {
        let cap = cap("legacy");
        let meta = BTreeMap::new();
        let desc = ToolDescriptor::from_capability(&cap, &meta);
        assert!(desc.description.is_none());
        assert!(!desc.streaming);
        assert!(desc.tags.is_empty());
    }

    #[test]
    fn descriptor_tags_parsing_strips_whitespace_and_drops_empty() {
        let cap = cap("messy");
        let mut meta = BTreeMap::new();
        meta.insert(tags_metadata_key("messy"), " a , b ,, c ,".to_string());
        let desc = ToolDescriptor::from_capability(&cap, &meta);
        assert_eq!(desc.tags, vec!["a", "b", "c"]);
    }

    #[test]
    fn tool_event_serde_roundtrip_each_variant() {
        let cases = vec![
            ToolEvent::Start {
                tool_id: "web_search".into(),
                call_id: Some(42),
                metadata: Some(serde_json::json!({"model": "claude-opus-4-7"})),
            },
            ToolEvent::Progress {
                pct: Some(33.3),
                message: Some("indexing".into()),
            },
            ToolEvent::Delta {
                data: serde_json::json!({"token": "the"}),
            },
            ToolEvent::Result {
                data: serde_json::json!({"results": ["a", "b"]}),
            },
            ToolEvent::Error {
                code: "upstream_timeout".into(),
                message: "took >30s".into(),
                details: Some(serde_json::json!({"upstream": "anthropic"})),
            },
        ];
        for event in cases {
            let encoded = serde_json::to_string(&event).expect("encode");
            let decoded: ToolEvent = serde_json::from_str(&encoded).expect("decode");
            assert_eq!(event, decoded, "round-trip must be byte-stable");
        }
    }

    #[test]
    fn tool_event_is_terminal_only_for_result_and_error() {
        assert!(!ToolEvent::Start {
            tool_id: "x".into(),
            call_id: None,
            metadata: None
        }
        .is_terminal());
        assert!(!ToolEvent::Progress {
            pct: None,
            message: None
        }
        .is_terminal());
        assert!(!ToolEvent::Delta {
            data: serde_json::Value::Null
        }
        .is_terminal());
        assert!(ToolEvent::Result {
            data: serde_json::Value::Null
        }
        .is_terminal());
        assert!(ToolEvent::Error {
            code: "".into(),
            message: "".into(),
            details: None
        }
        .is_terminal());
    }

    #[test]
    fn tool_event_optional_fields_omitted_when_none() {
        let event = ToolEvent::Start {
            tool_id: "x".into(),
            call_id: None,
            metadata: None,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert_eq!(json, r#"{"type":"start","tool_id":"x"}"#);
        let event = ToolEvent::Progress {
            pct: None,
            message: None,
        };
        assert_eq!(
            serde_json::to_string(&event).unwrap(),
            r#"{"type":"progress"}"#
        );
    }

    /// Builds a descriptor for `tool_id` from `cap(tool_id)` and empty metadata.
    fn descriptor(tool_id: &str) -> ToolDescriptor {
        let cap = cap(tool_id);
        ToolDescriptor::from_capability(&cap, &BTreeMap::new())
    }

    #[test]
    fn tool_metadata_fetch_service_name_is_canonical() {
        assert_eq!(TOOL_METADATA_FETCH_SERVICE, "tool.metadata.fetch");
    }

    #[test]
    fn tool_metadata_response_serde_distinguishes_found_and_not_found() {
        let found = ToolMetadataResponse::Found {
            descriptor: descriptor("web_search"),
        };
        let not_found = ToolMetadataResponse::NotFound {
            name: "missing".into(),
        };
        // Both variants must survive an encode/decode round trip.
        for resp in [&found, &not_found] {
            let encoded = serde_json::to_string(resp).unwrap();
            let decoded: ToolMetadataResponse = serde_json::from_str(&encoded).unwrap();
            assert_eq!(*resp, decoded, "round-trip must be byte-stable");
        }
        // The `type` tag is what tells the two variants apart on the wire.
        let found_json = serde_json::to_value(&found).unwrap();
        assert_eq!(found_json["type"], "found");
        let nf_json = serde_json::to_value(&not_found).unwrap();
        assert_eq!(nf_json["type"], "not_found");
    }

    #[test]
    fn tool_metadata_registry_insert_lookup_remove_roundtrip() {
        let reg = ToolMetadataRegistry::new();
        assert!(reg.is_empty());
        assert_eq!(reg.len(), 0);
        let desc = descriptor("web_search");
        assert!(
            reg.insert(desc.clone()).is_none(),
            "first insert returns None"
        );
        assert_eq!(reg.len(), 1);
        let got = reg.get("web_search").expect("get must find it");
        assert_eq!(got, desc);
        let prior = reg
            .insert(desc.clone())
            .expect("second insert returns prior");
        assert_eq!(prior, desc);
        let removed = reg.remove("web_search").expect("remove must find it");
        assert_eq!(removed, desc);
        assert!(reg.is_empty());
        assert!(reg.get("web_search").is_none());
        assert!(reg.remove("web_search").is_none());
    }

    #[test]
    fn tool_metadata_registry_snapshot_returns_all_entries() {
        let reg = ToolMetadataRegistry::new();
        reg.insert(descriptor("a"));
        reg.insert(descriptor("b"));
        reg.insert(descriptor("c"));
        // Snapshot order follows the map's iteration order, so sort before comparing.
        let mut names: Vec<String> = reg.snapshot().iter().map(|d| d.tool_id.clone()).collect();
        names.sort();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    #[test]
    fn tool_metadata_registry_snapshot_caches_until_mutation() {
        let reg = ToolMetadataRegistry::new();
        reg.insert(descriptor("a"));
        let s1 = reg.snapshot();
        let s2 = reg.snapshot();
        assert!(
            std::sync::Arc::ptr_eq(&s1, &s2),
            "two consecutive snapshots without mutation must share the same Arc"
        );
        reg.insert(descriptor("b"));
        let s3 = reg.snapshot();
        assert!(
            !std::sync::Arc::ptr_eq(&s1, &s3),
            "insert must invalidate the cached snapshot"
        );
        let s4 = reg.snapshot();
        assert!(
            std::sync::Arc::ptr_eq(&s3, &s4),
            "snapshot after insert must cache again"
        );
        reg.remove("a");
        let s5 = reg.snapshot();
        assert!(
            !std::sync::Arc::ptr_eq(&s3, &s5),
            "remove must invalidate the cached snapshot"
        );
        let s6 = reg.snapshot();
        reg.remove("nonexistent");
        let s7 = reg.snapshot();
        assert!(
            std::sync::Arc::ptr_eq(&s6, &s7),
            "no-op remove must not invalidate the cached snapshot"
        );
    }
}