jumperless-mcp 0.1.0

MCP server for the Jumperless V5 — persistent USB-serial bridge exposing the firmware API to LLMs
//! Federation gateway support.
//!
//! [`FederationAggregator`] implements [`Subsystem`] itself — that's the fractal.
//! A federation is just a `Subsystem` that aggregates other `Subsystem`s, using
//! [`crate::base::wire::WireFormat`] to translate tool names between the internal
//! dot-separated namespace and the wire representation.
//!
//! # Inbound dispatch (F3 — mapping table)
//!
//! Wire → internal tool name translation uses a pre-built `HashMap` rather than
//! the longest-prefix heuristic. The map is populated in [`FederationAggregator::connect_all`]
//! after all subsystems have been connected and their tool catalogs are known.
//!
//! This eliminates the empty-tool-segment edge case (`"subsystem_"` → `Some("subsystem.")`
//! in the old heuristic) — a wire name is valid if and only if it appears in the map.

use crate::base::errors::McpError;
use crate::base::health::HealthStatus;
use crate::base::subsystem::{ConnectArgs, RingId, Subsystem};
use crate::base::tool::ToolDescriptor;
use crate::base::wire::WireFormat;
use async_trait::async_trait;
use serde_json::Value;
use std::collections::{HashMap, HashSet};

/// Aggregates multiple `Subsystem`s behind a single MCP endpoint.
///
/// Tool names are prefixed with `<subsystem.name()>.` in the internal namespace;
/// [`WireFormat`] translates them for the wire. Implements [`Subsystem`] itself,
/// so a `FederationAggregator` can be handed to `serve_mcp` exactly like any
/// other subsystem — or nested inside another `FederationAggregator`.
pub struct FederationAggregator {
    subsystems: Vec<Box<dyn Subsystem>>,
    wire_format: WireFormat,
    /// Mapping from wire name → internal dotted name, built at connect_all().
    /// Example entry: `"jumperless_connect"` → `"jumperless.connect"`.
    wire_to_internal: HashMap<String, String>,
}

impl FederationAggregator {
    /// Create a new aggregator.
    ///
    /// `wire_format` controls how dotted internal tool names are translated
    /// to wire names. Use [`WireFormat::default()`] (underscore-separated)
    /// for OSS-safe distribution.
    ///
    /// # Panics
    ///
    /// Panics if any two subsystems share the same name. Subsystem names must
    /// be unique within a federation to avoid dispatch ambiguity.
    pub fn new(subsystems: Vec<Box<dyn Subsystem>>, wire_format: WireFormat) -> Self {
        // F6: duplicate-name guard
        let mut seen = HashSet::new();
        for s in &subsystems {
            if !seen.insert(s.name().to_string()) {
                panic!(
                    "FederationAggregator: duplicate subsystem name '{}'",
                    s.name()
                );
            }
            debug_assert!(
                !s.name().contains('.'),
                "subsystem name '{}' contains a dot — federation prefix would produce ambiguous internal names; first-dot split would misroute",
                s.name()
            );
        }
        Self {
            subsystems,
            wire_format,
            wire_to_internal: HashMap::new(),
        }
    }

    /// Call `connect` on all member subsystems, then build the wire→internal mapping table.
    ///
    /// Stops at the first connection error. Phase 0b.4 may add partial-failure
    /// handling (mark degraded, continue).
    ///
    /// The mapping table is built after all subsystems are connected so that
    /// tool catalogs (which may be populated during `connect`) are fully available.
    pub async fn connect_all(&mut self, args: &ConnectArgs) -> Result<(), McpError> {
        for s in &mut self.subsystems {
            s.connect(args).await?;
        }
        // Build the wire→internal mapping table (F3).
        self.rebuild_wire_map();
        Ok(())
    }

    /// Rebuild the wire→internal name mapping from current subsystem tool catalogs.
    ///
    /// Called after `connect_all`. Can be re-called if subsystem tool catalogs
    /// change (e.g., hot-reload).
    fn rebuild_wire_map(&mut self) {
        self.wire_to_internal.clear();
        for subsystem in &self.subsystems {
            for tool in subsystem.tools() {
                // Internal name: "subsystem.tool"
                let internal = format!("{}.{}", subsystem.name(), tool.name);
                // Wire name: translated by the configured WireFormat
                let wire = self.wire_format.outbound(&internal);
                self.wire_to_internal.insert(wire, internal);
            }
        }
    }
}

#[async_trait]
impl Subsystem for FederationAggregator {
    fn name(&self) -> &str {
        // A federation aggregator acting as the top-level gateway.
        // Concrete gateway crates (mcp/gateway) will override this.
        "federation"
    }

