Skip to main content

mcpr_integrations/store/query/
schema.rs

//! Query: `mcpr proxy schema` — show captured MCP server schema and change history.
2
3use rusqlite::params;
4use serde::Serialize;
5
6use super::QueryEngine;
7
/// Parameters for the schema snapshot query ([`QueryEngine::schema`]).
///
/// Each filter is optional; `None` disables that filter, so the default value
/// selects every captured snapshot.
#[derive(Debug, Clone, Default)]
pub struct SchemaParams {
    /// Only return snapshots captured from this upstream URL.
    pub upstream_url: Option<String>,
    /// Only return snapshots for this MCP method (e.g. `tools/list`).
    pub method: Option<String>,
}
13
/// Parameters for the schema change history query ([`QueryEngine::schema_changes`]).
#[derive(Debug, Clone)]
pub struct SchemaChangesParams {
    /// Only return changes for this upstream URL (`None` = all upstreams).
    pub upstream_url: Option<String>,
    /// Only return changes for this MCP method (`None` = all methods).
    pub method: Option<String>,
    /// Maximum number of rows, passed straight to SQL `LIMIT`
    /// (SQLite treats a negative value as "no limit").
    pub limit: i64,
}
20
21/// A single schema snapshot row from `server_schema`.
22#[derive(Debug, Clone, Serialize)]
23pub struct SchemaRow {
24    pub upstream_url: String,
25    pub method: String,
26    pub payload: String,
27    pub captured_at: i64,
28    pub schema_hash: String,
29}
30
31/// A schema change record from `schema_changes`.
32#[derive(Debug, Clone, Serialize)]
33pub struct SchemaChangeRow {
34    pub upstream_url: String,
35    pub method: String,
36    pub change_type: String,
37    pub item_name: Option<String>,
38    pub old_hash: Option<String>,
39    pub new_hash: Option<String>,
40    pub detected_at: i64,
41}
42
/// Parameters for the unused tools query ([`QueryEngine::schema_unused`]).
#[derive(Debug, Clone)]
pub struct SchemaUnusedParams {
    /// Only count requests logged for this proxy (`None` = all proxies).
    pub proxy: Option<String>,
    /// Only count requests whose `ts` is greater than or equal to this value.
    pub since_ts: i64,
}
48
49/// A tool listed in the schema with its usage stats.
50#[derive(Debug, Clone, Serialize)]
51pub struct SchemaToolUsageRow {
52    pub tool_name: String,
53    pub description: String,
54    pub calls: i64,
55    pub errors: i64,
56    pub last_called_at: Option<i64>,
57}
58
59/// Computed schema status for a given upstream.
60#[derive(Debug, Clone, Serialize)]
61pub struct SchemaStatusRow {
62    pub upstream_url: String,
63    /// "unknown", "partial", "complete", "stale"
64    pub status: String,
65    pub server_name: Option<String>,
66    pub server_version: Option<String>,
67    pub protocol_version: Option<String>,
68    pub capabilities: Vec<String>,
69    pub methods_captured: Vec<String>,
70    pub last_captured_at: Option<i64>,
71}
72
73impl QueryEngine {
74    /// Fetch all captured schema snapshots, optionally filtered.
75    pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
76        let sql = "
77            SELECT upstream_url, method, payload, captured_at, schema_hash
78            FROM server_schema
79            WHERE (?1 IS NULL OR upstream_url = ?1)
80              AND (?2 IS NULL OR method = ?2)
81            ORDER BY upstream_url, method
82        ";
83        let mut stmt = self.conn().prepare(sql)?;
84        let rows = stmt.query_map(params![params.upstream_url, params.method], |row| {
85            Ok(SchemaRow {
86                upstream_url: row.get(0)?,
87                method: row.get(1)?,
88                payload: row.get(2)?,
89                captured_at: row.get(3)?,
90                schema_hash: row.get(4)?,
91            })
92        })?;
93        rows.collect()
94    }
95
96    /// Fetch schema change history.
97    pub fn schema_changes(
98        &self,
99        params: &SchemaChangesParams,
100    ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
101        let sql = "
102            SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
103            FROM schema_changes
104            WHERE (?1 IS NULL OR upstream_url = ?1)
105              AND (?2 IS NULL OR method = ?2)
106            ORDER BY detected_at DESC
107            LIMIT ?3
108        ";
109        let mut stmt = self.conn().prepare(sql)?;
110        let rows = stmt.query_map(
111            params![params.upstream_url, params.method, params.limit],
112            |row| {
113                Ok(SchemaChangeRow {
114                    upstream_url: row.get(0)?,
115                    method: row.get(1)?,
116                    change_type: row.get(2)?,
117                    item_name: row.get(3)?,
118                    old_hash: row.get(4)?,
119                    new_hash: row.get(5)?,
120                    detected_at: row.get(6)?,
121                })
122            },
123        )?;
124        rows.collect()
125    }
126
127    /// Compute the schema status for a given upstream URL.
128    pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
129        let methods_sql = "
130            SELECT method, captured_at FROM server_schema
131            WHERE upstream_url = ?1
132            ORDER BY method
133        ";
134        let mut stmt = self.conn().prepare(methods_sql)?;
135        let methods: Vec<(String, i64)> = stmt
136            .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
137            .collect::<Result<Vec<_>, _>>()?;
138
139        if methods.is_empty() {
140            return Ok(SchemaStatusRow {
141                upstream_url: upstream_url.to_string(),
142                status: "unknown".to_string(),
143                server_name: None,
144                server_version: None,
145                protocol_version: None,
146                capabilities: vec![],
147                methods_captured: vec![],
148                last_captured_at: None,
149            });
150        }
151
152        let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
153        let last_captured = methods.iter().map(|(_, ts)| *ts).max();
154
155        // Extract server info from initialize payload if available.
156        let (server_name, server_version, protocol_version, capabilities) =
157            self.extract_server_info(upstream_url);
158
159        // Check for stale markers newer than the latest capture for that method.
160        let stale_sql = "
161            SELECT COUNT(*) FROM schema_changes sc
162            WHERE sc.upstream_url = ?1
163              AND sc.change_type = 'stale'
164              AND sc.detected_at > COALESCE(
165                  (SELECT ss.captured_at FROM server_schema ss
166                   WHERE ss.upstream_url = sc.upstream_url AND ss.method = sc.method),
167                  0
168              )
169        ";
170        let stale_count: i64 = self
171            .conn()
172            .query_row(stale_sql, params![upstream_url], |row| row.get(0))?;
173        let is_stale = stale_count > 0;
174
175        let has_initialize = method_names.iter().any(|m| m == "initialize");
176        let list_methods = [
177            "tools/list",
178            "resources/list",
179            "resources/templates/list",
180            "prompts/list",
181        ];
182        let has_any_list = list_methods
183            .iter()
184            .any(|m| method_names.iter().any(|n| n == m));
185
186        let status = if is_stale {
187            "stale"
188        } else if has_initialize && has_any_list {
189            "complete"
190        } else {
191            "partial"
192        };
193
194        Ok(SchemaStatusRow {
195            upstream_url: upstream_url.to_string(),
196            status: status.to_string(),
197            server_name,
198            server_version,
199            protocol_version,
200            capabilities,
201            methods_captured: method_names,
202            last_captured_at: last_captured,
203        })
204    }
205
206    /// Cross-reference captured tools/list schema with actual request logs.
207    /// Returns all listed tools with their usage stats — unused tools have calls = 0.
208    pub fn schema_unused(
209        &self,
210        params: &SchemaUnusedParams,
211    ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
212        // Step 1: Get the tools/list payload from server_schema.
213        let payload: Option<String> = self
214            .conn()
215            .query_row(
216                "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
217                [],
218                |row| row.get(0),
219            )
220            .ok();
221
222        let payload = match payload {
223            Some(p) => p,
224            None => return Ok(vec![]),
225        };
226
227        // Step 2: Parse tool names from the payload.
228        let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
229        let tools = match val.get("tools").and_then(|t| t.as_array()) {
230            Some(arr) => arr,
231            None => return Ok(vec![]),
232        };
233
234        let mut tool_info: Vec<(String, String)> = Vec::new();
235        for tool in tools {
236            let name = tool
237                .get("name")
238                .and_then(|n| n.as_str())
239                .unwrap_or("")
240                .to_string();
241            let desc = tool
242                .get("description")
243                .and_then(|d| d.as_str())
244                .unwrap_or("")
245                .to_string();
246            if !name.is_empty() {
247                tool_info.push((name, desc));
248            }
249        }
250
251        if tool_info.is_empty() {
252            return Ok(vec![]);
253        }
254
255        // Step 3: Query request logs for each tool's usage.
256        let sql = "
257            SELECT COUNT(*) as calls,
258                   SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
259                   MAX(ts) as last_called_at
260            FROM requests
261            WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
262        ";
263
264        let mut result = Vec::new();
265        for (name, desc) in &tool_info {
266            let row =
267                self.conn()
268                    .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
269                        Ok((
270                            row.get::<_, i64>(0)?,
271                            row.get::<_, i64>(1)?,
272                            row.get::<_, Option<i64>>(2)?,
273                        ))
274                    });
275
276            let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
277            result.push(SchemaToolUsageRow {
278                tool_name: name.clone(),
279                description: desc.clone(),
280                calls,
281                errors,
282                last_called_at,
283            });
284        }
285
286        // Sort: unused first (calls = 0), then by calls ascending.
287        result.sort_by(|a, b| a.calls.cmp(&b.calls));
288
289        Ok(result)
290    }
291
292    /// Extract server info from a captured initialize payload.
293    fn extract_server_info(
294        &self,
295        upstream_url: &str,
296    ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
297        let payload: Option<String> = self
298            .conn()
299            .query_row(
300                "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
301                params![upstream_url],
302                |row| row.get(0),
303            )
304            .ok();
305
306        let payload = match payload {
307            Some(p) => p,
308            None => return (None, None, None, vec![]),
309        };
310
311        let val: serde_json::Value = match serde_json::from_str(&payload) {
312            Ok(v) => v,
313            Err(_) => return (None, None, None, vec![]),
314        };
315
316        let server_name = val
317            .get("serverInfo")
318            .and_then(|i| i.get("name"))
319            .and_then(|n| n.as_str())
320            .map(String::from);
321        let server_version = val
322            .get("serverInfo")
323            .and_then(|i| i.get("version"))
324            .and_then(|v| v.as_str())
325            .map(String::from);
326        let protocol_version = val
327            .get("protocolVersion")
328            .and_then(|p| p.as_str())
329            .map(String::from);
330        let capabilities = val
331            .get("capabilities")
332            .and_then(|c| c.as_object())
333            .map(|obj| obj.keys().cloned().collect())
334            .unwrap_or_default();
335
336        (server_name, server_version, protocol_version, capabilities)
337    }
338}