use crate::base::errors::McpError;
use crate::base::health::HealthStatus;
use crate::base::subsystem::{ConnectArgs, RingId, Subsystem};
use crate::base::tool::ToolDescriptor;
use crate::base::wire::WireFormat;
use async_trait::async_trait;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
/// Presents several [`Subsystem`]s as a single `Subsystem`.
///
/// Each child tool is exposed under the internal name `<subsystem>.<tool>`.
/// Incoming calls are looked up by wire name in `wire_to_internal` and then
/// forwarded to the owning child.
pub struct FederationAggregator {
/// Child subsystems, in the order they were passed to `new`.
subsystems: Vec<Box<dyn Subsystem>>,
/// Used to derive the wire name of each internal tool name (`outbound`).
wire_format: WireFormat,
/// Wire name -> internal `<subsystem>.<tool>` name; rebuilt by `connect_all`.
wire_to_internal: HashMap<String, String>,
}
impl FederationAggregator {
/// Builds an aggregator over `subsystems`, using `wire_format` to derive wire names.
///
/// The wire-name table starts out empty; it is filled in by `connect_all`.
///
/// # Panics
///
/// Panics if two subsystems report the same name. When debug assertions are
/// enabled it also panics if a subsystem name contains a `.`, since that
/// makes the `<subsystem>.<tool>` internal names ambiguous.
pub fn new(subsystems: Vec<Box<dyn Subsystem>>, wire_format: WireFormat) -> Self {
    {
        // The names are only compared, so borrow them for the duration of the scan.
        let mut seen_names: HashSet<&str> = HashSet::new();
        for subsystem in subsystems.iter() {
            let name = subsystem.name();
            assert!(
                seen_names.insert(name),
                "FederationAggregator: duplicate subsystem name '{}'",
                name
            );
            debug_assert!(
                !name.contains('.'),
                "subsystem name '{}' contains a dot — federation prefix would produce ambiguous internal names; first-dot split would misroute",
                name
            );
        }
    }
    Self {
        wire_to_internal: HashMap::new(),
        subsystems,
        wire_format,
    }
}
/// Connects every subsystem in order, then rebuilds the wire-name table.
///
/// # Errors
///
/// Returns the first connection error. Subsystems after the failing one are
/// not attempted and the wire-name table is not rebuilt.
pub async fn connect_all(&mut self, args: &ConnectArgs) -> Result<(), McpError> {
    for subsystem in self.subsystems.iter_mut() {
        subsystem.connect(args).await?;
    }
    // Tool lists are read only after every subsystem has connected.
    self.rebuild_wire_map();
    Ok(())
}
/// Recomputes the wire-name -> internal-name table from the subsystems' current tools.
///
/// Every tool is registered under the internal name `<subsystem>.<tool>` and
/// keyed by what `wire_format.outbound` returns for that internal name.
// NOTE(review): if two internal names produce the same wire name, the entry
// inserted later silently replaces the earlier one.
fn rebuild_wire_map(&mut self) {
    self.wire_to_internal.clear();
    let wire_format = &self.wire_format;
    let entries = self.subsystems.iter().flat_map(|subsystem| {
        let prefix = subsystem.name();
        subsystem.tools().into_iter().map(move |tool| {
            let internal = format!("{}.{}", prefix, tool.name);
            let wire = wire_format.outbound(&internal);
            (wire, internal)
        })
    });
    self.wire_to_internal.extend(entries);
}
}
#[async_trait]
impl Subsystem for FederationAggregator {
/// Returns the aggregator's own fixed subsystem name, `"federation"`.
fn name(&self) -> &str {
"federation"
}
fn tools(&self) -> Vec<ToolDescriptor> {
self.subsystems
.iter()
.flat_map(|s| {
s.tools().into_iter().map(|td| {
ToolDescriptor {
name: format!("{}.{}", s.name(), td.name),
description: td.description,
input_schema: td.input_schema,
timeout_ms: td.timeout_ms,
}
})
})
.collect()
}
/// `Subsystem::connect` entry point; delegates to `connect_all`.
async fn connect(&mut self, args: &ConnectArgs) -> Result<(), McpError> {
self.connect_all(args).await
}
/// Disconnects every subsystem, continuing past individual failures.
///
/// # Errors
///
/// A single failure is returned as-is. Two or more failures are wrapped in
/// `McpError::Multiple`, in subsystem order.
async fn disconnect(&mut self) -> Result<(), McpError> {
    let mut failures: Vec<McpError> = Vec::new();
    for subsystem in self.subsystems.iter_mut() {
        if let Err(err) = subsystem.disconnect().await {
            failures.push(err);
        }
    }
    if failures.len() > 1 {
        return Err(McpError::Multiple(failures));
    }
    // At most one error is left here; hand it back unwrapped.
    match failures.pop() {
        Some(only) => Err(only),
        None => Ok(()),
    }
}
async fn health_check(&self) -> Result<HealthStatus, McpError> {
use crate::base::health::HealthLevel;
let mut worst = HealthLevel::Healthy;
let mut degraded_reasons: Vec<(String, String)> = Vec::new();
let mut unhealthy: Option<(String, String)> = None;
for s in &self.subsystems {
match s.health_check().await {
Ok(h) => match h.level {
HealthLevel::Healthy => {}
HealthLevel::Degraded { reason } => {
degraded_reasons.push((s.name().to_string(), reason.clone()));
worst = HealthLevel::Degraded {
reason: if degraded_reasons.len() == 1 {
format!("{}: {}", s.name(), reason)
} else {
format!(
"{} subsystems degraded (see subsystem_specific.degraded)",
degraded_reasons.len()
)
},
};
}
HealthLevel::Unhealthy { reason } => {
unhealthy = Some((s.name().to_string(), reason.clone()));
worst = HealthLevel::Unhealthy {
reason: format!("{}: {}", s.name(), reason),
};
break; }
},
Err(e) => {
let reason = format!("health_check error: {e}");
unhealthy = Some((s.name().to_string(), reason.clone()));
worst = HealthLevel::Unhealthy {
reason: format!("{}: {}", s.name(), reason),
};
break;
}
}
}
let subsystem_specific = serde_json::json!({
"degraded": degraded_reasons.iter().map(|(n, r)| serde_json::json!({"subsystem": n, "reason": r})).collect::<Vec<_>>(),
"unhealthy": unhealthy.as_ref().map(|(n, r)| serde_json::json!({"subsystem": n, "reason": r})),
});
Ok(HealthStatus {
level: worst,
last_seen_unix_ms: 0, latency_ms: None,
version: env!("CARGO_PKG_VERSION").to_string(),
ring: None,
subsystem_specific,
})
}
/// Shuts down every subsystem, continuing past individual failures.
///
/// # Errors
///
/// A single failure is returned as-is. Two or more failures are wrapped in
/// `McpError::Multiple`, in subsystem order.
async fn shutdown(&mut self) -> Result<(), McpError> {
    let mut failures: Vec<McpError> = Vec::new();
    for subsystem in self.subsystems.iter_mut() {
        let outcome = subsystem.shutdown().await;
        failures.extend(outcome.err());
    }
    if failures.len() > 1 {
        return Err(McpError::Multiple(failures));
    }
    // At most one error is left here; hand it back unwrapped.
    match failures.pop() {
        Some(only) => Err(only),
        None => Ok(()),
    }
}
/// Dispatches a tool call, identified by its wire name, to the owning subsystem.
///
/// The wire name is translated to the internal `<subsystem>.<tool>` name via
/// `wire_to_internal`, and the owning subsystem is invoked with the `<tool>`
/// part only.
///
/// The owner is the first subsystem (in stored order) whose name followed by
/// a `.` is a prefix of the internal name. For subsystem names without a dot
/// this is the same as splitting at the first dot. Unlike a first-dot split,
/// it also routes correctly when a subsystem name itself contains a dot,
/// which is only rejected at construction time when debug assertions are on.
///
/// # Errors
///
/// * `McpError::ToolNotFound` if the wire name is unknown or no subsystem
///   owns the internal name.
/// * `McpError::ToolNameMalformed` if the internal name contains no dot.
/// * Whatever error the owning subsystem's `invoke` returns.
async fn invoke(&self, tool_name: &str, args: Value) -> Result<Value, McpError> {
    // The table entry lives as long as `&self`, so borrow it instead of cloning.
    let internal: &str = self
        .wire_to_internal
        .get(tool_name)
        .ok_or_else(|| McpError::ToolNotFound {
            name: tool_name.to_string(),
        })?;
    if !internal.contains('.') {
        return Err(McpError::ToolNameMalformed {
            name: internal.to_string(),
            reason: "no subsystem prefix (expected dot separator)".to_string(),
        });
    }
    // Match on "<name>." rather than cutting at the first dot, so a dotted
    // subsystem name cannot be truncated and sent to the wrong subsystem.
    let (subsystem, local_name) = self
        .subsystems
        .iter()
        .find_map(|candidate| {
            let local = internal
                .strip_prefix(candidate.name())?
                .strip_prefix('.')?;
            Some((candidate, local))
        })
        .ok_or_else(|| McpError::ToolNotFound {
            name: tool_name.to_string(),
        })?;
    subsystem.invoke(local_name, args).await
}
/// The aggregator reports no ring of its own: always returns `None`.
fn carabiner_ring(&self) -> Option<RingId> {
None }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::base::health::{HealthLevel, HealthStatus};
use crate::base::subsystem::ConnectArgs;
use crate::base::tool::ToolDescriptor;
use serde_json::{json, Value};
/// Minimal in-memory `Subsystem` used by the tests below.
struct StubSubsystem {
/// Name reported by `Subsystem::name` and echoed in `invoke` results.
name: &'static str,
/// Tool descriptors returned (cloned) by `Subsystem::tools`.
tools: Vec<ToolDescriptor>,
}
impl StubSubsystem {
    /// Creates a stub called `name` exposing one placeholder tool per entry
    /// in `tool_names`, each with the description `"stub tool"` and an empty
    /// object schema.
    fn new(name: &'static str, tool_names: &[&'static str]) -> Self {
        let mut tools = Vec::with_capacity(tool_names.len());
        for &tool_name in tool_names {
            let schema = json!({"type": "object", "properties": {}});
            tools.push(ToolDescriptor::new(tool_name, "stub tool", schema));
        }
        Self { name, tools }
    }
}
// Test double: every lifecycle call succeeds, health is always `Healthy`,
// and `invoke` echoes which stub and which tool name it was called with.
#[async_trait]
impl Subsystem for StubSubsystem {
/// Returns the name given at construction.
fn name(&self) -> &str {
self.name
}
/// Returns a copy of the configured tool descriptors.
fn tools(&self) -> Vec<ToolDescriptor> {
self.tools.clone()
}
/// No-op; always succeeds.
async fn connect(&mut self, _args: &ConnectArgs) -> Result<(), McpError> {
Ok(())
}
/// No-op; always succeeds.
async fn disconnect(&mut self) -> Result<(), McpError> {
Ok(())
}
/// Always reports `Healthy` with fixed placeholder metadata.
async fn health_check(&self) -> Result<HealthStatus, McpError> {
Ok(HealthStatus {
level: HealthLevel::Healthy,
last_seen_unix_ms: 0,
latency_ms: None,
version: "0.0.0".to_string(),
ring: None,
subsystem_specific: Value::Null,
})
}
/// No-op; always succeeds.
async fn shutdown(&mut self) -> Result<(), McpError> {
Ok(())
}
/// Ignores the arguments and returns `{"dispatched_to": <stub name>, "tool": <tool_name>}`
/// so tests can check where a call was routed.
async fn invoke(&self, tool_name: &str, _args: Value) -> Result<Value, McpError> {
Ok(json!({"dispatched_to": self.name, "tool": tool_name}))
}
}
/// Wire names resolve through the mapping table to the right subsystem and
/// the subsystem receives the un-prefixed tool name.
#[tokio::test]
async fn mapping_table_dispatches_to_correct_subsystem() {
    let esp32: Box<dyn Subsystem> = Box::new(StubSubsystem::new("esp32", &["dac_set"]));
    let jumperless: Box<dyn Subsystem> = Box::new(StubSubsystem::new("jumperless", &["connect"]));
    let mut aggregator =
        FederationAggregator::new(vec![esp32, jumperless], WireFormat::default());
    let connect_args = ConnectArgs {
        port_override: None,
    };
    aggregator.connect_all(&connect_args).await.unwrap();

    let expectations = [
        ("esp32_dac_set", "esp32", "dac_set"),
        ("jumperless_connect", "jumperless", "connect"),
    ];
    for (wire_name, owner, local_tool) in expectations {
        let reply = aggregator.invoke(wire_name, json!({})).await.unwrap();
        assert_eq!(reply["dispatched_to"], owner);
        assert_eq!(reply["tool"], local_tool);
    }
}
/// A wire name with a subsystem prefix but no tool part is not in the
/// mapping table and must be reported as `ToolNotFound`.
#[tokio::test]
async fn empty_tool_segment_rejected() {
    let stub: Box<dyn Subsystem> = Box::new(StubSubsystem::new("jumperless", &["connect"]));
    let mut aggregator = FederationAggregator::new(vec![stub], WireFormat::default());
    let connect_args = ConnectArgs {
        port_override: None,
    };
    aggregator.connect_all(&connect_args).await.unwrap();

    let outcome = aggregator.invoke("jumperless_", json!({})).await;
    assert!(matches!(outcome, Err(McpError::ToolNotFound { .. })));
}
/// Constructing an aggregator with two subsystems of the same name panics.
#[test]
#[should_panic(expected = "duplicate subsystem name 'esp32'")]
fn duplicate_subsystem_name_panics() {
    let clashing: Vec<Box<dyn Subsystem>> = vec![
        Box::new(StubSubsystem::new("esp32", &["tool_a"])),
        Box::new(StubSubsystem::new("esp32", &["tool_b"])),
    ];
    let _aggregator = FederationAggregator::new(clashing, WireFormat::default());
}
}