use rusqlite::params;
use serde::Serialize;
use super::QueryEngine;
/// Filters accepted by [`QueryEngine::schema`].
///
/// Both fields are optional equality filters. A `None` is bound as SQL NULL,
/// which the query treats as "do not filter on this column". The derived
/// `Default` therefore means "no filtering at all".
#[derive(Debug, Clone, Default)]
pub struct SchemaParams {
    /// Only return rows whose `proxy` column equals this value.
    pub proxy: Option<String>,
    /// Only return rows whose `method` column equals this value.
    pub method: Option<String>,
}
/// Filters and page size accepted by [`QueryEngine::schema_changes`].
///
/// `proxy` and `method` are optional equality filters; a `None` is bound as
/// SQL NULL, which the query treats as "do not filter on this column".
#[derive(Debug, Clone)]
pub struct SchemaChangesParams {
    /// Only return changes whose `proxy` column equals this value.
    pub proxy: Option<String>,
    /// Only return changes whose `method` column equals this value.
    pub method: Option<String>,
    /// Value bound to the query's `LIMIT` clause.
    pub limit: i64,
}
/// One row of the `server_schema` table as returned by [`QueryEngine::schema`].
///
/// Every field is read from the column of the same name, in the order the
/// query selects them.
#[derive(Clone, Debug, Serialize)]
pub struct SchemaRow {
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column (e.g. `initialize`, `tools/list`).
    pub method: String,
    /// Stored payload text; other queries in this file parse it as JSON.
    pub payload: String,
    /// Value of the `captured_at` column (integer timestamp; unit not visible here).
    pub captured_at: i64,
    /// Value of the `schema_hash` column (presumably a digest of `payload` — confirm).
    pub schema_hash: String,
}
/// The newest `server_schema` row for one proxy/method pair, as returned by
/// [`QueryEngine::latest_schema_row`].
///
/// Every field is read from the column of the same name.
#[derive(Clone, Debug)]
pub struct LatestSchemaRow {
    /// Value of the `proxy` column (the lookup key supplied by the caller).
    pub proxy: String,
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column (the lookup key supplied by the caller).
    pub method: String,
    /// Stored payload text; other queries in this file parse it as JSON.
    pub payload: String,
    /// Value of the `captured_at` column; the query picks the row where it is largest.
    pub captured_at: i64,
    /// Value of the `schema_hash` column (presumably a digest of `payload` — confirm).
    pub schema_hash: String,
}
/// One row of the `schema_changes` table as returned by
/// [`QueryEngine::schema_changes`].
///
/// Every field is read from the column of the same name; the optional fields
/// map nullable columns.
#[derive(Clone, Debug, Serialize)]
pub struct SchemaChangeRow {
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column.
    pub method: String,
    /// Value of the `change_type` column (the set of possible values is not visible here).
    pub change_type: String,
    /// Value of the `item_name` column, `None` when NULL.
    pub item_name: Option<String>,
    /// Value of the `old_hash` column, `None` when NULL.
    pub old_hash: Option<String>,
    /// Value of the `new_hash` column, `None` when NULL.
    pub new_hash: Option<String>,
    /// Value of the `detected_at` column; results are ordered by it, newest first.
    pub detected_at: i64,
}
/// Inputs accepted by [`QueryEngine::schema_unused`].
#[derive(Debug, Clone)]
pub struct SchemaUnusedParams {
    /// Restricts the counted requests to this proxy; `None` counts requests
    /// from every proxy.
    pub proxy: Option<String>,
    /// Inclusive lower bound on the `ts` column of the counted requests.
    pub since_ts: i64,
}
/// Call statistics for one advertised tool, as produced by
/// [`QueryEngine::schema_unused`].
#[derive(Clone, Debug, Serialize)]
pub struct SchemaToolUsageRow {
    /// The tool's `name` from the captured `tools/list` payload.
    pub tool_name: String,
    /// The tool's `description` from the same payload; empty when absent.
    pub description: String,
    /// Number of matching rows in the `requests` table.
    pub calls: i64,
    /// Number of those rows whose `status` is `'error'`.
    pub errors: i64,
    /// Largest `ts` among the matching rows; `None` when there were none.
    pub last_called_at: Option<i64>,
}
/// Summary of what has been captured for one upstream, as produced by
/// [`QueryEngine::schema_status`].
#[derive(Clone, Debug, Serialize)]
pub struct SchemaStatusRow {
    /// The upstream URL the summary was requested for.
    pub upstream_url: String,
    /// `"unknown"` when nothing is captured, `"complete"` when `initialize`
    /// plus at least one list method is captured, otherwise `"partial"`.
    pub status: String,
    /// `serverInfo.name` from the captured `initialize` payload, if available.
    pub server_name: Option<String>,
    /// `serverInfo.version` from the captured `initialize` payload, if available.
    pub server_version: Option<String>,
    /// `protocolVersion` from the captured `initialize` payload, if available.
    pub protocol_version: Option<String>,
    /// Keys of the `capabilities` object in the `initialize` payload.
    pub capabilities: Vec<String>,
    /// Captured method names, in the order the query returns them (by `method`).
    pub methods_captured: Vec<String>,
    /// Largest `captured_at` among the captured rows; `None` when there are none.
    pub last_captured_at: Option<i64>,
}
impl QueryEngine {
/// Fetches captured schema snapshots from the `server_schema` table.
///
/// `params.proxy` and `params.method` are optional equality filters: a `None`
/// is bound as SQL NULL, which switches the corresponding filter off. Rows are
/// returned ordered by `upstream_url`, then `method`.
///
/// # Errors
///
/// Returns the underlying `rusqlite::Error` if the statement cannot be
/// prepared or stepped, or if a column cannot be converted.
pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
    // Converts one result row into a `SchemaRow`; indices follow SELECT order.
    fn snapshot_from(row: &rusqlite::Row<'_>) -> rusqlite::Result<SchemaRow> {
        let upstream_url = row.get(0)?;
        let method = row.get(1)?;
        let payload = row.get(2)?;
        let captured_at = row.get(3)?;
        let schema_hash = row.get(4)?;
        Ok(SchemaRow {
            upstream_url,
            method,
            payload,
            captured_at,
            schema_hash,
        })
    }

