mcpr_integrations/store/query/
schema.rs1use rusqlite::params;
4use serde::Serialize;
5
6use super::QueryEngine;
7
/// Filters accepted by [`QueryEngine::schema`].
pub struct SchemaParams {
    /// When `Some`, only rows whose `proxy` column equals this value are
    /// returned; `None` disables the filter.
    pub proxy: Option<String>,
    /// When `Some`, only rows whose `method` column equals this value are
    /// returned; `None` disables the filter.
    pub method: Option<String>,
}
14
/// Filters accepted by [`QueryEngine::schema_changes`].
pub struct SchemaChangesParams {
    /// When `Some`, only rows whose `proxy` column equals this value are
    /// returned; `None` disables the filter.
    pub proxy: Option<String>,
    /// When `Some`, only rows whose `method` column equals this value are
    /// returned; `None` disables the filter.
    pub method: Option<String>,
    /// Value bound to the query's `LIMIT` clause, i.e. the maximum number of
    /// rows returned.
    pub limit: i64,
}
22
/// One row read from the `server_schema` table by [`QueryEngine::schema`].
#[derive(Debug, Clone, Serialize)]
pub struct SchemaRow {
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column (e.g. `initialize`, `tools/list`).
    pub method: String,
    /// Value of the `payload` column. Other queries in this file parse it as
    /// JSON.
    pub payload: String,
    /// Value of the `captured_at` column (integer timestamp; the unit is not
    /// visible from this file).
    pub captured_at: i64,
    /// Value of the `schema_hash` column.
    pub schema_hash: String,
}
32
/// One row read from the `schema_changes` table by
/// [`QueryEngine::schema_changes`].
#[derive(Debug, Clone, Serialize)]
pub struct SchemaChangeRow {
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column.
    pub method: String,
    /// Value of the `change_type` column.
    pub change_type: String,
    /// Value of the nullable `item_name` column.
    pub item_name: Option<String>,
    /// Value of the nullable `old_hash` column.
    pub old_hash: Option<String>,
    /// Value of the nullable `new_hash` column.
    pub new_hash: Option<String>,
    /// Value of the `detected_at` column; results are ordered by it,
    /// newest first.
    pub detected_at: i64,
}
44
/// Filters accepted by [`QueryEngine::schema_unused`].
pub struct SchemaUnusedParams {
    /// When `Some`, only requests whose `proxy` column equals this value are
    /// counted; `None` disables the filter.
    pub proxy: Option<String>,
    /// Lower bound (inclusive) on the `ts` column of the counted requests.
    pub since_ts: i64,
}
50
/// Usage statistics for one tool, as produced by
/// [`QueryEngine::schema_unused`].
#[derive(Debug, Clone, Serialize)]
pub struct SchemaToolUsageRow {
    /// The tool's `name` as found in the stored `tools/list` payload.
    pub tool_name: String,
    /// The tool's `description` from the same payload, or an empty string
    /// when the entry has none.
    pub description: String,
    /// Number of matching rows in the `requests` table.
    pub calls: i64,
    /// Number of those rows whose `status` is `'error'`.
    pub errors: i64,
    /// Largest `ts` among the matching rows; `None` when there are none.
    pub last_called_at: Option<i64>,
}
60
/// Summary of what has been captured for one upstream, as produced by
/// [`QueryEngine::schema_status`].
#[derive(Debug, Clone, Serialize)]
pub struct SchemaStatusRow {
    /// The upstream URL the summary was requested for.
    pub upstream_url: String,
    /// `"unknown"` when nothing has been captured, `"complete"` when
    /// `initialize` and at least one `*/list` method have been captured,
    /// `"partial"` otherwise.
    pub status: String,
    /// `serverInfo.name` from the stored `initialize` payload, if present.
    pub server_name: Option<String>,
    /// `serverInfo.version` from the stored `initialize` payload, if present.
    pub server_version: Option<String>,
    /// `protocolVersion` from the stored `initialize` payload, if present.
    pub protocol_version: Option<String>,
    /// Keys of the `capabilities` object in the stored `initialize` payload;
    /// empty when the object is absent.
    pub capabilities: Vec<String>,
    /// Captured method names, ordered by method name.
    pub methods_captured: Vec<String>,
    /// Largest `captured_at` among the captured methods; `None` when nothing
    /// has been captured.
    pub last_captured_at: Option<i64>,
}
74
75impl QueryEngine {
76 pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
79 let sql = "
80 SELECT upstream_url, method, payload, captured_at, schema_hash
81 FROM server_schema
82 WHERE (?1 IS NULL OR proxy = ?1)
83 AND (?2 IS NULL OR method = ?2)
84 ORDER BY upstream_url, method
85 ";
86 let mut stmt = self.conn().prepare(sql)?;
87 let rows = stmt.query_map(params![params.proxy, params.method], |row| {
88 Ok(SchemaRow {
89 upstream_url: row.get(0)?,
90 method: row.get(1)?,
91 payload: row.get(2)?,
92 captured_at: row.get(3)?,
93 schema_hash: row.get(4)?,
94 })
95 })?;
96 rows.collect()
97 }
98
99 pub fn schema_changes(
102 &self,
103 params: &SchemaChangesParams,
104 ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
105 let sql = "
106 SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
107 FROM schema_changes
108 WHERE (?1 IS NULL OR proxy = ?1)
109 AND (?2 IS NULL OR method = ?2)
110 ORDER BY detected_at DESC
111 LIMIT ?3
112 ";
113 let mut stmt = self.conn().prepare(sql)?;
114 let rows = stmt.query_map(params![params.proxy, params.method, params.limit], |row| {
115 Ok(SchemaChangeRow {
116 upstream_url: row.get(0)?,
117 method: row.get(1)?,
118 change_type: row.get(2)?,
119 item_name: row.get(3)?,
120 old_hash: row.get(4)?,
121 new_hash: row.get(5)?,
122 detected_at: row.get(6)?,
123 })
124 })?;
125 rows.collect()
126 }
127
128 pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
130 let methods_sql = "
131 SELECT method, captured_at FROM server_schema
132 WHERE upstream_url = ?1
133 ORDER BY method
134 ";
135 let mut stmt = self.conn().prepare(methods_sql)?;
136 let methods: Vec<(String, i64)> = stmt
137 .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
138 .collect::<Result<Vec<_>, _>>()?;
139
140 if methods.is_empty() {
141 return Ok(SchemaStatusRow {
142 upstream_url: upstream_url.to_string(),
143 status: "unknown".to_string(),
144 server_name: None,
145 server_version: None,
146 protocol_version: None,
147 capabilities: vec![],
148 methods_captured: vec![],
149 last_captured_at: None,
150 });
151 }
152
153 let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
154 let last_captured = methods.iter().map(|(_, ts)| *ts).max();
155
156 let (server_name, server_version, protocol_version, capabilities) =
158 self.extract_server_info(upstream_url);
159
160 let has_initialize = method_names.iter().any(|m| m == "initialize");
161 let list_methods = [
162 "tools/list",
163 "resources/list",
164 "resources/templates/list",
165 "prompts/list",
166 ];
167 let has_any_list = list_methods
168 .iter()
169 .any(|m| method_names.iter().any(|n| n == m));
170
171 let status = if has_initialize && has_any_list {
172 "complete"
173 } else {
174 "partial"
175 };
176
177 Ok(SchemaStatusRow {
178 upstream_url: upstream_url.to_string(),
179 status: status.to_string(),
180 server_name,
181 server_version,
182 protocol_version,
183 capabilities,
184 methods_captured: method_names,
185 last_captured_at: last_captured,
186 })
187 }
188
189 pub fn schema_unused(
192 &self,
193 params: &SchemaUnusedParams,
194 ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
195 let payload: Option<String> = self
197 .conn()
198 .query_row(
199 "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
200 [],
201 |row| row.get(0),
202 )
203 .ok();
204
205 let payload = match payload {
206 Some(p) => p,
207 None => return Ok(vec![]),
208 };
209
210 let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
212 let tools = match val.get("tools").and_then(|t| t.as_array()) {
213 Some(arr) => arr,
214 None => return Ok(vec![]),
215 };
216
217 let mut tool_info: Vec<(String, String)> = Vec::new();
218 for tool in tools {
219 let name = tool
220 .get("name")
221 .and_then(|n| n.as_str())
222 .unwrap_or("")
223 .to_string();
224 let desc = tool
225 .get("description")
226 .and_then(|d| d.as_str())
227 .unwrap_or("")
228 .to_string();
229 if !name.is_empty() {
230 tool_info.push((name, desc));
231 }
232 }
233
234 if tool_info.is_empty() {
235 return Ok(vec![]);
236 }
237
238 let sql = "
240 SELECT COUNT(*) as calls,
241 SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
242 MAX(ts) as last_called_at
243 FROM requests
244 WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
245 ";
246
247 let mut result = Vec::new();
248 for (name, desc) in &tool_info {
249 let row =
250 self.conn()
251 .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
252 Ok((
253 row.get::<_, i64>(0)?,
254 row.get::<_, i64>(1)?,
255 row.get::<_, Option<i64>>(2)?,
256 ))
257 });
258
259 let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
260 result.push(SchemaToolUsageRow {
261 tool_name: name.clone(),
262 description: desc.clone(),
263 calls,
264 errors,
265 last_called_at,
266 });
267 }
268
269 result.sort_by(|a, b| a.calls.cmp(&b.calls));
271
272 Ok(result)
273 }
274
275 fn extract_server_info(
277 &self,
278 upstream_url: &str,
279 ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
280 let payload: Option<String> = self
281 .conn()
282 .query_row(
283 "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
284 params![upstream_url],
285 |row| row.get(0),
286 )
287 .ok();
288
289 let payload = match payload {
290 Some(p) => p,
291 None => return (None, None, None, vec![]),
292 };
293
294 let val: serde_json::Value = match serde_json::from_str(&payload) {
295 Ok(v) => v,
296 Err(_) => return (None, None, None, vec![]),
297 };
298
299 let server_name = val
300 .get("serverInfo")
301 .and_then(|i| i.get("name"))
302 .and_then(|n| n.as_str())
303 .map(String::from);
304 let server_version = val
305 .get("serverInfo")
306 .and_then(|i| i.get("version"))
307 .and_then(|v| v.as_str())
308 .map(String::from);
309 let protocol_version = val
310 .get("protocolVersion")
311 .and_then(|p| p.as_str())
312 .map(String::from);
313 let capabilities = val
314 .get("capabilities")
315 .and_then(|c| c.as_object())
316 .map(|obj| obj.keys().cloned().collect())
317 .unwrap_or_default();
318
319 (server_name, server_version, protocol_version, capabilities)
320 }
321}