mcpr-integrations 0.4.70

External integrations for mcpr: cloud event sink, API client, and SQLite request storage
Documentation
//! Query: `mcpr proxy schema` — show captured MCP server schema and change history.

use rusqlite::params;
use serde::Serialize;

use super::QueryEngine;

/// Parameters for the schema snapshot query.
pub struct SchemaParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    pub method: Option<String>,
}

/// Parameters for the schema changes query.
pub struct SchemaChangesParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    pub method: Option<String>,
    pub limit: i64,
}

/// A single schema snapshot row from `server_schema`.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaRow {
    pub upstream_url: String,
    pub method: String,
    pub payload: String,
    pub captured_at: i64,
    pub schema_hash: String,
}

/// A hydration-focused schema row. Adds `proxy` which the general
/// `SchemaRow` omits (it's not useful for the status-display consumers
/// that call `schema()`). Used at startup to seed `SchemaManager`.
#[derive(Debug, Clone)]
pub struct LatestSchemaRow {
    pub proxy: String,
    pub upstream_url: String,
    pub method: String,
    pub payload: String,
    pub captured_at: i64,
    pub schema_hash: String,
}

/// A schema change record from `schema_changes`.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaChangeRow {
    pub upstream_url: String,
    pub method: String,
    pub change_type: String,
    pub item_name: Option<String>,
    pub old_hash: Option<String>,
    pub new_hash: Option<String>,
    pub detected_at: i64,
}

/// Parameters for the unused tools query.
pub struct SchemaUnusedParams {
    pub proxy: Option<String>,
    pub since_ts: i64,
}

/// A tool listed in the schema with its usage stats.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaToolUsageRow {
    pub tool_name: String,
    pub description: String,
    pub calls: i64,
    pub errors: i64,
    pub last_called_at: Option<i64>,
}

/// Computed schema status for a given upstream.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaStatusRow {
    pub upstream_url: String,
    /// "unknown", "partial", or "complete".
    pub status: String,
    pub server_name: Option<String>,
    pub server_version: Option<String>,
    pub protocol_version: Option<String>,
    pub capabilities: Vec<String>,
    pub methods_captured: Vec<String>,
    pub last_captured_at: Option<i64>,
}

