1use rusqlite::params;
4use serde::Serialize;
5
6use super::QueryEngine;
7
8pub struct SchemaParams {
10 pub upstream_url: Option<String>,
11 pub method: Option<String>,
12}
13
14pub struct SchemaChangesParams {
16 pub upstream_url: Option<String>,
17 pub method: Option<String>,
18 pub limit: i64,
19}
20
21#[derive(Debug, Clone, Serialize)]
23pub struct SchemaRow {
24 pub upstream_url: String,
25 pub method: String,
26 pub payload: String,
27 pub captured_at: i64,
28 pub schema_hash: String,
29}
30
31#[derive(Debug, Clone, Serialize)]
33pub struct SchemaChangeRow {
34 pub upstream_url: String,
35 pub method: String,
36 pub change_type: String,
37 pub item_name: Option<String>,
38 pub old_hash: Option<String>,
39 pub new_hash: Option<String>,
40 pub detected_at: i64,
41}
42
43pub struct SchemaUnusedParams {
45 pub proxy: Option<String>,
46 pub since_ts: i64,
47}
48
49#[derive(Debug, Clone, Serialize)]
51pub struct SchemaToolUsageRow {
52 pub tool_name: String,
53 pub description: String,
54 pub calls: i64,
55 pub errors: i64,
56 pub last_called_at: Option<i64>,
57}
58
59#[derive(Debug, Clone, Serialize)]
61pub struct SchemaStatusRow {
62 pub upstream_url: String,
63 pub status: String,
65 pub server_name: Option<String>,
66 pub server_version: Option<String>,
67 pub protocol_version: Option<String>,
68 pub capabilities: Vec<String>,
69 pub methods_captured: Vec<String>,
70 pub last_captured_at: Option<i64>,
71}
72
73impl QueryEngine {
74 pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
76 let sql = "
77 SELECT upstream_url, method, payload, captured_at, schema_hash
78 FROM server_schema
79 WHERE (?1 IS NULL OR upstream_url = ?1)
80 AND (?2 IS NULL OR method = ?2)
81 ORDER BY upstream_url, method
82 ";
83 let mut stmt = self.conn().prepare(sql)?;
84 let rows = stmt.query_map(params![params.upstream_url, params.method], |row| {
85 Ok(SchemaRow {
86 upstream_url: row.get(0)?,
87 method: row.get(1)?,
88 payload: row.get(2)?,
89 captured_at: row.get(3)?,
90 schema_hash: row.get(4)?,
91 })
92 })?;
93 rows.collect()
94 }
95
96 pub fn schema_changes(
98 &self,
99 params: &SchemaChangesParams,
100 ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
101 let sql = "
102 SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
103 FROM schema_changes
104 WHERE (?1 IS NULL OR upstream_url = ?1)
105 AND (?2 IS NULL OR method = ?2)
106 ORDER BY detected_at DESC
107 LIMIT ?3
108 ";
109 let mut stmt = self.conn().prepare(sql)?;
110 let rows = stmt.query_map(
111 params![params.upstream_url, params.method, params.limit],
112 |row| {
113 Ok(SchemaChangeRow {
114 upstream_url: row.get(0)?,
115 method: row.get(1)?,
116 change_type: row.get(2)?,
117 item_name: row.get(3)?,
118 old_hash: row.get(4)?,
119 new_hash: row.get(5)?,
120 detected_at: row.get(6)?,
121 })
122 },
123 )?;
124 rows.collect()
125 }
126
127 pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
129 let methods_sql = "
130 SELECT method, captured_at FROM server_schema
131 WHERE upstream_url = ?1
132 ORDER BY method
133 ";
134 let mut stmt = self.conn().prepare(methods_sql)?;
135 let methods: Vec<(String, i64)> = stmt
136 .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
137 .collect::<Result<Vec<_>, _>>()?;
138
139 if methods.is_empty() {
140 return Ok(SchemaStatusRow {
141 upstream_url: upstream_url.to_string(),
142 status: "unknown".to_string(),
143 server_name: None,
144 server_version: None,
145 protocol_version: None,
146 capabilities: vec![],
147 methods_captured: vec![],
148 last_captured_at: None,
149 });
150 }
151
152 let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
153 let last_captured = methods.iter().map(|(_, ts)| *ts).max();
154
155 let (server_name, server_version, protocol_version, capabilities) =
157 self.extract_server_info(upstream_url);
158
159 let stale_sql = "
161 SELECT COUNT(*) FROM schema_changes sc
162 WHERE sc.upstream_url = ?1
163 AND sc.change_type = 'stale'
164 AND sc.detected_at > COALESCE(
165 (SELECT ss.captured_at FROM server_schema ss
166 WHERE ss.upstream_url = sc.upstream_url AND ss.method = sc.method),
167 0
168 )
169 ";
170 let stale_count: i64 = self
171 .conn()
172 .query_row(stale_sql, params![upstream_url], |row| row.get(0))?;
173 let is_stale = stale_count > 0;
174
175 let has_initialize = method_names.iter().any(|m| m == "initialize");
176 let list_methods = [
177 "tools/list",
178 "resources/list",
179 "resources/templates/list",
180 "prompts/list",
181 ];
182 let has_any_list = list_methods
183 .iter()
184 .any(|m| method_names.iter().any(|n| n == m));
185
186 let status = if is_stale {
187 "stale"
188 } else if has_initialize && has_any_list {
189 "complete"
190 } else {
191 "partial"
192 };
193
194 Ok(SchemaStatusRow {
195 upstream_url: upstream_url.to_string(),
196 status: status.to_string(),
197 server_name,
198 server_version,
199 protocol_version,
200 capabilities,
201 methods_captured: method_names,
202 last_captured_at: last_captured,
203 })
204 }
205
206 pub fn schema_unused(
209 &self,
210 params: &SchemaUnusedParams,
211 ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
212 let payload: Option<String> = self
214 .conn()
215 .query_row(
216 "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
217 [],
218 |row| row.get(0),
219 )
220 .ok();
221
222 let payload = match payload {
223 Some(p) => p,
224 None => return Ok(vec![]),
225 };
226
227 let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
229 let tools = match val.get("tools").and_then(|t| t.as_array()) {
230 Some(arr) => arr,
231 None => return Ok(vec![]),
232 };
233
234 let mut tool_info: Vec<(String, String)> = Vec::new();
235 for tool in tools {
236 let name = tool
237 .get("name")
238 .and_then(|n| n.as_str())
239 .unwrap_or("")
240 .to_string();
241 let desc = tool
242 .get("description")
243 .and_then(|d| d.as_str())
244 .unwrap_or("")
245 .to_string();
246 if !name.is_empty() {
247 tool_info.push((name, desc));
248 }
249 }
250
251 if tool_info.is_empty() {
252 return Ok(vec![]);
253 }
254
255 let sql = "
257 SELECT COUNT(*) as calls,
258 SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
259 MAX(ts) as last_called_at
260 FROM requests
261 WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
262 ";
263
264 let mut result = Vec::new();
265 for (name, desc) in &tool_info {
266 let row =
267 self.conn()
268 .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
269 Ok((
270 row.get::<_, i64>(0)?,
271 row.get::<_, i64>(1)?,
272 row.get::<_, Option<i64>>(2)?,
273 ))
274 });
275
276 let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
277 result.push(SchemaToolUsageRow {
278 tool_name: name.clone(),
279 description: desc.clone(),
280 calls,
281 errors,
282 last_called_at,
283 });
284 }
285
286 result.sort_by(|a, b| a.calls.cmp(&b.calls));
288
289 Ok(result)
290 }
291
292 fn extract_server_info(
294 &self,
295 upstream_url: &str,
296 ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
297 let payload: Option<String> = self
298 .conn()
299 .query_row(
300 "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
301 params![upstream_url],
302 |row| row.get(0),
303 )
304 .ok();
305
306 let payload = match payload {
307 Some(p) => p,
308 None => return (None, None, None, vec![]),
309 };
310
311 let val: serde_json::Value = match serde_json::from_str(&payload) {
312 Ok(v) => v,
313 Err(_) => return (None, None, None, vec![]),
314 };
315
316 let server_name = val
317 .get("serverInfo")
318 .and_then(|i| i.get("name"))
319 .and_then(|n| n.as_str())
320 .map(String::from);
321 let server_version = val
322 .get("serverInfo")
323 .and_then(|i| i.get("version"))
324 .and_then(|v| v.as_str())
325 .map(String::from);
326 let protocol_version = val
327 .get("protocolVersion")
328 .and_then(|p| p.as_str())
329 .map(String::from);
330 let capabilities = val
331 .get("capabilities")
332 .and_then(|c| c.as_object())
333 .map(|obj| obj.keys().cloned().collect())
334 .unwrap_or_default();
335
336 (server_name, server_version, protocol_version, capabilities)
337 }
338}