1use rusqlite::params;
4use serde::Serialize;
5
6use super::QueryEngine;
7
/// Filters accepted by [`QueryEngine::schema`].
///
/// Every field is optional; the default value (all `None`) applies no filtering.
#[derive(Debug, Clone, Default)]
pub struct SchemaParams {
    /// When `Some`, only `server_schema` rows whose `proxy` column equals this value
    /// are returned.
    pub proxy: Option<String>,
    /// When `Some`, only `server_schema` rows whose `method` column equals this value
    /// are returned.
    pub method: Option<String>,
}
14
/// Filters and paging accepted by [`QueryEngine::schema_changes`].
#[derive(Debug, Clone)]
pub struct SchemaChangesParams {
    /// When `Some`, only `schema_changes` rows whose `proxy` column equals this value
    /// are returned.
    pub proxy: Option<String>,
    /// When `Some`, only `schema_changes` rows whose `method` column equals this value
    /// are returned.
    pub method: Option<String>,
    /// Bound directly to the SQL `LIMIT` clause.
    ///
    /// NOTE(review): SQLite treats a negative `LIMIT` as "no upper bound" — confirm
    /// callers never pass a negative value unintentionally.
    pub limit: i64,
}
22
23#[derive(Debug, Clone, Serialize)]
25pub struct SchemaRow {
26 pub upstream_url: String,
27 pub method: String,
28 pub payload: String,
29 pub captured_at: i64,
30 pub schema_hash: String,
31}
32
/// The most recently captured `server_schema` row for one proxy/method pair, as returned
/// by [`QueryEngine::latest_schema_row`].
///
/// Unlike [`SchemaRow`] this also carries the `proxy` column and does not derive
/// `Serialize`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LatestSchemaRow {
    /// Value of the `proxy` column.
    pub proxy: String,
    /// Value of the `upstream_url` column.
    pub upstream_url: String,
    /// Value of the `method` column.
    pub method: String,
    /// Value of the `payload` column.
    pub payload: String,
    /// Value of the `captured_at` column; the query orders by it, newest first.
    pub captured_at: i64,
    /// Value of the `schema_hash` column.
    pub schema_hash: String,
}
45
46#[derive(Debug, Clone, Serialize)]
48pub struct SchemaChangeRow {
49 pub upstream_url: String,
50 pub method: String,
51 pub change_type: String,
52 pub item_name: Option<String>,
53 pub old_hash: Option<String>,
54 pub new_hash: Option<String>,
55 pub detected_at: i64,
56}
57
/// Parameters accepted by [`QueryEngine::schema_unused`].
#[derive(Debug, Clone)]
pub struct SchemaUnusedParams {
    /// Optional proxy filter; `None` disables the filter.
    pub proxy: Option<String>,
    /// Lower bound (inclusive) on the `ts` column of `requests`: only rows with
    /// `ts >= since_ts` are counted.
    pub since_ts: i64,
}
63
64#[derive(Debug, Clone, Serialize)]
66pub struct SchemaToolUsageRow {
67 pub tool_name: String,
68 pub description: String,
69 pub calls: i64,
70 pub errors: i64,
71 pub last_called_at: Option<i64>,
72}
73
74#[derive(Debug, Clone, Serialize)]
76pub struct SchemaStatusRow {
77 pub upstream_url: String,
78 pub status: String,
80 pub server_name: Option<String>,
81 pub server_version: Option<String>,
82 pub protocol_version: Option<String>,
83 pub capabilities: Vec<String>,
84 pub methods_captured: Vec<String>,
85 pub last_captured_at: Option<i64>,
86}
87
88impl QueryEngine {
89 pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
92 let sql = "
93 SELECT upstream_url, method, payload, captured_at, schema_hash
94 FROM server_schema
95 WHERE (?1 IS NULL OR proxy = ?1)
96 AND (?2 IS NULL OR method = ?2)
97 ORDER BY upstream_url, method
98 ";
99 let mut stmt = self.conn().prepare(sql)?;
100 let rows = stmt.query_map(params![params.proxy, params.method], |row| {
101 Ok(SchemaRow {
102 upstream_url: row.get(0)?,
103 method: row.get(1)?,
104 payload: row.get(2)?,
105 captured_at: row.get(3)?,
106 schema_hash: row.get(4)?,
107 })
108 })?;
109 rows.collect()
110 }
111
112 pub fn schema_changes(
115 &self,
116 params: &SchemaChangesParams,
117 ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
118 let sql = "
119 SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
120 FROM schema_changes
121 WHERE (?1 IS NULL OR proxy = ?1)
122 AND (?2 IS NULL OR method = ?2)
123 ORDER BY detected_at DESC
124 LIMIT ?3
125 ";
126 let mut stmt = self.conn().prepare(sql)?;
127 let rows = stmt.query_map(params![params.proxy, params.method, params.limit], |row| {
128 Ok(SchemaChangeRow {
129 upstream_url: row.get(0)?,
130 method: row.get(1)?,
131 change_type: row.get(2)?,
132 item_name: row.get(3)?,
133 old_hash: row.get(4)?,
134 new_hash: row.get(5)?,
135 detected_at: row.get(6)?,
136 })
137 })?;
138 rows.collect()
139 }
140
141 pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
143 let methods_sql = "
144 SELECT method, captured_at FROM server_schema
145 WHERE upstream_url = ?1
146 ORDER BY method
147 ";
148 let mut stmt = self.conn().prepare(methods_sql)?;
149 let methods: Vec<(String, i64)> = stmt
150 .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
151 .collect::<Result<Vec<_>, _>>()?;
152
153 if methods.is_empty() {
154 return Ok(SchemaStatusRow {
155 upstream_url: upstream_url.to_string(),
156 status: "unknown".to_string(),
157 server_name: None,
158 server_version: None,
159 protocol_version: None,
160 capabilities: vec![],
161 methods_captured: vec![],
162 last_captured_at: None,
163 });
164 }
165
166 let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
167 let last_captured = methods.iter().map(|(_, ts)| *ts).max();
168
169 let (server_name, server_version, protocol_version, capabilities) =
171 self.extract_server_info(upstream_url);
172
173 let has_initialize = method_names.iter().any(|m| m == "initialize");
174 let list_methods = [
175 "tools/list",
176 "resources/list",
177 "resources/templates/list",
178 "prompts/list",
179 ];
180 let has_any_list = list_methods
181 .iter()
182 .any(|m| method_names.iter().any(|n| n == m));
183
184 let status = if has_initialize && has_any_list {
185 "complete"
186 } else {
187 "partial"
188 };
189
190 Ok(SchemaStatusRow {
191 upstream_url: upstream_url.to_string(),
192 status: status.to_string(),
193 server_name,
194 server_version,
195 protocol_version,
196 capabilities,
197 methods_captured: method_names,
198 last_captured_at: last_captured,
199 })
200 }
201
202 pub fn latest_schema_row(
211 &self,
212 proxy: &str,
213 method: &str,
214 ) -> Result<Option<LatestSchemaRow>, rusqlite::Error> {
215 let sql = "
216 SELECT proxy, upstream_url, method, payload, captured_at, schema_hash
217 FROM server_schema
218 WHERE proxy = ?1 AND method = ?2
219 ORDER BY captured_at DESC
220 LIMIT 1
221 ";
222 self.conn()
223 .query_row(sql, params![proxy, method], |row| {
224 Ok(LatestSchemaRow {
225 proxy: row.get(0)?,
226 upstream_url: row.get(1)?,
227 method: row.get(2)?,
228 payload: row.get(3)?,
229 captured_at: row.get(4)?,
230 schema_hash: row.get(5)?,
231 })
232 })
233 .map(Some)
234 .or_else(|e| match e {
235 rusqlite::Error::QueryReturnedNoRows => Ok(None),
236 other => Err(other),
237 })
238 }
239
240 pub fn schema_unused(
243 &self,
244 params: &SchemaUnusedParams,
245 ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
246 let payload: Option<String> = self
248 .conn()
249 .query_row(
250 "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
251 [],
252 |row| row.get(0),
253 )
254 .ok();
255
256 let payload = match payload {
257 Some(p) => p,
258 None => return Ok(vec![]),
259 };
260
261 let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
263 let tools = match val.get("tools").and_then(|t| t.as_array()) {
264 Some(arr) => arr,
265 None => return Ok(vec![]),
266 };
267
268 let mut tool_info: Vec<(String, String)> = Vec::new();
269 for tool in tools {
270 let name = tool
271 .get("name")
272 .and_then(|n| n.as_str())
273 .unwrap_or("")
274 .to_string();
275 let desc = tool
276 .get("description")
277 .and_then(|d| d.as_str())
278 .unwrap_or("")
279 .to_string();
280 if !name.is_empty() {
281 tool_info.push((name, desc));
282 }
283 }
284
285 if tool_info.is_empty() {
286 return Ok(vec![]);
287 }
288
289 let sql = "
291 SELECT COUNT(*) as calls,
292 SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
293 MAX(ts) as last_called_at
294 FROM requests
295 WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
296 ";
297
298 let mut result = Vec::new();
299 for (name, desc) in &tool_info {
300 let row =
301 self.conn()
302 .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
303 Ok((
304 row.get::<_, i64>(0)?,
305 row.get::<_, i64>(1)?,
306 row.get::<_, Option<i64>>(2)?,
307 ))
308 });
309
310 let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
311 result.push(SchemaToolUsageRow {
312 tool_name: name.clone(),
313 description: desc.clone(),
314 calls,
315 errors,
316 last_called_at,
317 });
318 }
319
320 result.sort_by(|a, b| a.calls.cmp(&b.calls));
322
323 Ok(result)
324 }
325
326 fn extract_server_info(
328 &self,
329 upstream_url: &str,
330 ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
331 let payload: Option<String> = self
332 .conn()
333 .query_row(
334 "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
335 params![upstream_url],
336 |row| row.get(0),
337 )
338 .ok();
339
340 let payload = match payload {
341 Some(p) => p,
342 None => return (None, None, None, vec![]),
343 };
344
345 let val: serde_json::Value = match serde_json::from_str(&payload) {
346 Ok(v) => v,
347 Err(_) => return (None, None, None, vec![]),
348 };
349
350 let server_name = val
351 .get("serverInfo")
352 .and_then(|i| i.get("name"))
353 .and_then(|n| n.as_str())
354 .map(String::from);
355 let server_version = val
356 .get("serverInfo")
357 .and_then(|i| i.get("version"))
358 .and_then(|v| v.as_str())
359 .map(String::from);
360 let protocol_version = val
361 .get("protocolVersion")
362 .and_then(|p| p.as_str())
363 .map(String::from);
364 let capabilities = val
365 .get("capabilities")
366 .and_then(|c| c.as_object())
367 .map(|obj| obj.keys().cloned().collect())
368 .unwrap_or_default();
369
370 (server_name, server_version, protocol_version, capabilities)
371 }
372}