// mcpr_integrations/store/query/schema.rs

//! Query: `mcpr proxy schema` — show captured MCP server schema and change history.

use rusqlite::{params, OptionalExtension};
use serde::Serialize;

use super::QueryEngine;

/// Parameters for the schema snapshot query ([`QueryEngine::schema`]).
///
/// `SchemaParams::default()` applies no filters and matches every snapshot.
#[derive(Debug, Clone, Default)]
pub struct SchemaParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    /// Filter to a specific MCP method (e.g. `tools/list`). `None` returns all methods.
    pub method: Option<String>,
}

/// Parameters for the schema changes query ([`QueryEngine::schema_changes`]).
#[derive(Debug, Clone)]
pub struct SchemaChangesParams {
    /// Filter to a specific proxy name. `None` returns all proxies.
    pub proxy: Option<String>,
    /// Filter to a specific MCP method (e.g. `tools/list`). `None` returns all methods.
    pub method: Option<String>,
    /// Maximum number of change records to return; bound to the query's SQL `LIMIT`.
    pub limit: i64,
}

23/// A single schema snapshot row from `server_schema`.
24#[derive(Debug, Clone, Serialize)]
25pub struct SchemaRow {
26    pub upstream_url: String,
27    pub method: String,
28    pub payload: String,
29    pub captured_at: i64,
30    pub schema_hash: String,
31}
32
/// A schema row used for hydration at startup (seeds `SchemaManager`).
///
/// Unlike the general `SchemaRow`, this carries the `proxy` name; the
/// status-display consumers of `schema()` have no use for it.
#[derive(Clone, Debug)]
pub struct LatestSchemaRow {
    /// Name of the proxy the row was recorded under.
    pub proxy: String,
    /// URL of the upstream server the snapshot belongs to.
    pub upstream_url: String,
    /// MCP method whose response was captured.
    pub method: String,
    /// Raw captured payload text.
    pub payload: String,
    /// Value of the `captured_at` column for this snapshot.
    pub captured_at: i64,
    /// Value of the `schema_hash` column.
    pub schema_hash: String,
}

46/// A schema change record from `schema_changes`.
47#[derive(Debug, Clone, Serialize)]
48pub struct SchemaChangeRow {
49    pub upstream_url: String,
50    pub method: String,
51    pub change_type: String,
52    pub item_name: Option<String>,
53    pub old_hash: Option<String>,
54    pub new_hash: Option<String>,
55    pub detected_at: i64,
56}
57
/// Parameters for the unused tools query ([`QueryEngine::schema_unused`]).
#[derive(Debug, Clone)]
pub struct SchemaUnusedParams {
    /// Filter to a specific proxy name. `None` considers all proxies.
    pub proxy: Option<String>,
    /// Only requests whose `ts` is at or after this value are counted.
    pub since_ts: i64,
}