    fn tools(&self) -> Vec<ToolDescriptor> {
        // Collect all subsystem tools and prefix each with `<subsystem.name>.`.
        // The transport layer applies WireFormat outbound translation to these
        // dotted internal names before emitting them to MCP clients (D8/D13/D15).
        // Inbound dispatch reverses via the pre-built wire_to_internal map (F3).
        self.subsystems
            .iter()
            .flat_map(|s| {
                s.tools().into_iter().map(|td| {
                    // Direct struct literal here intentionally bypasses ToolDescriptor::try_new()
                    // validation — federation internal names contain a dot prefix (subsystem.tool).
                    // Wire boundary applies WireFormat::outbound() before names reach rmcp.
                    // Subsystem name dot-freeness enforced at registration via debug_assert.
                    //
                    // Phase 0b.5 Wave 2 will use ToolDescriptor::timeout_ms here to set the
                    // read timeout before each tool's exec_code call. Tools declare their own
                    // budgets (fast queries 1-2s, medium ops 5s default, slow ops 30+s).
                    ToolDescriptor {
                        name: format!("{}.{}", s.name(), td.name),
                        description: td.description,
                        input_schema: td.input_schema,
                        timeout_ms: td.timeout_ms,
                    }
                })
            })
            .collect()
    }

    async fn connect(&mut self, args: &ConnectArgs) -> Result<(), McpError> {
        self.connect_all(args).await
    }

    async fn disconnect(&mut self) -> Result<(), McpError> {
        // F7: accumulate ALL errors, not just the first.
        let mut errors: Vec<McpError> = Vec::new();
        for s in &mut self.subsystems {
            if let Err(e) = s.disconnect().await {
                errors.push(e);
            }
        }
        match errors.len() {
            0 => Ok(()),
            1 => Err(errors.into_iter().next().unwrap()),
            _ => Err(McpError::Multiple(errors)),
        }
    }

    async fn health_check(&self) -> Result<HealthStatus, McpError> {
        // Aggregate health: Unhealthy if any member is Unhealthy;
        // Degraded if any member is Degraded; Healthy otherwise.
        // Short-circuits routing decision on the first Unhealthy result,
        // but accumulates ALL degraded + unhealthy reasons into subsystem_specific
        // so operators can see the full picture even after a short-circuit.
        use crate::base::health::HealthLevel;
        let mut worst = HealthLevel::Healthy;
        let mut degraded_reasons: Vec<(String, String)> = Vec::new();
        let mut unhealthy: Option<(String, String)> = None;

        for s in &self.subsystems {
            match s.health_check().await {
                Ok(h) => match h.level {
                    HealthLevel::Healthy => {}
                    HealthLevel::Degraded { reason } => {
                        degraded_reasons.push((s.name().to_string(), reason.clone()));
                        worst = HealthLevel::Degraded {
                            reason: if degraded_reasons.len() == 1 {
                                format!("{}: {}", s.name(), reason)
                            } else {
                                format!(
                                    "{} subsystems degraded (see subsystem_specific.degraded)",
                                    degraded_reasons.len()
                                )
                            },
                        };
                    }
                    HealthLevel::Unhealthy { reason } => {
                        unhealthy = Some((s.name().to_string(), reason.clone()));
                        worst = HealthLevel::Unhealthy {
                            reason: format!("{}: {}", s.name(), reason),
                        };
                        break; // Unhealthy is terminal for routing purposes.
                    }
                },
                Err(e) => {
                    let reason = format!("health_check error: {e}");
                    unhealthy = Some((s.name().to_string(), reason.clone()));
                    worst = HealthLevel::Unhealthy {
                        reason: format!("{}: {}", s.name(), reason),
                    };
                    break;
                }
            }
        }

        let subsystem_specific = serde_json::json!({
            "degraded": degraded_reasons.iter().map(|(n, r)| serde_json::json!({"subsystem": n, "reason": r})).collect::<Vec<_>>(),
            "unhealthy": unhealthy.as_ref().map(|(n, r)| serde_json::json!({"subsystem": n, "reason": r})),
        });

        Ok(HealthStatus {
            level: worst,
            last_seen_unix_ms: 0, // Federation does not track a unified timestamp.
            latency_ms: None,
            version: env!("CARGO_PKG_VERSION").to_string(),
            ring: None,
            subsystem_specific,
        })
    }

    async fn shutdown(&mut self) -> Result<(), McpError> {
        // F7: accumulate ALL errors, not just the first.
        let mut errors: Vec<McpError> = Vec::new();
        for s in &mut self.subsystems {
            if let Err(e) = s.shutdown().await {
                errors.push(e);
            }
        }
        match errors.len() {
            0 => Ok(()),
            1 => Err(errors.into_iter().next().unwrap()),
            _ => Err(McpError::Multiple(errors)),
        }
    }

