Skip to main content

mcpr_integrations/store/query/
mod.rs

1//! Query engine — read-only access to the storage database.
2//!
3//! All CLI observability commands (`mcpr proxy logs`, `mcpr proxy slow`, etc.)
4//! are thin wrappers around [`QueryEngine`] methods. Each method executes a
5//! parameterized SQL query and maps rows to typed result structs.
6//!
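//! The query engine opens its own connection to the database, separate from
//! the background writer's; besides serving queries, `QueryEngine::open`
//! applies any pending migrations through it. WAL mode ensures this never
//! blocks the background writer.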
9
10pub mod clients;
11pub mod logs;
12pub mod schema;
13pub mod session_detail;
14pub mod sessions;
15pub mod slow;
16pub mod stats;
17pub mod store_ops;
18
19use rusqlite::Connection;
20use std::path::Path;
21
22use super::db;
23
24/// Read-only query interface to the storage database.
25pub struct QueryEngine {
26    conn: Connection,
27}
28
29impl QueryEngine {
30    /// Open a query connection to the database at the given path.
31    ///
32    /// Runs migrations on open. The writer also migrates on `Store::open`,
33    /// but a user who upgrades the binary and runs a read command before
34    /// restarting the proxy would otherwise hit "no such column" errors
35    /// against a stale schema. Migrations are idempotent + WAL-safe, so
36    /// it's fine for the reader to bump the schema if the writer hasn't
37    /// caught up yet.
38    pub fn open(db_path: &Path) -> Result<Self, rusqlite::Error> {
39        let conn = db::open_connection(db_path)?;
40        db::run_migrations(&conn, env!("CARGO_PKG_VERSION"))?;
41        Ok(QueryEngine { conn })
42    }
43
44    /// Get a reference to the underlying connection (for query methods).
45    pub(crate) fn conn(&self) -> &Connection {
46        &self.conn
47    }
48
49    /// Create a query engine from an in-memory connection (for testing).
50    #[cfg(test)]
51    pub(crate) fn from_conn(conn: Connection) -> Self {
52        QueryEngine { conn }
53    }
54}
55
56#[cfg(test)]
57#[allow(non_snake_case)]
58mod tests {
59    use super::*;
60    use crate::store::{db, schema};
61    use rusqlite::params;
62
63    /// `QueryEngine::open` must migrate a stale DB so users who upgrade the
64    /// binary and run a read command (without restarting the proxy first)
65    /// don't hit "no such column" errors.
66    #[test]
67    fn open__migrates_stale_db_to_current_version() {
68        let dir = tempfile::tempdir().unwrap();
69        let db_path = dir.path().join("stale.db");
70
71        // Build a V4-shaped DB: run migrations through V4 only.
72        {
73            let conn = Connection::open(&db_path).unwrap();
74            conn.execute_batch(schema::V1_SCHEMA).unwrap();
75            conn.execute_batch(schema::V1_META_SEED).unwrap();
76            conn.execute_batch(schema::V2_SCHEMA).unwrap();
77            conn.execute_batch(schema::V3_SCHEMA).unwrap();
78            conn.execute_batch(schema::V4_SCHEMA).unwrap();
79        }
80
81        // Opening the query engine should bump the schema to V5.
82        let _engine = QueryEngine::open(&db_path).unwrap();
83
84        let conn = Connection::open(&db_path).unwrap();
85        let version: String = conn
86            .query_row(
87                "SELECT value FROM meta WHERE key = 'schema_version'",
88                [],
89                |row| row.get(0),
90            )
91            .unwrap();
92        assert_eq!(version, schema::SCHEMA_VERSION);
93
94        // V5 columns must be queryable.
95        conn.query_row(
96            "SELECT resource_uri, prompt_name FROM requests LIMIT 1",
97            [],
98            |_| Ok(()),
99        )
100        .or_else(|e| {
101            if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
102                Ok(())
103            } else {
104                Err(e)
105            }
106        })
107        .unwrap();
108    }
109
110    /// Create a test QueryEngine with schema applied and seed data inserted.
111    pub(crate) fn seeded_engine() -> QueryEngine {
112        let conn = Connection::open_in_memory().unwrap();
113        db::run_migrations(&conn, "test").unwrap();
114
115        // Seed sessions
116        conn.execute(
117            "INSERT INTO sessions (session_id, proxy, started_at, last_seen_at, client_name, client_version, client_platform, total_calls, total_errors)
118             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
119            params!["s1", "api", 1000, 5000, "claude-desktop", "1.2.0", "claude", 3, 1],
120        ).unwrap();
121        conn.execute(
122            "INSERT INTO sessions (session_id, proxy, started_at, last_seen_at, ended_at, client_name, client_version, client_platform, total_calls, total_errors)
123             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
124            params!["s2", "api", 2000, 3000, 3500, "cursor", "0.44", "cursor", 2, 0],
125        ).unwrap();
126
127        // Seed requests
128        let requests = vec![
129            (
130                "r1",
131                1000i64,
132                "api",
133                Some("s1"),
134                "tools/call",
135                Some("search"),
136                142i64,
137                "ok",
138                None::<&str>,
139                None::<&str>,
140                Some(256i64),
141                Some(1024i64),
142            ),
143            (
144                "r2",
145                2000,
146                "api",
147                Some("s1"),
148                "tools/call",
149                Some("search"),
150                891,
151                "ok",
152                None,
153                None,
154                Some(256),
155                Some(4096),
156            ),
157            (
158                "r3",
159                3000,
160                "api",
161                Some("s1"),
162                "tools/call",
163                Some("create_order"),
164                4201,
165                "error",
166                Some("-32600"),
167                Some("timeout"),
168                Some(512),
169                None,
170            ),
171            (
172                "r4",
173                4000,
174                "api",
175                Some("s2"),
176                "resources/read",
177                None,
178                23,
179                "ok",
180                None,
181                None,
182                Some(64),
183                Some(2048),
184            ),
185            (
186                "r5",
187                5000,
188                "api",
189                Some("s2"),
190                "tools/call",
191                Some("search"),
192                156,
193                "ok",
194                None,
195                None,
196                Some(256),
197                Some(1024),
198            ),
199        ];
200
201        for (
202            id,
203            ts,
204            proxy,
205            sid,
206            method,
207            tool,
208            latency,
209            status,
210            err_code,
211            err_msg,
212            bytes_in,
213            bytes_out,
214        ) in requests
215        {
216            conn.execute(
217                "INSERT INTO requests (request_id, ts, proxy, session_id, method, tool, latency_us, status, error_code, error_msg, bytes_in, bytes_out)
218                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
219                params![id, ts, proxy, sid, method, tool, latency, status, err_code, err_msg, bytes_in, bytes_out],
220            ).unwrap();
221        }
222
223        QueryEngine::from_conn(conn)
224    }
225
226    // ── logs ────────────────────────────────────────────────────────────
227
228    #[test]
229    fn logs__returns_all_rows() {
230        let engine = seeded_engine();
231        let rows = engine
232            .logs(&super::logs::LogsParams {
233                proxy: Some("api".into()),
234                since_ts: 0,
235                limit: 100,
236                tool: None,
237                method: None,
238                session: None,
239                status: None,
240                error_code: None,
241            })
242            .unwrap();
243        assert_eq!(rows.len(), 5);
244        assert_eq!(rows[0].request_id, "r5");
245        assert_eq!(rows[4].request_id, "r1");
246    }
247
248    #[test]
249    fn logs__filter_by_tool() {
250        let engine = seeded_engine();
251        let rows = engine
252            .logs(&super::logs::LogsParams {
253                proxy: Some("api".into()),
254                since_ts: 0,
255                limit: 100,
256                tool: Some("search".into()),
257                method: None,
258                session: None,
259                status: None,
260                error_code: None,
261            })
262            .unwrap();
263        assert_eq!(rows.len(), 3);
264        assert!(rows.iter().all(|r| r.tool.as_deref() == Some("search")));
265    }
266
267    #[test]
268    fn logs__filter_by_status() {
269        let engine = seeded_engine();
270        let rows = engine
271            .logs(&super::logs::LogsParams {
272                proxy: Some("api".into()),
273                since_ts: 0,
274                limit: 100,
275                tool: None,
276                method: None,
277                session: None,
278                status: Some("error".into()),
279                error_code: None,
280            })
281            .unwrap();
282        assert_eq!(rows.len(), 1);
283        assert_eq!(rows[0].request_id, "r3");
284        assert_eq!(rows[0].error_msg.as_deref(), Some("timeout"));
285    }
286
287    #[test]
288    fn logs__since_returns_newer() {
289        let engine = seeded_engine();
290        let params = super::logs::LogsParams {
291            proxy: Some("api".into()),
292            since_ts: 0,
293            limit: 100,
294            tool: None,
295            method: None,
296            session: None,
297            status: None,
298            error_code: None,
299        };
300        let rows = engine.logs_since(&params, 3000).unwrap();
301        assert_eq!(rows.len(), 2);
302        assert_eq!(rows[0].request_id, "r4");
303        assert_eq!(rows[1].request_id, "r5");
304    }
305
306    #[test]
307    fn logs__empty_proxy() {
308        let engine = seeded_engine();
309        let rows = engine
310            .logs(&super::logs::LogsParams {
311                proxy: Some("nonexistent".into()),
312                since_ts: 0,
313                limit: 100,
314                tool: None,
315                method: None,
316                session: None,
317                status: None,
318                error_code: None,
319            })
320            .unwrap();
321        assert!(rows.is_empty());
322    }
323
324    #[test]
325    fn logs__filter_by_session() {
326        let engine = seeded_engine();
327        let rows = engine
328            .logs(&super::logs::LogsParams {
329                proxy: Some("api".into()),
330                since_ts: 0,
331                limit: 100,
332                tool: None,
333                method: None,
334                session: Some("s1".into()),
335                status: None,
336                error_code: None,
337            })
338            .unwrap();
339        assert_eq!(rows.len(), 3);
340        assert!(rows.iter().all(|r| r.session_id.as_deref() == Some("s1")));
341    }
342
343    #[test]
344    fn logs__filter_by_session_prefix() {
345        let engine = seeded_engine();
346        let rows = engine
347            .logs(&super::logs::LogsParams {
348                proxy: Some("api".into()),
349                since_ts: 0,
350                limit: 100,
351                tool: None,
352                method: None,
353                session: Some("s".into()),
354                status: None,
355                error_code: None,
356            })
357            .unwrap();
358        assert_eq!(rows.len(), 5);
359    }
360
361    #[test]
362    fn logs__filter_by_method() {
363        let engine = seeded_engine();
364        let rows = engine
365            .logs(&super::logs::LogsParams {
366                proxy: Some("api".into()),
367                since_ts: 0,
368                limit: 100,
369                tool: None,
370                method: Some("resources/read".into()),
371                session: None,
372                status: None,
373                error_code: None,
374            })
375            .unwrap();
376        assert_eq!(rows.len(), 1);
377        assert_eq!(rows[0].request_id, "r4");
378    }
379
380    #[test]
381    fn logs__filter_combined_session_and_method() {
382        let engine = seeded_engine();
383        let rows = engine
384            .logs(&super::logs::LogsParams {
385                proxy: Some("api".into()),
386                since_ts: 0,
387                limit: 100,
388                tool: None,
389                method: Some("tools/call".into()),
390                session: Some("s1".into()),
391                status: None,
392                error_code: None,
393            })
394            .unwrap();
395        assert_eq!(rows.len(), 3);
396    }
397
398    #[test]
399    fn logs__filter_by_error_code() {
400        let engine = seeded_engine();
401        let rows = engine
402            .logs(&super::logs::LogsParams {
403                proxy: Some("api".into()),
404                since_ts: 0,
405                limit: 100,
406                tool: None,
407                method: None,
408                session: None,
409                status: None,
410                error_code: Some("-32600".into()),
411            })
412            .unwrap();
413        assert_eq!(rows.len(), 1);
414        assert_eq!(rows[0].request_id, "r3");
415        assert_eq!(rows[0].error_code.as_deref(), Some("-32600"));
416    }
417
418    #[test]
419    fn logs__filter_by_error_code_no_match() {
420        let engine = seeded_engine();
421        let rows = engine
422            .logs(&super::logs::LogsParams {
423                proxy: Some("api".into()),
424                since_ts: 0,
425                limit: 100,
426                tool: None,
427                method: None,
428                session: None,
429                status: None,
430                error_code: Some("-32601".into()),
431            })
432            .unwrap();
433        assert!(rows.is_empty());
434    }
435
436    #[test]
437    fn logs__error_code_present_in_row() {
438        let engine = seeded_engine();
439        let rows = engine
440            .logs(&super::logs::LogsParams {
441                proxy: Some("api".into()),
442                since_ts: 0,
443                limit: 100,
444                tool: None,
445                method: None,
446                session: None,
447                status: None,
448                error_code: None,
449            })
450            .unwrap();
451        let r3 = rows.iter().find(|r| r.request_id == "r3").unwrap();
452        assert_eq!(r3.error_code.as_deref(), Some("-32600"));
453        let r1 = rows.iter().find(|r| r.request_id == "r1").unwrap();
454        assert!(r1.error_code.is_none());
455    }
456
457    // ── slow ────────────────────────────────────────────────────────────
458
459    #[test]
460    fn slow__filter_by_tool() {
461        let engine = seeded_engine();
462        let rows = engine
463            .slow(&super::slow::SlowParams {
464                proxy: Some("api".into()),
465                tool: Some("search".into()),
466                threshold_us: 500,
467                since_ts: 0,
468                limit: 100,
469            })
470            .unwrap();
471        assert_eq!(rows.len(), 1);
472        assert_eq!(rows[0].tool.as_deref(), Some("search"));
473        assert_eq!(rows[0].latency_us, 891);
474    }
475
476    #[test]
477    fn slow__returns_above_threshold() {
478        let engine = seeded_engine();
479        let rows = engine
480            .slow(&super::slow::SlowParams {
481                proxy: Some("api".into()),
482                tool: None,
483                threshold_us: 500,
484                since_ts: 0,
485                limit: 100,
486            })
487            .unwrap();
488        assert_eq!(rows.len(), 2);
489        assert_eq!(rows[0].latency_us, 4201);
490        assert_eq!(rows[1].latency_us, 891);
491    }
492
493    #[test]
494    fn slow__high_threshold_returns_empty() {
495        let engine = seeded_engine();
496        let rows = engine
497            .slow(&super::slow::SlowParams {
498                proxy: Some("api".into()),
499                tool: None,
500                threshold_us: 10000,
501                since_ts: 0,
502                limit: 100,
503            })
504            .unwrap();
505        assert!(rows.is_empty());
506    }
507
508    // ── slow_since (--follow) ──────────────────────────────────────────
509
510    #[test]
511    fn slow_since__returns_newer_rows() {
512        let engine = seeded_engine();
513        let params = super::slow::SlowParams {
514            proxy: Some("api".into()),
515            threshold_us: 500,
516            since_ts: 0,
517            tool: None,
518            limit: 100,
519        };
520        let rows = engine.slow_since(&params, 1000).unwrap();
521        assert_eq!(rows.len(), 2);
522        assert_eq!(rows[0].request_id, "r2");
523        assert_eq!(rows[1].request_id, "r3");
524    }
525
526    #[test]
527    fn slow_since__excludes_at_boundary() {
528        let engine = seeded_engine();
529        let params = super::slow::SlowParams {
530            proxy: Some("api".into()),
531            threshold_us: 500,
532            since_ts: 0,
533            tool: None,
534            limit: 100,
535        };
536        let rows = engine.slow_since(&params, 2000).unwrap();
537        assert_eq!(rows.len(), 1);
538        assert_eq!(rows[0].request_id, "r3");
539    }
540
541    #[test]
542    fn slow_since__returns_empty_when_no_new() {
543        let engine = seeded_engine();
544        let params = super::slow::SlowParams {
545            proxy: Some("api".into()),
546            threshold_us: 500,
547            since_ts: 0,
548            tool: None,
549            limit: 100,
550        };
551        let rows = engine.slow_since(&params, 5000).unwrap();
552        assert!(rows.is_empty());
553    }
554
555    #[test]
556    fn slow_since__respects_threshold() {
557        let engine = seeded_engine();
558        let params = super::slow::SlowParams {
559            proxy: Some("api".into()),
560            threshold_us: 1000,
561            since_ts: 0,
562            tool: None,
563            limit: 100,
564        };
565        let rows = engine.slow_since(&params, 0).unwrap();
566        assert_eq!(rows.len(), 1);
567        assert_eq!(rows[0].latency_us, 4201);
568    }
569
570    #[test]
571    fn slow_since__respects_tool_filter() {
572        let engine = seeded_engine();
573        let params = super::slow::SlowParams {
574            proxy: Some("api".into()),
575            threshold_us: 500,
576            since_ts: 0,
577            tool: Some("search".into()),
578            limit: 100,
579        };
580        let rows = engine.slow_since(&params, 0).unwrap();
581        assert_eq!(rows.len(), 1);
582        assert_eq!(rows[0].tool.as_deref(), Some("search"));
583        assert_eq!(rows[0].latency_us, 891);
584    }
585
586    // ── stats ───────────────────────────────────────────────────────────
587
588    #[test]
589    fn stats__aggregates_correctly() {
590        let engine = seeded_engine();
591        let result = engine
592            .stats(&super::stats::StatsParams {
593                proxy: Some("api".into()),
594                since_ts: 0,
595            })
596            .unwrap();
597        assert_eq!(result.total_calls, 5);
598        assert!(result.error_pct > 0.0);
599        let search = result.tools.iter().find(|t| t.label == "search").unwrap();
600        assert_eq!(search.calls, 3);
601    }
602
603    #[test]
604    fn stats__empty_proxy() {
605        let engine = seeded_engine();
606        let result = engine
607            .stats(&super::stats::StatsParams {
608                proxy: Some("nonexistent".into()),
609                since_ts: 0,
610            })
611            .unwrap();
612        assert_eq!(result.total_calls, 0);
613        assert!(result.tools.is_empty());
614    }
615
616    #[test]
617    fn stats__latency_us_values() {
618        let engine = seeded_engine();
619        let result = engine
620            .stats(&super::stats::StatsParams {
621                proxy: Some("api".into()),
622                since_ts: 0,
623            })
624            .unwrap();
625
626        let search = result.tools.iter().find(|t| t.label == "search").unwrap();
627        assert_eq!(search.min_us, 142);
628        assert_eq!(search.max_us, 891);
629        assert!((search.avg_us - 396.33).abs() < 1.0);
630        assert_eq!(search.p95_us, 891);
631    }
632
633    #[test]
634    fn stats__serialization_uses_us_field_names() {
635        let engine = seeded_engine();
636        let result = engine
637            .stats(&super::stats::StatsParams {
638                proxy: Some("api".into()),
639                since_ts: 0,
640            })
641            .unwrap();
642        let json = serde_json::to_string(&result).unwrap();
643        assert!(json.contains("avg_us"));
644        assert!(json.contains("min_us"));
645        assert!(json.contains("max_us"));
646        assert!(json.contains("p95_us"));
647        assert!(!json.contains("avg_ms"));
648    }
649
650    #[test]
651    fn log_row__latency_us_field() {
652        let engine = seeded_engine();
653        let rows = engine
654            .logs(&super::logs::LogsParams {
655                proxy: Some("api".into()),
656                since_ts: 0,
657                limit: 100,
658                tool: Some("search".into()),
659                method: None,
660                session: None,
661                status: None,
662                error_code: None,
663            })
664            .unwrap();
665        assert_eq!(rows[0].latency_us, 156);
666        assert_eq!(rows[1].latency_us, 891);
667        assert_eq!(rows[2].latency_us, 142);
668    }
669
670    #[test]
671    fn log_row__serialization_uses_us_field() {
672        let engine = seeded_engine();
673        let rows = engine
674            .logs(&super::logs::LogsParams {
675                proxy: Some("api".into()),
676                since_ts: 0,
677                limit: 1,
678                tool: None,
679                method: None,
680                session: None,
681                status: None,
682                error_code: None,
683            })
684            .unwrap();
685        let json = serde_json::to_string(&rows[0]).unwrap();
686        assert!(json.contains("latency_us"));
687        assert!(!json.contains("latency_ms"));
688    }
689
690    #[test]
691    fn slow__threshold_us_precision() {
692        let engine = seeded_engine();
693        let rows = engine
694            .slow(&super::slow::SlowParams {
695                proxy: Some("api".into()),
696                tool: None,
697                threshold_us: 150,
698                since_ts: 0,
699                limit: 100,
700            })
701            .unwrap();
702        assert_eq!(rows.len(), 3);
703        assert_eq!(rows[0].latency_us, 4201);
704        assert_eq!(rows[1].latency_us, 891);
705        assert_eq!(rows[2].latency_us, 156);
706    }
707
708    #[test]
709    fn slow__exact_threshold_boundary() {
710        let engine = seeded_engine();
711        let rows = engine
712            .slow(&super::slow::SlowParams {
713                proxy: Some("api".into()),
714                tool: None,
715                threshold_us: 891,
716                since_ts: 0,
717                limit: 100,
718            })
719            .unwrap();
720        assert_eq!(rows.len(), 2);
721        assert_eq!(rows[0].latency_us, 4201);
722        assert_eq!(rows[1].latency_us, 891);
723    }
724
725    // ── clients ─────────────────────────────────────────────────────────
726
727    #[test]
728    fn clients__aggregates_by_client() {
729        let engine = seeded_engine();
730        let rows = engine
731            .clients(&super::clients::ClientsParams {
732                proxy: Some("api".into()),
733                since_ts: 0,
734            })
735            .unwrap();
736        assert_eq!(rows.len(), 2);
737        assert_eq!(rows[0].client_name.as_deref(), Some("claude-desktop"));
738        assert_eq!(rows[0].total_calls, 3);
739        assert_eq!(rows[1].client_name.as_deref(), Some("cursor"));
740        assert_eq!(rows[1].total_calls, 2);
741    }
742
743    // ── sessions ────────────────────────────────────────────────────────
744
745    #[test]
746    fn sessions__returns_all() {
747        let engine = seeded_engine();
748        let rows = engine
749            .sessions(&super::sessions::SessionsParams {
750                proxy: Some("api".into()),
751                since_ts: 0,
752                limit: 100,
753                active_only: false,
754                client: None,
755            })
756            .unwrap();
757        assert_eq!(rows.len(), 2);
758    }
759
760    #[test]
761    fn sessions__filter_by_client() {
762        let engine = seeded_engine();
763        let rows = engine
764            .sessions(&super::sessions::SessionsParams {
765                proxy: Some("api".into()),
766                since_ts: 0,
767                limit: 100,
768                active_only: false,
769                client: Some("cursor".into()),
770            })
771            .unwrap();
772        assert_eq!(rows.len(), 1);
773        assert_eq!(rows[0].client_name.as_deref(), Some("cursor"));
774    }
775
776    // ── session_detail ──────────────────────────────────────────────────
777
778    #[test]
779    fn session_detail__returns_with_requests() {
780        let engine = seeded_engine();
781        let detail = engine.session_detail("s1").unwrap().unwrap();
782        assert_eq!(detail.session_id, "s1");
783        assert_eq!(detail.client_name.as_deref(), Some("claude-desktop"));
784        assert_eq!(detail.client_version.as_deref(), Some("1.2.0"));
785        assert_eq!(detail.client_platform.as_deref(), Some("claude"));
786        assert_eq!(detail.total_calls, 3);
787        assert_eq!(detail.total_errors, 1);
788        assert_eq!(detail.requests.len(), 3);
789        assert_eq!(detail.requests[0].request_id, "r1");
790        assert_eq!(detail.requests[1].request_id, "r2");
791        assert_eq!(detail.requests[2].request_id, "r3");
792    }
793
794    #[test]
795    fn session_detail__closed_session() {
796        let engine = seeded_engine();
797        let detail = engine.session_detail("s2").unwrap().unwrap();
798        assert_eq!(detail.session_id, "s2");
799        assert_eq!(detail.client_name.as_deref(), Some("cursor"));
800        assert_eq!(detail.ended_at, Some(3500));
801        assert_eq!(detail.requests.len(), 2);
802        assert_eq!(detail.requests[0].request_id, "r4");
803        assert_eq!(detail.requests[1].request_id, "r5");
804    }
805
806    #[test]
807    fn session_detail__nonexistent_returns_none() {
808        let engine = seeded_engine();
809        let result = engine.session_detail("no-such-session").unwrap();
810        assert!(result.is_none());
811    }
812
813    #[test]
814    fn session_detail__requests_ordered_oldest_first() {
815        let engine = seeded_engine();
816        let detail = engine.session_detail("s1").unwrap().unwrap();
817        for pair in detail.requests.windows(2) {
818            assert!(pair[0].ts <= pair[1].ts);
819        }
820    }
821
822    #[test]
823    fn session_detail__serializes_to_json() {
824        let engine = seeded_engine();
825        let detail = engine.session_detail("s1").unwrap().unwrap();
826        let json = serde_json::to_string(&detail).unwrap();
827        assert!(json.contains("session_id"));
828        assert!(json.contains("client_name"));
829        assert!(json.contains("requests"));
830        assert!(json.contains("r1"));
831    }
832
833    // ── store_ops ───────────────────────────────────────────────────────
834
835    #[test]
836    fn vacuum__dry_run_counts_correctly() {
837        let engine = seeded_engine();
838        let result = engine
839            .vacuum(&super::store_ops::VacuumParams {
840                before_ts: 3500,
841                proxy: None,
842                dry_run: true,
843            })
844            .unwrap();
845        assert_eq!(result.deleted_requests, 3);
846        assert!(result.dry_run);
847    }
848
849    #[test]
850    fn vacuum__actually_deletes() {
851        let engine = seeded_engine();
852        let result = engine
853            .vacuum(&super::store_ops::VacuumParams {
854                before_ts: 3500,
855                proxy: None,
856                dry_run: false,
857            })
858            .unwrap();
859        assert_eq!(result.deleted_requests, 3);
860        assert!(!result.dry_run);
861
862        let remaining = engine
863            .logs(&super::logs::LogsParams {
864                proxy: Some("api".into()),
865                since_ts: 0,
866                limit: 100,
867                tool: None,
868                method: None,
869                session: None,
870                status: None,
871                error_code: None,
872            })
873            .unwrap();
874        assert_eq!(remaining.len(), 2);
875    }
876
877    // ── serialization ───────────────────────────────────────────────────
878
879    #[test]
880    fn log_row__serializes_to_json() {
881        let engine = seeded_engine();
882        let rows = engine
883            .logs(&super::logs::LogsParams {
884                proxy: Some("api".into()),
885                since_ts: 0,
886                limit: 1,
887                tool: None,
888                method: None,
889                session: None,
890                status: None,
891                error_code: None,
892            })
893            .unwrap();
894        let json = serde_json::to_string(&rows[0]).unwrap();
895        assert!(json.contains("request_id"));
896        assert!(json.contains("latency_us"));
897    }
898
899    #[test]
900    fn client_row__serializes_to_json() {
901        let engine = seeded_engine();
902        let rows = engine
903            .clients(&super::clients::ClientsParams {
904                proxy: Some("api".into()),
905                since_ts: 0,
906            })
907            .unwrap();
908        let json = serde_json::to_string(&rows[0]).unwrap();
909        assert!(json.contains("client_name"));
910        assert!(json.contains("total_calls"));
911    }
912
913    #[test]
914    fn stats__serializes_to_json() {
915        let engine = seeded_engine();
916        let result = engine
917            .stats(&super::stats::StatsParams {
918                proxy: Some("api".into()),
919                since_ts: 0,
920            })
921            .unwrap();
922        let json = serde_json::to_string(&result).unwrap();
923        assert!(json.contains("total_calls"));
924        assert!(json.contains("tools"));
925    }
926
927    // ── schema ─────────────────────────────────────────────────────────
928
929    fn seed_schema(engine: &QueryEngine) {
930        engine
931            .conn()
932            .execute(
933                "INSERT INTO server_schema (upstream_url, method, payload, captured_at, schema_hash) VALUES (?1, ?2, ?3, ?4, ?5)",
934                params![
935                    "http://localhost:9000",
936                    "initialize",
937                    r#"{"serverInfo":{"name":"test-server","version":"1.0"},"protocolVersion":"2025-03-26","capabilities":{"tools":{}}}"#,
938                    1000i64,
939                    "hash_init"
940                ],
941            )
942            .unwrap();
943        engine
944            .conn()
945            .execute(
946                "INSERT INTO server_schema (upstream_url, method, payload, captured_at, schema_hash) VALUES (?1, ?2, ?3, ?4, ?5)",
947                params![
948                    "http://localhost:9000",
949                    "tools/list",
950                    r#"{"tools":[{"name":"search","description":"search things"}]}"#,
951                    2000i64,
952                    "hash_tools"
953                ],
954            )
955            .unwrap();
956        engine
957            .conn()
958            .execute(
959                "INSERT INTO schema_changes (upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
960                params!["http://localhost:9000", "tools/list", "initial", Option::<String>::None, Option::<String>::None, "hash_tools", 2000i64],
961            )
962            .unwrap();
963    }
964
965    /// Insert one `tools/list` schema row + its "initial" change row under
966    /// the given proxy name and URL.
967    fn seed_schema_for_proxy(engine: &QueryEngine, proxy: &str, upstream: &str) {
968        engine
969            .conn()
970            .execute(
971                "INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
972                params![
973                    proxy,
974                    upstream,
975                    "tools/list",
976                    r#"{"tools":[{"name":"search","description":"search things"}]}"#,
977                    1000i64,
978                    format!("hash-{proxy}")
979                ],
980            )
981            .unwrap();
982        engine
983            .conn()
984            .execute(
985                "INSERT INTO schema_changes (proxy, upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
986                params![
987                    proxy,
988                    upstream,
989                    "tools/list",
990                    "initial",
991                    Option::<String>::None,
992                    Option::<String>::None,
993                    format!("hash-{proxy}"),
994                    1000i64,
995                ],
996            )
997            .unwrap();
998    }
999
1000    #[test]
1001    fn schema__returns_all_snapshots() {
1002        let engine = seeded_engine();
1003        seed_schema(&engine);
1004        let rows = engine
1005            .schema(&super::schema::SchemaParams {
1006                proxy: None,
1007                method: None,
1008            })
1009            .unwrap();
1010        assert_eq!(rows.len(), 2);
1011    }
1012
1013    #[test]
1014    fn schema__filter_by_method() {
1015        let engine = seeded_engine();
1016        seed_schema(&engine);
1017        let rows = engine
1018            .schema(&super::schema::SchemaParams {
1019                proxy: None,
1020                method: Some("tools/list".into()),
1021            })
1022            .unwrap();
1023        assert_eq!(rows.len(), 1);
1024        assert_eq!(rows[0].method, "tools/list");
1025    }
1026
1027    #[test]
1028    fn schema__filter_by_proxy() {
1029        let engine = seeded_engine();
1030        seed_schema_for_proxy(&engine, "alpha", "http://a:9000");
1031        seed_schema_for_proxy(&engine, "beta", "http://b:9000");
1032
1033        let alpha = engine
1034            .schema(&super::schema::SchemaParams {
1035                proxy: Some("alpha".into()),
1036                method: None,
1037            })
1038            .unwrap();
1039        assert_eq!(alpha.len(), 1);
1040        assert_eq!(alpha[0].upstream_url, "http://a:9000");
1041
1042        let beta = engine
1043            .schema(&super::schema::SchemaParams {
1044                proxy: Some("beta".into()),
1045                method: None,
1046            })
1047            .unwrap();
1048        assert_eq!(beta.len(), 1);
1049        assert_eq!(beta[0].upstream_url, "http://b:9000");
1050
1051        let missing = engine
1052            .schema(&super::schema::SchemaParams {
1053                proxy: Some("nonexistent".into()),
1054                method: None,
1055            })
1056            .unwrap();
1057        assert!(missing.is_empty());
1058    }
1059
1060    #[test]
1061    fn latest_schema_row__returns_row_for_proxy_method() {
1062        let engine = seeded_engine();
1063        seed_schema_for_proxy(&engine, "alpha", "http://a:9000");
1064
1065        let row = engine
1066            .latest_schema_row("alpha", "tools/list")
1067            .unwrap()
1068            .expect("row must exist");
1069        assert_eq!(row.proxy, "alpha");
1070        assert_eq!(row.upstream_url, "http://a:9000");
1071        assert_eq!(row.method, "tools/list");
1072        assert_eq!(row.schema_hash, "hash-alpha");
1073    }
1074
1075    #[test]
1076    fn latest_schema_row__none_when_missing() {
1077        let engine = seeded_engine();
1078        assert!(
1079            engine
1080                .latest_schema_row("nonexistent", "tools/list")
1081                .unwrap()
1082                .is_none()
1083        );
1084    }
1085
1086    #[test]
1087    fn latest_schema_row__scoped_by_proxy() {
1088        let engine = seeded_engine();
1089        seed_schema_for_proxy(&engine, "alpha", "http://a:9000");
1090        seed_schema_for_proxy(&engine, "beta", "http://b:9000");
1091
1092        let a = engine
1093            .latest_schema_row("alpha", "tools/list")
1094            .unwrap()
1095            .unwrap();
1096        let b = engine
1097            .latest_schema_row("beta", "tools/list")
1098            .unwrap()
1099            .unwrap();
1100        assert_eq!(a.schema_hash, "hash-alpha");
1101        assert_eq!(b.schema_hash, "hash-beta");
1102    }
1103
1104    #[test]
1105    fn schema_changes__returns_history() {
1106        let engine = seeded_engine();
1107        seed_schema(&engine);
1108        let rows = engine
1109            .schema_changes(&super::schema::SchemaChangesParams {
1110                proxy: None,
1111                method: None,
1112                limit: 50,
1113            })
1114            .unwrap();
1115        assert_eq!(rows.len(), 1);
1116        assert_eq!(rows[0].change_type, "initial");
1117    }
1118
1119    #[test]
1120    fn schema_changes__filter_by_proxy() {
1121        let engine = seeded_engine();
1122        seed_schema_for_proxy(&engine, "alpha", "http://a:9000");
1123        seed_schema_for_proxy(&engine, "beta", "http://b:9000");
1124
1125        let alpha = engine
1126            .schema_changes(&super::schema::SchemaChangesParams {
1127                proxy: Some("alpha".into()),
1128                method: None,
1129                limit: 50,
1130            })
1131            .unwrap();
1132        assert_eq!(alpha.len(), 1);
1133        assert_eq!(alpha[0].upstream_url, "http://a:9000");
1134
1135        let all = engine
1136            .schema_changes(&super::schema::SchemaChangesParams {
1137                proxy: None,
1138                method: None,
1139                limit: 50,
1140            })
1141            .unwrap();
1142        assert_eq!(all.len(), 2);
1143    }
1144
1145    #[test]
1146    fn schema_status__complete() {
1147        let engine = seeded_engine();
1148        seed_schema(&engine);
1149        let status = engine.schema_status("http://localhost:9000").unwrap();
1150        assert_eq!(status.status, "complete");
1151        assert_eq!(status.server_name.as_deref(), Some("test-server"));
1152        assert_eq!(status.server_version.as_deref(), Some("1.0"));
1153        assert_eq!(status.protocol_version.as_deref(), Some("2025-03-26"));
1154        assert!(status.capabilities.contains(&"tools".to_string()));
1155        assert_eq!(status.methods_captured.len(), 2);
1156    }
1157
1158    #[test]
1159    fn schema_status__unknown() {
1160        let engine = seeded_engine();
1161        let status = engine.schema_status("http://nonexistent").unwrap();
1162        assert_eq!(status.status, "unknown");
1163        assert!(status.methods_captured.is_empty());
1164    }
1165
1166    #[test]
1167    fn schema_status__partial() {
1168        let engine = seeded_engine();
1169        engine
1170            .conn()
1171            .execute(
1172                "INSERT INTO server_schema (upstream_url, method, payload, captured_at, schema_hash) VALUES (?1, ?2, ?3, ?4, ?5)",
1173                params!["http://partial", "tools/list", "{}", 1000i64, "h1"],
1174            )
1175            .unwrap();
1176        let status = engine.schema_status("http://partial").unwrap();
1177        assert_eq!(status.status, "partial");
1178    }
1179
1180    // ── schema unused ──────────────────────────────────────────────────
1181
1182    #[test]
1183    fn schema_unused__finds_uncalled_tools() {
1184        let engine = seeded_engine();
1185        seed_schema(&engine);
1186
1187        engine
1188            .conn()
1189            .execute(
1190                "UPDATE server_schema SET payload = ?1 WHERE method = 'tools/list'",
1191                params![r#"{"tools":[{"name":"search","description":"search things"},{"name":"never_used","description":"does nothing"}]}"#],
1192            )
1193            .unwrap();
1194
1195        let rows = engine
1196            .schema_unused(&super::schema::SchemaUnusedParams {
1197                proxy: Some("api".into()),
1198                since_ts: 0,
1199            })
1200            .unwrap();
1201
1202        assert_eq!(rows.len(), 2);
1203        assert_eq!(rows[0].tool_name, "never_used");
1204        assert_eq!(rows[0].calls, 0);
1205        assert_eq!(rows[1].tool_name, "search");
1206        assert!(rows[1].calls > 0);
1207    }
1208
1209    #[test]
1210    fn schema_unused__empty_when_no_schema() {
1211        let engine = seeded_engine();
1212        let rows = engine
1213            .schema_unused(&super::schema::SchemaUnusedParams {
1214                proxy: Some("api".into()),
1215                since_ts: 0,
1216            })
1217            .unwrap();
1218        assert!(rows.is_empty());
1219    }
1220
1221    // ── multi-proxy: proxy: None shows all ──────────────────────────────
1222
1223    /// Seed a second proxy ("email") alongside the default "api" proxy.
1224    fn seeded_multi_proxy_engine() -> QueryEngine {
1225        let engine = seeded_engine();
1226
1227        // Add a second proxy's session and requests.
1228        engine.conn().execute(
1229            "INSERT INTO sessions (session_id, proxy, started_at, last_seen_at, client_name, client_version, client_platform, total_calls, total_errors)
1230             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1231            params!["s-email-1", "email", 6000, 7000, "claude-desktop", "1.2.0", "claude", 1, 0],
1232        ).unwrap();
1233
1234        engine.conn().execute(
1235            "INSERT INTO requests (request_id, ts, proxy, session_id, method, tool, latency_us, status, error_code, error_msg, bytes_in, bytes_out)
1236             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1237            params!["r-email-1", 6000i64, "email", "s-email-1", "tools/call", "send_email", 320i64, "ok", None::<&str>, None::<&str>, Some(512i64), Some(128i64)],
1238        ).unwrap();
1239
1240        engine.conn().execute(
1241            "INSERT INTO requests (request_id, ts, proxy, session_id, method, tool, latency_us, status, error_code, error_msg, bytes_in, bytes_out)
1242             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1243            params!["r-email-2", 7000i64, "email", "s-email-1", "tools/call", "send_email", 250i64, "ok", None::<&str>, None::<&str>, Some(512i64), Some(128i64)],
1244        ).unwrap();
1245
1246        engine
1247    }
1248
1249    #[test]
1250    fn logs__proxy_none_returns_all() {
1251        let engine = seeded_multi_proxy_engine();
1252        let rows = engine
1253            .logs(&super::logs::LogsParams {
1254                proxy: None,
1255                since_ts: 0,
1256                limit: 100,
1257                tool: None,
1258                method: None,
1259                session: None,
1260                status: None,
1261                error_code: None,
1262            })
1263            .unwrap();
1264        assert_eq!(rows.len(), 7);
1265    }
1266
1267    #[test]
1268    fn logs__proxy_filter_excludes_other() {
1269        let engine = seeded_multi_proxy_engine();
1270        let rows = engine
1271            .logs(&super::logs::LogsParams {
1272                proxy: Some("email".into()),
1273                since_ts: 0,
1274                limit: 100,
1275                tool: None,
1276                method: None,
1277                session: None,
1278                status: None,
1279                error_code: None,
1280            })
1281            .unwrap();
1282        assert_eq!(rows.len(), 2);
1283    }
1284
1285    #[test]
1286    fn stats__proxy_none_aggregates_all() {
1287        let engine = seeded_multi_proxy_engine();
1288        let result = engine
1289            .stats(&super::stats::StatsParams {
1290                proxy: None,
1291                since_ts: 0,
1292            })
1293            .unwrap();
1294        assert_eq!(result.total_calls, 7);
1295    }
1296
1297    #[test]
1298    fn stats__proxy_filter_scopes_to_one() {
1299        let engine = seeded_multi_proxy_engine();
1300        let result = engine
1301            .stats(&super::stats::StatsParams {
1302                proxy: Some("email".into()),
1303                since_ts: 0,
1304            })
1305            .unwrap();
1306        assert_eq!(result.total_calls, 2);
1307    }
1308
1309    #[test]
1310    fn slow__proxy_none_returns_all() {
1311        let engine = seeded_multi_proxy_engine();
1312        let rows = engine
1313            .slow(&super::slow::SlowParams {
1314                proxy: None,
1315                threshold_us: 100,
1316                since_ts: 0,
1317                tool: None,
1318                limit: 100,
1319            })
1320            .unwrap();
1321        assert_eq!(rows.len(), 6);
1322    }
1323
1324    #[test]
1325    fn sessions__proxy_none_returns_all() {
1326        let engine = seeded_multi_proxy_engine();
1327        let rows = engine
1328            .sessions(&super::sessions::SessionsParams {
1329                proxy: None,
1330                since_ts: 0,
1331                limit: 100,
1332                active_only: false,
1333                client: None,
1334            })
1335            .unwrap();
1336        assert_eq!(rows.len(), 3);
1337    }
1338
1339    #[test]
1340    fn clients__proxy_none_returns_all() {
1341        let engine = seeded_multi_proxy_engine();
1342        let rows = engine
1343            .clients(&super::clients::ClientsParams {
1344                proxy: None,
1345                since_ts: 0,
1346            })
1347            .unwrap();
1348        assert!(rows.len() >= 2);
1349    }
1350}