    // The SQL text is kept flush-left so the literal itself is unchanged.
    let sql = "
SELECT upstream_url, method, payload, captured_at, schema_hash
FROM server_schema
WHERE (?1 IS NULL OR proxy = ?1)
AND (?2 IS NULL OR method = ?2)
ORDER BY upstream_url, method
";
    let mut statement = self.conn().prepare(sql)?;
    // Collect inside a `let` so the row iterator (which borrows `statement`)
    // is dropped before `statement` itself.
    let snapshots = statement
        .query_map(params![params.proxy, params.method], snapshot_from)?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(snapshots)
}
/// Fetches recorded schema changes from the `schema_changes` table, newest
/// first.
///
/// `params.proxy` and `params.method` are optional equality filters (a `None`
/// is bound as SQL NULL and disables the filter); `params.limit` is bound to
/// the `LIMIT` clause.
///
/// # Errors
///
/// Returns the underlying `rusqlite::Error` if the statement cannot be
/// prepared or stepped, or if a column cannot be converted.
pub fn schema_changes(
    &self,
    params: &SchemaChangesParams,
) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
    // The SQL text is kept flush-left so the literal itself is unchanged.
    let sql = "
SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
FROM schema_changes
WHERE (?1 IS NULL OR proxy = ?1)
AND (?2 IS NULL OR method = ?2)
ORDER BY detected_at DESC
LIMIT ?3
";
    let mut statement = self.conn().prepare(sql)?;
    let mut cursor = statement.query(params![params.proxy, params.method, params.limit])?;

    // Step through the result set by hand; the first failure aborts the scan.
    let mut changes = Vec::new();
    while let Some(record) = cursor.next()? {
        changes.push(SchemaChangeRow {
            upstream_url: record.get(0)?,
            method: record.get(1)?,
            change_type: record.get(2)?,
            item_name: record.get(3)?,
            old_hash: record.get(4)?,
            new_hash: record.get(5)?,
            detected_at: record.get(6)?,
        });
    }
    Ok(changes)
}
/// Summarises what has been captured in `server_schema` for `upstream_url`.
///
/// The status is:
/// * `"unknown"` — no rows exist for the upstream (all other fields empty);
/// * `"complete"` — `initialize` and at least one of the list methods
///   (`tools/list`, `resources/list`, `resources/templates/list`,
///   `prompts/list`) have been captured;
/// * `"partial"` — anything else.
///
/// Server name, version, protocol version and capabilities come from
/// `extract_server_info`, which yields empty values when it cannot read them.
///
/// # Errors
///
/// Returns the underlying `rusqlite::Error` if the method listing query fails.
pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
    // Any one of these, together with `initialize`, makes the capture "complete".
    const LIST_METHODS: [&str; 4] = [
        "tools/list",
        "resources/list",
        "resources/templates/list",
        "prompts/list",
    ];

    // The SQL text is kept flush-left so the literal itself is unchanged.
    let captured_sql = "