    async fn invoke(&self, tool_name: &str, args: Value) -> Result<Value, McpError> {
        // F3: map-based dispatch (replaces longest-prefix heuristic).
        let internal = self
            .wire_to_internal
            .get(tool_name)
            .cloned()
            .ok_or_else(|| McpError::ToolNotFound {
                name: tool_name.to_string(),
            })?;

        // Split "subsystem.local_tool" into prefix + local name.
        let dot = internal
            .find('.')
            .ok_or_else(|| McpError::ToolNameMalformed {
                name: internal.clone(),
                reason: "no subsystem prefix (expected dot separator)".to_string(),
            })?;
        let prefix = &internal[..dot];
        let local_name = &internal[dot + 1..];

        let subsystem = self
            .subsystems
            .iter()
            .find(|s| s.name() == prefix)
            .ok_or_else(|| McpError::ToolNotFound {
                name: tool_name.to_string(),
            })?;

        subsystem.invoke(local_name, args).await
    }

    fn carabiner_ring(&self) -> Option<RingId> {
        None // A top-level federation gateway is ungated.
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::base::health::{HealthLevel, HealthStatus};
    use crate::base::subsystem::ConnectArgs;
    use crate::base::tool::ToolDescriptor;
    use serde_json::{json, Value};

    // ── Minimal subsystem stub for tests ────────────────────────────────────

    struct StubSubsystem {
        name: &'static str,
        tools: Vec<ToolDescriptor>,
    }

    impl StubSubsystem {
        fn new(name: &'static str, tool_names: &[&'static str]) -> Self {
            let tools = tool_names
                .iter()
                .map(|&n| {
                    ToolDescriptor::new(n, "stub tool", json!({"type": "object", "properties": {}}))
                })
                .collect();
            Self { name, tools }
        }
    }

    #[async_trait]
    impl Subsystem for StubSubsystem {
        fn name(&self) -> &str {
            self.name
        }
        fn tools(&self) -> Vec<ToolDescriptor> {
            self.tools.clone()
        }
        async fn connect(&mut self, _args: &ConnectArgs) -> Result<(), McpError> {
            Ok(())
        }
        async fn disconnect(&mut self) -> Result<(), McpError> {
            Ok(())
        }
        async fn health_check(&self) -> Result<HealthStatus, McpError> {
            Ok(HealthStatus {
                level: HealthLevel::Healthy,
                last_seen_unix_ms: 0,
                latency_ms: None,
                version: "0.0.0".to_string(),
                ring: None,
                subsystem_specific: Value::Null,
            })
        }
        async fn shutdown(&mut self) -> Result<(), McpError> {
            Ok(())
        }
        async fn invoke(&self, tool_name: &str, _args: Value) -> Result<Value, McpError> {
            Ok(json!({"dispatched_to": self.name, "tool": tool_name}))
        }
    }

    /// F3: mapping table dispatches correctly to two registered subsystems.
    #[tokio::test]
    async fn mapping_table_dispatches_to_correct_subsystem() {
        let subsystems: Vec<Box<dyn Subsystem>> = vec![
            Box::new(StubSubsystem::new("esp32", &["dac_set"])),
            Box::new(StubSubsystem::new("jumperless", &["connect"])),
        ];
        let mut agg = FederationAggregator::new(subsystems, WireFormat::default());
        let args = ConnectArgs {
            port_override: None,
        };
        agg.connect_all(&args).await.unwrap();

        // UnderscoreSeparated wires: esp32_dac_set, jumperless_connect
        let r1 = agg.invoke("esp32_dac_set", json!({})).await.unwrap();
        assert_eq!(r1["dispatched_to"], "esp32");
        assert_eq!(r1["tool"], "dac_set");

        let r2 = agg.invoke("jumperless_connect", json!({})).await.unwrap();
        assert_eq!(r2["dispatched_to"], "jumperless");
        assert_eq!(r2["tool"], "connect");
    }

    /// F3: empty-tool-segment naturally rejected — not in map.
    #[tokio::test]
    async fn empty_tool_segment_rejected() {
        let subsystems: Vec<Box<dyn Subsystem>> =
            vec![Box::new(StubSubsystem::new("jumperless", &["connect"]))];
        let mut agg = FederationAggregator::new(subsystems, WireFormat::default());
        let args = ConnectArgs {
            port_override: None,
        };
        agg.connect_all(&args).await.unwrap();

        // "jumperless_" has an empty tool segment — never registered, so ToolNotFound.
        let err = agg.invoke("jumperless_", json!({})).await.unwrap_err();
        assert!(matches!(err, McpError::ToolNotFound { .. }));
    }

    /// F6: duplicate subsystem name panics at construction.
    #[test]
    #[should_panic(expected = "duplicate subsystem name 'esp32'")]
    fn duplicate_subsystem_name_panics() {
        let _: FederationAggregator = FederationAggregator::new(
            vec![
                Box::new(StubSubsystem::new("esp32", &["tool_a"])),
                Box::new(StubSubsystem::new("esp32", &["tool_b"])),
            ],
            WireFormat::default(),
        );
    }
}