Skip to main content

mcpr_integrations/store/query/
schema.rs

//! Query: `mcpr proxy schema` — show captured MCP server schema and change history.

3use rusqlite::params;
4use serde::Serialize;
5
6use super::QueryEngine;
7
/// Parameters for the schema snapshot query (`QueryEngine::schema`).
///
/// `SchemaParams::default()` applies no filters: every proxy, every method.
#[derive(Debug, Clone, Default)]
pub struct SchemaParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    /// Filter to a specific MCP method (e.g. `tools/list`). `None` returns all methods.
    pub method: Option<String>,
}
14
/// Parameters for the schema changes query (`QueryEngine::schema_changes`).
#[derive(Debug, Clone)]
pub struct SchemaChangesParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    /// Filter to a specific MCP method. `None` returns all methods.
    pub method: Option<String>,
    /// Maximum number of change records to return (newest first).
    ///
    /// Bound directly to SQLite's `LIMIT`, where a negative value means "no limit".
    pub limit: i64,
}
22
23/// A single schema snapshot row from `server_schema`.
24#[derive(Debug, Clone, Serialize)]
25pub struct SchemaRow {
26    pub upstream_url: String,
27    pub method: String,
28    pub payload: String,
29    pub captured_at: i64,
30    pub schema_hash: String,
31}
32
33/// A schema change record from `schema_changes`.
34#[derive(Debug, Clone, Serialize)]
35pub struct SchemaChangeRow {
36    pub upstream_url: String,
37    pub method: String,
38    pub change_type: String,
39    pub item_name: Option<String>,
40    pub old_hash: Option<String>,
41    pub new_hash: Option<String>,
42    pub detected_at: i64,
43}
44
/// Parameters for the unused tools query (`QueryEngine::schema_unused`).
#[derive(Debug, Clone)]
pub struct SchemaUnusedParams {
    /// Filter to a specific proxy name. `None` considers all proxies.
    pub proxy: Option<String>,
    /// Only requests with `ts >= since_ts` are counted as usage.
    pub since_ts: i64,
}
50
51/// A tool listed in the schema with its usage stats.
52#[derive(Debug, Clone, Serialize)]
53pub struct SchemaToolUsageRow {
54    pub tool_name: String,
55    pub description: String,
56    pub calls: i64,
57    pub errors: i64,
58    pub last_called_at: Option<i64>,
59}
60
61/// Computed schema status for a given upstream.
62#[derive(Debug, Clone, Serialize)]
63pub struct SchemaStatusRow {
64    pub upstream_url: String,
65    /// "unknown", "partial", or "complete".
66    pub status: String,
67    pub server_name: Option<String>,
68    pub server_version: Option<String>,
69    pub protocol_version: Option<String>,
70    pub capabilities: Vec<String>,
71    pub methods_captured: Vec<String>,
72    pub last_captured_at: Option<i64>,
73}
74
75impl QueryEngine {
76    /// Fetch all captured schema snapshots, optionally filtered by proxy
77    /// name and/or MCP method.
78    pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
79        let sql = "
80            SELECT upstream_url, method, payload, captured_at, schema_hash
81            FROM server_schema
82            WHERE (?1 IS NULL OR proxy = ?1)
83              AND (?2 IS NULL OR method = ?2)
84            ORDER BY upstream_url, method
85        ";
86        let mut stmt = self.conn().prepare(sql)?;
87        let rows = stmt.query_map(params![params.proxy, params.method], |row| {
88            Ok(SchemaRow {
89                upstream_url: row.get(0)?,
90                method: row.get(1)?,
91                payload: row.get(2)?,
92                captured_at: row.get(3)?,
93                schema_hash: row.get(4)?,
94            })
95        })?;
96        rows.collect()
97    }
98
99    /// Fetch schema change history, optionally filtered by proxy name
100    /// and/or MCP method.
101    pub fn schema_changes(
102        &self,
103        params: &SchemaChangesParams,
104    ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
105        let sql = "
106            SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
107            FROM schema_changes
108            WHERE (?1 IS NULL OR proxy = ?1)
109              AND (?2 IS NULL OR method = ?2)
110            ORDER BY detected_at DESC
111            LIMIT ?3
112        ";
113        let mut stmt = self.conn().prepare(sql)?;
114        let rows = stmt.query_map(params![params.proxy, params.method, params.limit], |row| {
115            Ok(SchemaChangeRow {
116                upstream_url: row.get(0)?,
117                method: row.get(1)?,
118                change_type: row.get(2)?,
119                item_name: row.get(3)?,
120                old_hash: row.get(4)?,
121                new_hash: row.get(5)?,
122                detected_at: row.get(6)?,
123            })
124        })?;
125        rows.collect()
126    }
127
128    /// Compute the schema status for a given upstream URL.
129    pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
130        let methods_sql = "
131            SELECT method, captured_at FROM server_schema
132            WHERE upstream_url = ?1
133            ORDER BY method
134        ";
135        let mut stmt = self.conn().prepare(methods_sql)?;
136        let methods: Vec<(String, i64)> = stmt
137            .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
138            .collect::<Result<Vec<_>, _>>()?;
139
140        if methods.is_empty() {
141            return Ok(SchemaStatusRow {
142                upstream_url: upstream_url.to_string(),
143                status: "unknown".to_string(),
144                server_name: None,
145                server_version: None,
146                protocol_version: None,
147                capabilities: vec![],
148                methods_captured: vec![],
149                last_captured_at: None,
150            });
151        }
152
153        let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
154        let last_captured = methods.iter().map(|(_, ts)| *ts).max();
155
156        // Extract server info from initialize payload if available.
157        let (server_name, server_version, protocol_version, capabilities) =
158            self.extract_server_info(upstream_url);
159
160        let has_initialize = method_names.iter().any(|m| m == "initialize");
161        let list_methods = [
162            "tools/list",
163            "resources/list",
164            "resources/templates/list",
165            "prompts/list",
166        ];
167        let has_any_list = list_methods
168            .iter()
169            .any(|m| method_names.iter().any(|n| n == m));
170
171        let status = if has_initialize && has_any_list {
172            "complete"
173        } else {
174            "partial"
175        };
176
177        Ok(SchemaStatusRow {
178            upstream_url: upstream_url.to_string(),
179            status: status.to_string(),
180            server_name,
181            server_version,
182            protocol_version,
183            capabilities,
184            methods_captured: method_names,
185            last_captured_at: last_captured,
186        })
187    }
188
189    /// Cross-reference captured tools/list schema with actual request logs.
190    /// Returns all listed tools with their usage stats — unused tools have calls = 0.
191    pub fn schema_unused(
192        &self,
193        params: &SchemaUnusedParams,
194    ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
195        // Step 1: Get the tools/list payload from server_schema.
196        let payload: Option<String> = self
197            .conn()
198            .query_row(
199                "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
200                [],
201                |row| row.get(0),
202            )
203            .ok();
204
205        let payload = match payload {
206            Some(p) => p,
207            None => return Ok(vec![]),
208        };
209
210        // Step 2: Parse tool names from the payload.
211        let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
212        let tools = match val.get("tools").and_then(|t| t.as_array()) {
213            Some(arr) => arr,
214            None => return Ok(vec![]),
215        };
216
217        let mut tool_info: Vec<(String, String)> = Vec::new();
218        for tool in tools {
219            let name = tool
220                .get("name")
221                .and_then(|n| n.as_str())
222                .unwrap_or("")
223                .to_string();
224            let desc = tool
225                .get("description")
226                .and_then(|d| d.as_str())
227                .unwrap_or("")
228                .to_string();
229            if !name.is_empty() {
230                tool_info.push((name, desc));
231            }
232        }
233
234        if tool_info.is_empty() {
235            return Ok(vec![]);
236        }
237
238        // Step 3: Query request logs for each tool's usage.
239        let sql = "
240            SELECT COUNT(*) as calls,
241                   SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
242                   MAX(ts) as last_called_at
243            FROM requests
244            WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
245        ";
246
247        let mut result = Vec::new();
248        for (name, desc) in &tool_info {
249            let row =
250                self.conn()
251                    .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
252                        Ok((
253                            row.get::<_, i64>(0)?,
254                            row.get::<_, i64>(1)?,
255                            row.get::<_, Option<i64>>(2)?,
256                        ))
257                    });
258
259            let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
260            result.push(SchemaToolUsageRow {
261                tool_name: name.clone(),
262                description: desc.clone(),
263                calls,
264                errors,
265                last_called_at,
266            });
267        }
268
269        // Sort: unused first (calls = 0), then by calls ascending.
270        result.sort_by(|a, b| a.calls.cmp(&b.calls));
271
272        Ok(result)
273    }
274
275    /// Extract server info from a captured initialize payload.
276    fn extract_server_info(
277        &self,
278        upstream_url: &str,
279    ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
280        let payload: Option<String> = self
281            .conn()
282            .query_row(
283                "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
284                params![upstream_url],
285                |row| row.get(0),
286            )
287            .ok();
288
289        let payload = match payload {
290            Some(p) => p,
291            None => return (None, None, None, vec![]),
292        };
293
294        let val: serde_json::Value = match serde_json::from_str(&payload) {
295            Ok(v) => v,
296            Err(_) => return (None, None, None, vec![]),
297        };
298
299        let server_name = val
300            .get("serverInfo")
301            .and_then(|i| i.get("name"))
302            .and_then(|n| n.as_str())
303            .map(String::from);
304        let server_version = val
305            .get("serverInfo")
306            .and_then(|i| i.get("version"))
307            .and_then(|v| v.as_str())
308            .map(String::from);
309        let protocol_version = val
310            .get("protocolVersion")
311            .and_then(|p| p.as_str())
312            .map(String::from);
313        let capabilities = val
314            .get("capabilities")
315            .and_then(|c| c.as_object())
316            .map(|obj| obj.keys().cloned().collect())
317            .unwrap_or_default();
318
319        (server_name, server_version, protocol_version, capabilities)
320    }
321}