impl QueryEngine {
    /// Fetch all captured schema snapshots, optionally filtered by proxy
    /// name and/or MCP method.
    pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
        let sql = "
            SELECT upstream_url, method, payload, captured_at, schema_hash
            FROM server_schema
            WHERE (?1 IS NULL OR proxy = ?1)
              AND (?2 IS NULL OR method = ?2)
            ORDER BY upstream_url, method
        ";
        let mut stmt = self.conn().prepare(sql)?;
        let rows = stmt.query_map(params![params.proxy, params.method], |row| {
            Ok(SchemaRow {
                upstream_url: row.get(0)?,
                method: row.get(1)?,
                payload: row.get(2)?,
                captured_at: row.get(3)?,
                schema_hash: row.get(4)?,
            })
        })?;
        rows.collect()
    }

    /// Fetch schema change history, optionally filtered by proxy name
    /// and/or MCP method.
    pub fn schema_changes(
        &self,
        params: &SchemaChangesParams,
    ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
        let sql = "
            SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
            FROM schema_changes
            WHERE (?1 IS NULL OR proxy = ?1)
              AND (?2 IS NULL OR method = ?2)
            ORDER BY detected_at DESC
            LIMIT ?3
        ";
        let mut stmt = self.conn().prepare(sql)?;
        let rows = stmt.query_map(params![params.proxy, params.method, params.limit], |row| {
            Ok(SchemaChangeRow {
                upstream_url: row.get(0)?,
                method: row.get(1)?,
                change_type: row.get(2)?,
                item_name: row.get(3)?,
                old_hash: row.get(4)?,
                new_hash: row.get(5)?,
                detected_at: row.get(6)?,
            })
        })?;
        rows.collect()
    }

    /// Compute the schema status for a given upstream URL.
    pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
        let methods_sql = "
            SELECT method, captured_at FROM server_schema
            WHERE upstream_url = ?1
            ORDER BY method
        ";
        let mut stmt = self.conn().prepare(methods_sql)?;
        let methods: Vec<(String, i64)> = stmt
            .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
            .collect::<Result<Vec<_>, _>>()?;

        if methods.is_empty() {
            return Ok(SchemaStatusRow {
                upstream_url: upstream_url.to_string(),
                status: "unknown".to_string(),
                server_name: None,
                server_version: None,
                protocol_version: None,
                capabilities: vec![],
                methods_captured: vec![],
                last_captured_at: None,
            });
        }

        let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
        let last_captured = methods.iter().map(|(_, ts)| *ts).max();

        // Extract server info from initialize payload if available.
        let (server_name, server_version, protocol_version, capabilities) =
            self.extract_server_info(upstream_url);

        let has_initialize = method_names.iter().any(|m| m == "initialize");
        let list_methods = [
            "tools/list",
            "resources/list",
            "resources/templates/list",
            "prompts/list",
        ];
        let has_any_list = list_methods
            .iter()
            .any(|m| method_names.iter().any(|n| n == m));

        let status = if has_initialize && has_any_list {
            "complete"
        } else {
            "partial"
        };

        Ok(SchemaStatusRow {
            upstream_url: upstream_url.to_string(),
            status: status.to_string(),
            server_name,
            server_version,
            protocol_version,
            capabilities,
            methods_captured: method_names,
            last_captured_at: last_captured,
        })
    }

    /// Fetch the latest persisted schema row for `(proxy, method)` —
    /// used at proxy startup to hydrate `SchemaManager` from disk.
    ///
    /// Returns `None` when no row matches. `server_schema` is an
    /// UPSERT-keyed-by-(proxy, upstream_url, method) table, so the
    /// "latest" is simply the single row per key; we pick by highest
    /// `captured_at` to be robust against multiple upstream URLs that
    /// share the same proxy name (unlikely today but possible).
    pub fn latest_schema_row(
        &self,
        proxy: &str,
        method: &str,
    ) -> Result<Option<LatestSchemaRow>, rusqlite::Error> {
        let sql = "
            SELECT proxy, upstream_url, method, payload, captured_at, schema_hash
            FROM server_schema
            WHERE proxy = ?1 AND method = ?2
            ORDER BY captured_at DESC
            LIMIT 1
        ";
        self.conn()
            .query_row(sql, params![proxy, method], |row| {
                Ok(LatestSchemaRow {
                    proxy: row.get(0)?,
                    upstream_url: row.get(1)?,
                    method: row.get(2)?,
                    payload: row.get(3)?,
                    captured_at: row.get(4)?,
                    schema_hash: row.get(5)?,
                })
            })
            .map(Some)
            .or_else(|e| match e {
                rusqlite::Error::QueryReturnedNoRows => Ok(None),
                other => Err(other),
            })
    }

    /// Cross-reference captured tools/list schema with actual request logs.
    /// Returns all listed tools with their usage stats — unused tools have calls = 0.
    pub fn schema_unused(
        &self,
        params: &SchemaUnusedParams,
    ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
        // Step 1: Get the tools/list payload from server_schema.
        let payload: Option<String> = self
            .conn()
            .query_row(
                "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
                [],
                |row| row.get(0),
            )
            .ok();

        let payload = match payload {
            Some(p) => p,
            None => return Ok(vec![]),
        };

        // Step 2: Parse tool names from the payload.
        let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
        let tools = match val.get("tools").and_then(|t| t.as_array()) {
            Some(arr) => arr,
            None => return Ok(vec![]),
        };

        let mut tool_info: Vec<(String, String)> = Vec::new();
        for tool in tools {
            let name = tool
                .get("name")
                .and_then(|n| n.as_str())
                .unwrap_or("")
                .to_string();
            let desc = tool
                .get("description")
                .and_then(|d| d.as_str())
                .unwrap_or("")
                .to_string();
            if !name.is_empty() {
                tool_info.push((name, desc));
            }
        }

        if tool_info.is_empty() {
            return Ok(vec![]);
        }

        // Step 3: Query request logs for each tool's usage.
        let sql = "
            SELECT COUNT(*) as calls,
                   SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
                   MAX(ts) as last_called_at
            FROM requests
            WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
        ";

        let mut result = Vec::new();
        for (name, desc) in &tool_info {
            let row =
                self.conn()
                    .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
                        Ok((
                            row.get::<_, i64>(0)?,
                            row.get::<_, i64>(1)?,
                            row.get::<_, Option<i64>>(2)?,
                        ))
                    });

            let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
            result.push(SchemaToolUsageRow {
                tool_name: name.clone(),
                description: desc.clone(),
                calls,
                errors,
                last_called_at,
            });
        }

        // Sort: unused first (calls = 0), then by calls ascending.
        result.sort_by(|a, b| a.calls.cmp(&b.calls));

        Ok(result)
    }

    /// Extract server info from a captured initialize payload.
    fn extract_server_info(
        &self,
        upstream_url: &str,
    ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
        let payload: Option<String> = self
            .conn()
            .query_row(
                "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
                params![upstream_url],
                |row| row.get(0),
            )
            .ok();

        let payload = match payload {
            Some(p) => p,
            None => return (None, None, None, vec![]),
        };

        let val: serde_json::Value = match serde_json::from_str(&payload) {
            Ok(v) => v,
            Err(_) => return (None, None, None, vec![]),
        };

        let server_name = val
            .get("serverInfo")
            .and_then(|i| i.get("name"))
            .and_then(|n| n.as_str())
            .map(String::from);
        let server_version = val
            .get("serverInfo")
            .and_then(|i| i.get("version"))
            .and_then(|v| v.as_str())
            .map(String::from);
        let protocol_version = val
            .get("protocolVersion")
            .and_then(|p| p.as_str())
            .map(String::from);
        let capabilities = val
            .get("capabilities")
            .and_then(|c| c.as_object())
            .map(|obj| obj.keys().cloned().collect())
            .unwrap_or_default();

        (server_name, server_version, protocol_version, capabilities)
    }
}