64/// A tool listed in the schema with its usage stats.
65#[derive(Debug, Clone, Serialize)]
66pub struct SchemaToolUsageRow {
67    pub tool_name: String,
68    pub description: String,
69    pub calls: i64,
70    pub errors: i64,
71    pub last_called_at: Option<i64>,
72}
73
74/// Computed schema status for a given upstream.
75#[derive(Debug, Clone, Serialize)]
76pub struct SchemaStatusRow {
77    pub upstream_url: String,
78    /// "unknown", "partial", or "complete".
79    pub status: String,
80    pub server_name: Option<String>,
81    pub server_version: Option<String>,
82    pub protocol_version: Option<String>,
83    pub capabilities: Vec<String>,
84    pub methods_captured: Vec<String>,
85    pub last_captured_at: Option<i64>,
86}
87
88impl QueryEngine {
89    /// Fetch all captured schema snapshots, optionally filtered by proxy
90    /// name and/or MCP method.
91    pub fn schema(&self, params: &SchemaParams) -> Result<Vec<SchemaRow>, rusqlite::Error> {
92        let sql = "
93            SELECT upstream_url, method, payload, captured_at, schema_hash
94            FROM server_schema
95            WHERE (?1 IS NULL OR proxy = ?1)
96              AND (?2 IS NULL OR method = ?2)
97            ORDER BY upstream_url, method
98        ";
99        let mut stmt = self.conn().prepare(sql)?;
100        let rows = stmt.query_map(params![params.proxy, params.method], |row| {
101            Ok(SchemaRow {
102                upstream_url: row.get(0)?,
103                method: row.get(1)?,
104                payload: row.get(2)?,
105                captured_at: row.get(3)?,
106                schema_hash: row.get(4)?,
107            })
108        })?;
109        rows.collect()
110    }
111
112    /// Fetch schema change history, optionally filtered by proxy name
113    /// and/or MCP method.
114    pub fn schema_changes(
115        &self,
116        params: &SchemaChangesParams,
117    ) -> Result<Vec<SchemaChangeRow>, rusqlite::Error> {
118        let sql = "
119            SELECT upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at
120            FROM schema_changes
121            WHERE (?1 IS NULL OR proxy = ?1)
122              AND (?2 IS NULL OR method = ?2)
123            ORDER BY detected_at DESC
124            LIMIT ?3
125        ";
126        let mut stmt = self.conn().prepare(sql)?;
127        let rows = stmt.query_map(params![params.proxy, params.method, params.limit], |row| {
128            Ok(SchemaChangeRow {
129                upstream_url: row.get(0)?,
130                method: row.get(1)?,
131                change_type: row.get(2)?,
132                item_name: row.get(3)?,
133                old_hash: row.get(4)?,
134                new_hash: row.get(5)?,
135                detected_at: row.get(6)?,
136            })
137        })?;
138        rows.collect()
139    }
140
141    /// Compute the schema status for a given upstream URL.
142    pub fn schema_status(&self, upstream_url: &str) -> Result<SchemaStatusRow, rusqlite::Error> {
143        let methods_sql = "
144            SELECT method, captured_at FROM server_schema
145            WHERE upstream_url = ?1
146            ORDER BY method
147        ";
148        let mut stmt = self.conn().prepare(methods_sql)?;
149        let methods: Vec<(String, i64)> = stmt
150            .query_map(params![upstream_url], |row| Ok((row.get(0)?, row.get(1)?)))?
151            .collect::<Result<Vec<_>, _>>()?;
152
153        if methods.is_empty() {
154            return Ok(SchemaStatusRow {
155                upstream_url: upstream_url.to_string(),
156                status: "unknown".to_string(),
157                server_name: None,
158                server_version: None,
159                protocol_version: None,
160                capabilities: vec![],
161                methods_captured: vec![],
162                last_captured_at: None,
163            });
164        }
165
166        let method_names: Vec<String> = methods.iter().map(|(m, _)| m.clone()).collect();
167        let last_captured = methods.iter().map(|(_, ts)| *ts).max();
168
169        // Extract server info from initialize payload if available.
170        let (server_name, server_version, protocol_version, capabilities) =
171            self.extract_server_info(upstream_url);
172
173        let has_initialize = method_names.iter().any(|m| m == "initialize");
174        let list_methods = [
175            "tools/list",
176            "resources/list",
177            "resources/templates/list",
178            "prompts/list",
179        ];
180        let has_any_list = list_methods
181            .iter()
182            .any(|m| method_names.iter().any(|n| n == m));
183
184        let status = if has_initialize && has_any_list {
185            "complete"
186        } else {
187            "partial"
188        };
189
190        Ok(SchemaStatusRow {
191            upstream_url: upstream_url.to_string(),
192            status: status.to_string(),
193            server_name,
194            server_version,
195            protocol_version,
196            capabilities,
197            methods_captured: method_names,
198            last_captured_at: last_captured,
199        })
200    }
201
202    /// Fetch the latest persisted schema row for `(proxy, method)` —
203    /// used at proxy startup to hydrate `SchemaManager` from disk.
204    ///
205    /// Returns `None` when no row matches. `server_schema` is an
206    /// UPSERT-keyed-by-(proxy, upstream_url, method) table, so the
207    /// "latest" is simply the single row per key; we pick by highest
208    /// `captured_at` to be robust against multiple upstream URLs that
209    /// share the same proxy name (unlikely today but possible).
210    pub fn latest_schema_row(
211        &self,
212        proxy: &str,
213        method: &str,
214    ) -> Result<Option<LatestSchemaRow>, rusqlite::Error> {
215        let sql = "
216            SELECT proxy, upstream_url, method, payload, captured_at, schema_hash
217            FROM server_schema
218            WHERE proxy = ?1 AND method = ?2
219            ORDER BY captured_at DESC
220            LIMIT 1
221        ";
222        self.conn()
223            .query_row(sql, params![proxy, method], |row| {
224                Ok(LatestSchemaRow {
225                    proxy: row.get(0)?,
226                    upstream_url: row.get(1)?,
227                    method: row.get(2)?,
228                    payload: row.get(3)?,
229                    captured_at: row.get(4)?,
230                    schema_hash: row.get(5)?,
231                })
232            })
233            .map(Some)
234            .or_else(|e| match e {
235                rusqlite::Error::QueryReturnedNoRows => Ok(None),
236                other => Err(other),
237            })
238    }
239
240    /// Cross-reference captured tools/list schema with actual request logs.
241    /// Returns all listed tools with their usage stats — unused tools have calls = 0.
242    pub fn schema_unused(
243        &self,
244        params: &SchemaUnusedParams,
245    ) -> Result<Vec<SchemaToolUsageRow>, rusqlite::Error> {
246        // Step 1: Get the tools/list payload from server_schema.
247        let payload: Option<String> = self
248            .conn()
249            .query_row(
250                "SELECT payload FROM server_schema WHERE method = 'tools/list' LIMIT 1",
251                [],
252                |row| row.get(0),
253            )
254            .ok();
255
256        let payload = match payload {
257            Some(p) => p,
258            None => return Ok(vec![]),
259        };
260
261        // Step 2: Parse tool names from the payload.
262        let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_default();
263        let tools = match val.get("tools").and_then(|t| t.as_array()) {
264            Some(arr) => arr,
265            None => return Ok(vec![]),
266        };
267
268        let mut tool_info: Vec<(String, String)> = Vec::new();
269        for tool in tools {
270            let name = tool
271                .get("name")
272                .and_then(|n| n.as_str())
273                .unwrap_or("")
274                .to_string();
275            let desc = tool
276                .get("description")
277                .and_then(|d| d.as_str())
278                .unwrap_or("")
279                .to_string();
280            if !name.is_empty() {
281                tool_info.push((name, desc));
282            }
283        }
284
285        if tool_info.is_empty() {
286            return Ok(vec![]);
287        }
288
289        // Step 3: Query request logs for each tool's usage.
290        let sql = "
291            SELECT COUNT(*) as calls,
292                   SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as errors,
293                   MAX(ts) as last_called_at
294            FROM requests
295            WHERE (?1 IS NULL OR proxy = ?1) AND ts >= ?2 AND tool = ?3
296        ";
297
298        let mut result = Vec::new();
299        for (name, desc) in &tool_info {
300            let row =
301                self.conn()
302                    .query_row(sql, params![params.proxy, params.since_ts, name], |row| {
303                        Ok((
304                            row.get::<_, i64>(0)?,
305                            row.get::<_, i64>(1)?,
306                            row.get::<_, Option<i64>>(2)?,
307                        ))
308                    });
309
310            let (calls, errors, last_called_at) = row.unwrap_or((0, 0, None));
311            result.push(SchemaToolUsageRow {
312                tool_name: name.clone(),
313                description: desc.clone(),
314                calls,
315                errors,
316                last_called_at,
317            });
318        }
319
320        // Sort: unused first (calls = 0), then by calls ascending.
321        result.sort_by(|a, b| a.calls.cmp(&b.calls));
322
323        Ok(result)
324    }
325
326    /// Extract server info from a captured initialize payload.
327    fn extract_server_info(
328        &self,
329        upstream_url: &str,
330    ) -> (Option<String>, Option<String>, Option<String>, Vec<String>) {
331        let payload: Option<String> = self
332            .conn()
333            .query_row(
334                "SELECT payload FROM server_schema WHERE upstream_url = ?1 AND method = 'initialize'",
335                params![upstream_url],
336                |row| row.get(0),
337            )
338            .ok();
339
340        let payload = match payload {
341            Some(p) => p,
342            None => return (None, None, None, vec![]),
343        };
344
345        let val: serde_json::Value = match serde_json::from_str(&payload) {
346            Ok(v) => v,
347            Err(_) => return (None, None, None, vec![]),
348        };
349
350        let server_name = val
351            .get("serverInfo")
352            .and_then(|i| i.get("name"))
353            .and_then(|n| n.as_str())
354            .map(String::from);
355        let server_version = val
356            .get("serverInfo")
357            .and_then(|i| i.get("version"))
358            .and_then(|v| v.as_str())
359            .map(String::from);
360        let protocol_version = val
361            .get("protocolVersion")
362            .and_then(|p| p.as_str())
363            .map(String::from);
364        let capabilities = val
365            .get("capabilities")
366            .and_then(|c| c.as_object())
367            .map(|obj| obj.keys().cloned().collect())
368            .unwrap_or_default();
369
370        (server_name, server_version, protocol_version, capabilities)
371    }
372}