SELECT method, captured_at FROM server_schema
WHERE upstream_url = ?1
ORDER BY method
";
    let mut statement = self.conn().prepare(captured_sql)?;
    let captured: Vec<(String, i64)> = statement
        .query_map(params![upstream_url], |row| {
            let method = row.get::<_, String>(0)?;
            let captured_at = row.get::<_, i64>(1)?;
            Ok((method, captured_at))
        })?
        .collect::<Result<_, _>>()?;

    // Nothing captured at all: report an empty "unknown" summary.
    if captured.is_empty() {
        return Ok(SchemaStatusRow {
            upstream_url: upstream_url.to_string(),
            status: "unknown".to_string(),
            server_name: None,
            server_version: None,
            protocol_version: None,
            capabilities: Vec::new(),
            methods_captured: Vec::new(),
            last_captured_at: None,
        });
    }

    let last_captured_at = captured.iter().map(|&(_, at)| at).max();
    let methods_captured: Vec<String> = captured.into_iter().map(|(name, _)| name).collect();

    let (server_name, server_version, protocol_version, capabilities) =
        self.extract_server_info(upstream_url);

    let was_captured = |wanted: &str| methods_captured.iter().any(|name| name == wanted);
    let is_complete = was_captured("initialize")
        && LIST_METHODS.iter().any(|&candidate| was_captured(candidate));
    let status = match is_complete {
        true => "complete",
        false => "partial",
    };

    Ok(SchemaStatusRow {
        upstream_url: upstream_url.to_string(),
        status: status.to_string(),
        server_name,
        server_version,
        protocol_version,
        capabilities,
        methods_captured,
        last_captured_at,
    })
}
pub fn latest_schema_row(
&self,
proxy: &str,
method: &str,
) -> Result<Option<LatestSchemaRow>, rusqlite::Error> {
let sql = "
SELECT proxy, upstream_url, method, payload, captured_at, schema_hash
FROM server_schema
WHERE proxy = ?1 AND method = ?2
ORDER BY captured_at DESC
LIMIT 1
";
self.conn()
.query_row(sql, params![proxy, method], |row| {
Ok(LatestSchemaRow {
proxy: row.get(0)?,
upstream_url: row.get(1)?,
method: row.get(2)?,
payload: row.get(3)?,
captured_at: row.get(4)?,
schema_hash: row.get(5)?,
})
})
.map(Some)
.or_else(|e| match e {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
other => Err(other),
})
}
pub fn schema_unused(
&self,
params: &SchemaUnusedParams,
) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
let payload: Option<String> = self
.conn()
.query_row(
"SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
[],
|row| row.get(0),
)
.ok();
let payload = match payload {
Some(p) => p,
None => return Ok(vec![]),
};
let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
let tools = match val.get("tools").and_then(|t| t.as_array()) {
Some(arr) => arr,
None => return Ok(vec![]),
};
let mut tool_info: Vec<(String, String)> = Vec::new();
for tool in tools {
let name = tool
.get("name")
.and_then(|n| n.as_str())
.unwrap_or("")
.to_string();
let desc = tool
.get("description")
.and_then(|d| d.as_str())
.unwrap_or("")
.to_string();
if !name.is_empty() {
tool_info.push((name, desc));
}
}
if tool_info.is_empty() {
return Ok(vec![]);
}
let sql = "
SELECT COUNT(*) as calls,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
MAX(ts) as last_called_at
FROM requests
WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
";
let mut result = Vec::new();
for (name, desc) in &tool_info {
let row =
self.conn()
.query_row(sql, params![params.proxy, params.since_ts, name], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, Option<i64>>(2)?,
))
});
let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
result.push(SchemaToolUsageRow {
tool_name: name.clone(),
description: desc.clone(),
calls,
errors,
last_called_at,
});
}
result.sort_by(|a, b| a.calls.cmp(&b.calls));
Ok(result)
}
fn extract_server_info(
&self,
upstream_url: &str,
) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
let payload: Option<String> = self
.conn()
.query_row(
"SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
params![upstream_url],
|row| row.get(0),
)
.ok();
let payload = match payload {
Some(p) => p,
None => return (None, None, None, vec![]),
};
let val: serde_json::Value = match serde_json::from_str(&payload) {
Ok(v) => v,
Err(_) => return (None, None, None, vec![]),
};
let server_name = val
.get("serverInfo")
.and_then(|i| i.get("name"))
.and_then(|n| n.as_str())
.map(String::from);
let server_version = val
.get("serverInfo")
.and_then(|i| i.get("version"))
.and_then(|v| v.as_str())
.map(String::from);
let protocol_version = val
.get("protocolVersion")
.and_then(|p| p.as_str())
.map(String::from);
let capabilities = val
.get("capabilities")
.and_then(|c| c.as_object())
.map(|obj| obj.keys().cloned().collect())
.unwrap_or_default();
(server_name, server_version, protocol_version, capabilities)
}
}