// Source path: mcpr_integrations/store/schema.rs

//! SQL schema definitions for the mcpr storage engine.
//!
//! All table and index definitions live here as constants. The migration runner
//! in [`super::db`] executes these on first open and on version upgrades.
//!
//! # Schema design decisions
//!
//! - **Two core tables**: `requests` (one row per MCP request) and `sessions` (one row per
//!   MCP session). Client identity lives in `sessions` only — no denormalization.
//!   A small `meta` key-value table holds the schema version, and the V2 migration adds
//!   `server_schema` and `schema_changes` for upstream schema capture.
//! - **Soft foreign keys**: `requests.session_id` references `sessions.session_id` but
//!   without a FOREIGN KEY constraint. SQLite FK enforcement requires per-connection
//!   pragmas and can cause constraint violations on ordering edge cases in async writes.
//! - **No body storage**: Request/response bodies can be MB-scale. Only metadata is stored.
//! - **Timestamps as unix milliseconds**: Sufficient resolution, avoids i64 overflow,
//!   and is the natural unit for latency math. (Request latency itself is stored in
//!   microseconds as `latency_us` since the V4 migration.)
//! - **UUIDv7 for request_id**: Time-ordered for efficient indexing, globally unique
//!   for cloud sink correlation.
18
/// Current schema version. Stored in the `meta` table (key `schema_version`)
/// and checked on startup.
///
/// Kept as a string because `meta.value` is a TEXT column. Bump this when
/// adding migrations, and make the new migration write the same value into
/// `meta` (see the `UPDATE meta …` statement at the end of each `V*_SCHEMA`).
pub const SCHEMA_VERSION: &str = "4";
22
/// Initial schema: requests table, sessions table, meta table, and all indexes.
///
/// Executed as a single batch on first database creation (schema_version = 0 → 1).
///
/// Every statement uses `IF NOT EXISTS`, so running the batch against a database
/// that already has these objects is a no-op. This batch does not write the
/// `schema_version` row itself; that is done by [`V1_META_SEED`].
///
/// Note that the latency column is still called `latency_ms` here; it is renamed
/// to `latency_us` by [`V4_SCHEMA`].
pub const V1_SCHEMA: &str = "
-- ── requests ──────────────────────────────────────────────────────────
-- One row per MCP request that flows through the proxy.
-- This is the primary table for all observability queries (logs, slow, stats).
CREATE TABLE IF NOT EXISTS requests (
    -- Identity
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    request_id      TEXT NOT NULL UNIQUE,    -- UUIDv7, generated by mcpr
    ts              INTEGER NOT NULL,         -- unix milliseconds (UTC)

    -- Routing
    proxy           TEXT NOT NULL,            -- proxy name from config
    session_id      TEXT,                     -- soft FK → sessions.session_id

    -- MCP Protocol
    method          TEXT NOT NULL,            -- tools/call | resources/read | initialize | etc.
    tool            TEXT,                     -- tool name for tools/call, NULL otherwise

    -- Outcome
    latency_ms      INTEGER NOT NULL,         -- wall-clock ms (renamed to latency_us in V4)
    status          TEXT NOT NULL              -- 'ok' | 'error' | 'timeout'
        CHECK (status IN ('ok', 'error', 'timeout')),
    error_code      TEXT,                     -- MCP error code when status = 'error'
    error_msg       TEXT,                     -- error message, truncated to 512 chars

    -- Size
    bytes_in        INTEGER,                  -- request payload bytes
    bytes_out       INTEGER                   -- response payload bytes
);

-- ── sessions ──────────────────────────────────────────────────────────
-- One row per MCP session, populated from the `initialize` handshake.
-- Client identity (name, version, platform) lives here — not on requests.
-- Counters (total_calls, total_errors) are incremented atomically with
-- request inserts in the same batch transaction.
CREATE TABLE IF NOT EXISTS sessions (
    -- Identity
    session_id      TEXT PRIMARY KEY,         -- MCP session ID from initialize
    proxy           TEXT NOT NULL,            -- proxy this session connected through

    -- Lifecycle
    started_at      INTEGER NOT NULL,         -- ts of initialize request (unix ms)
    last_seen_at    INTEGER NOT NULL,         -- ts of most recent request
    ended_at        INTEGER,                  -- ts of clean disconnect, NULL if active

    -- Client identity (from MCP initialize → clientInfo)
    client_name     TEXT,                     -- e.g. 'claude-desktop', 'cursor'
    client_version  TEXT,                     -- e.g. '1.2.0', '0.44.1'
    client_platform TEXT,                     -- normalized: 'claude' | 'chatgpt' | 'vscode' | etc.

    -- Counters (updated per batch flush, same transaction as request inserts)
    total_calls     INTEGER NOT NULL DEFAULT 0,
    total_errors    INTEGER NOT NULL DEFAULT 0
);

-- ── meta ──────────────────────────────────────────────────────────────
-- Key-value metadata for schema versioning and operational info.
-- Used by the migration runner and `mcpr store stats`.
CREATE TABLE IF NOT EXISTS meta (
    key     TEXT PRIMARY KEY,
    value   TEXT NOT NULL
);

-- ── indexes (requests) ───────────────────────────────────────────────
-- Optimized for the access patterns of CLI observability commands.

-- logs: ORDER BY ts DESC, WHERE proxy = ?
CREATE INDEX IF NOT EXISTS idx_requests_ts        ON requests (ts);
CREATE INDEX IF NOT EXISTS idx_requests_proxy     ON requests (proxy, ts);

-- slow: WHERE tool IS NOT NULL, ORDER BY latency_ms DESC
CREATE INDEX IF NOT EXISTS idx_requests_tool      ON requests (tool, ts) WHERE tool IS NOT NULL;

-- session drill-down: WHERE session_id = ?
CREATE INDEX IF NOT EXISTS idx_requests_session   ON requests (session_id) WHERE session_id IS NOT NULL;

-- error filtering: WHERE status = ?, ORDER BY ts
CREATE INDEX IF NOT EXISTS idx_requests_status    ON requests (status, ts);

-- slow query: WHERE proxy = ?, ORDER BY latency_ms DESC
CREATE INDEX IF NOT EXISTS idx_requests_slow      ON requests (proxy, latency_ms DESC, ts);

-- ── indexes (sessions) ───────────────────────────────────────────────

-- session list: WHERE proxy = ?, ORDER BY started_at
CREATE INDEX IF NOT EXISTS idx_sessions_proxy     ON sessions (proxy, started_at);

-- client aggregation: GROUP BY client_name, client_platform
CREATE INDEX IF NOT EXISTS idx_sessions_client    ON sessions (client_name, client_platform);

-- active sessions: WHERE ended_at IS NULL, ORDER BY last_seen_at
CREATE INDEX IF NOT EXISTS idx_sessions_active    ON sessions (proxy, last_seen_at) WHERE ended_at IS NULL;
";
119
/// SQL to insert the initial meta rows after schema creation.
///
/// - `schema_version` is seeded as `'1'`; later migrations overwrite it.
/// - `created_at` is the current unix time in seconds (`strftime('%s', 'now')`)
///   with `'000'` appended, i.e. a millisecond-unit value stored as text with a
///   zero millisecond part.
///
/// Both statements use `INSERT OR IGNORE`, so existing rows are never overwritten.
pub const V1_META_SEED: &str = "
INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1');
INSERT OR IGNORE INTO meta (key, value) VALUES ('created_at', CAST(strftime('%s', 'now') AS TEXT) || '000');
";
125
/// V1 → V2 migration: add server_schema and schema_changes tables.
///
/// `server_schema` holds the latest snapshot per (upstream_url, method).
/// `schema_changes` is an append-only log of diffs detected by the writer.
///
/// The final statement sets `meta.schema_version` to `'2'`.
pub const V2_SCHEMA: &str = "
-- ── server_schema ────────────────────────────────────────────────────
-- Latest captured snapshot per upstream server and MCP method.
-- UPSERT pattern: ON CONFLICT(upstream_url, method) DO UPDATE.
CREATE TABLE IF NOT EXISTS server_schema (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    upstream_url  TEXT NOT NULL,          -- upstream MCP server URL
    method        TEXT NOT NULL,          -- 'initialize', 'tools/list', etc.
    payload       TEXT NOT NULL,          -- full JSON of the `result` field
    captured_at   INTEGER NOT NULL,       -- unix ms when captured
    schema_hash   TEXT NOT NULL,          -- SHA-256 hex of payload
    UNIQUE(upstream_url, method)
);

-- ── schema_changes ───────────────────────────────────────────────────
-- Immutable append-only log of schema changes detected by the writer.
CREATE TABLE IF NOT EXISTS schema_changes (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    upstream_url  TEXT NOT NULL,
    method        TEXT NOT NULL,
    change_type   TEXT NOT NULL,          -- 'initial', 'stale', 'updated',
                                          -- 'tool_added', 'tool_removed', 'tool_modified', etc.
    item_name     TEXT,                   -- e.g. 'search_products', NULL for bulk changes
    old_hash      TEXT,
    new_hash      TEXT,
    detected_at   INTEGER NOT NULL        -- unix ms
);

-- ── indexes ──────────────────────────────────────────────────────────
CREATE INDEX IF NOT EXISTS idx_schema_upstream
    ON server_schema (upstream_url);
CREATE INDEX IF NOT EXISTS idx_schema_changes_upstream
    ON schema_changes (upstream_url, detected_at);

-- Bump schema version.
UPDATE meta SET value = '2' WHERE key = 'schema_version';
";
167
/// V2 → V3 migration: add `proxy` column to server_schema and schema_changes.
///
/// server_schema has a UNIQUE(upstream_url, method) table constraint that cannot
/// be altered in SQLite. We recreate the table with a new UNIQUE that includes proxy.
///
/// Rows that existed before this migration get the proxy name `'default'` in
/// both tables (explicitly in the copy for `server_schema`, via the column
/// default for `schema_changes`). The old upstream-only indexes are replaced by
/// proxy-scoped ones, and the final statement sets `meta.schema_version` to `'3'`.
///
/// NOTE(review): `ALTER TABLE … ADD COLUMN` has no `IF NOT EXISTS` form, so this
/// batch is presumably gated on the stored schema version by the runner — confirm.
pub const V3_SCHEMA: &str = "
-- Recreate server_schema with proxy column and updated unique constraint.
CREATE TABLE IF NOT EXISTS server_schema_new (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    proxy         TEXT NOT NULL DEFAULT 'default',
    upstream_url  TEXT NOT NULL,
    method        TEXT NOT NULL,
    payload       TEXT NOT NULL,
    captured_at   INTEGER NOT NULL,
    schema_hash   TEXT NOT NULL,
    UNIQUE(proxy, upstream_url, method)
);
INSERT OR IGNORE INTO server_schema_new (id, proxy, upstream_url, method, payload, captured_at, schema_hash)
    SELECT id, 'default', upstream_url, method, payload, captured_at, schema_hash
    FROM server_schema;
DROP TABLE IF EXISTS server_schema;
ALTER TABLE server_schema_new RENAME TO server_schema;

-- Add proxy column to schema_changes.
ALTER TABLE schema_changes ADD COLUMN proxy TEXT NOT NULL DEFAULT 'default';

-- New indexes for proxy-scoped queries.
DROP INDEX IF EXISTS idx_schema_upstream;
CREATE INDEX IF NOT EXISTS idx_schema_proxy
    ON server_schema (proxy, upstream_url);
DROP INDEX IF EXISTS idx_schema_changes_upstream;
CREATE INDEX IF NOT EXISTS idx_schema_changes_proxy
    ON schema_changes (proxy, detected_at);

-- Bump schema version.
UPDATE meta SET value = '3' WHERE key = 'schema_version';
";
204
/// V3 → V4 migration: rename `latency_ms` → `latency_us` and convert
/// existing values from milliseconds to microseconds for sub-ms precision.
///
/// Statement order matters: the column is renamed first, then every existing
/// row is multiplied by 1000 under the new name, then `idx_requests_slow` is
/// rebuilt on the renamed column. The final statement sets
/// `meta.schema_version` to `'4'`.
///
/// NOTE(review): this batch must run only once per database — the rename has no
/// `IF EXISTS` guard and the UPDATE is not idempotent. Presumably the runner
/// gates it on the stored schema version; confirm.
pub const V4_SCHEMA: &str = "
-- Rename latency column from ms to μs.
ALTER TABLE requests RENAME COLUMN latency_ms TO latency_us;

-- Convert existing rows: ms → μs (multiply by 1000).
UPDATE requests SET latency_us = latency_us * 1000;

-- Recreate the slow index with the new column name.
DROP INDEX IF EXISTS idx_requests_slow;
CREATE INDEX IF NOT EXISTS idx_requests_slow ON requests (proxy, latency_us DESC, ts);

-- Bump schema version.
UPDATE meta SET value = '4' WHERE key = 'schema_version';
";
221
/// SQL to insert or update the mcpr_version meta key on every startup.
///
/// Parameters: ?1=value to store under the `mcpr_version` key.
/// If the key already exists, its value is replaced with the new one.
pub const UPSERT_MCPR_VERSION: &str = "
INSERT INTO meta (key, value) VALUES ('mcpr_version', ?1)
    ON CONFLICT(key) DO UPDATE SET value = excluded.value;
";
227
// ── Prepared statement SQL ────────────────────────────────────────────
// Used by the background writer for batch inserts/updates.
230
/// INSERT a new request row. All parameters are positional (?1 .. ?12).
///
/// Parameters: ?1=request_id, ?2=ts, ?3=proxy, ?4=session_id, ?5=method, ?6=tool,
/// ?7=latency_us, ?8=status, ?9=error_code, ?10=error_msg, ?11=bytes_in, ?12=bytes_out.
///
/// Targets the post-V4 column name `latency_us`, so it requires the schema to
/// be migrated to version 4.
pub const INSERT_REQUEST_SQL: &str = "
INSERT INTO requests (
    request_id, ts, proxy, session_id,
    method, tool,
    latency_us, status, error_code, error_msg,
    bytes_in, bytes_out
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12);
";
240
/// INSERT a new session row. Uses INSERT OR IGNORE because a reconnecting
/// client may re-send `initialize` with the same session ID.
///
/// Parameters: ?1=session_id, ?2=proxy, ?3=started_at, ?4=client_name,
/// ?5=client_version, ?6=client_platform.
///
/// `?3` is bound to both `started_at` and `last_seen_at`, and both counters
/// start at 0.
pub const INSERT_SESSION_SQL: &str = "
INSERT OR IGNORE INTO sessions (
    session_id, proxy, started_at, last_seen_at,
    client_name, client_version, client_platform,
    total_calls, total_errors
) VALUES (?1, ?2, ?3, ?3, ?4, ?5, ?6, 0, 0);
";
250
/// UPDATE session counters and last_seen_at. Executed in the same transaction
/// as the request INSERT to keep counters consistent.
///
/// Parameters: ?1=candidate `last_seen_at` timestamp, ?2=amount added to
/// `total_errors`, ?3=session_id.
///
/// `last_seen_at` only moves forward: the two-argument `MAX` keeps the stored
/// value when `?1` is older. `total_calls` always increases by exactly 1.
/// NOTE(review): `?2` is presumably 1 for a failed request and 0 otherwise —
/// confirm against the writer.
pub const UPDATE_SESSION_COUNTERS_SQL: &str = "
UPDATE sessions
SET last_seen_at = MAX(last_seen_at, ?1),
    total_calls  = total_calls + 1,
    total_errors = total_errors + ?2
WHERE session_id = ?3;
";
260
/// Mark a session as ended (clean transport close).
/// Only updates if not already ended (idempotent).
///
/// Parameters: ?1=ended_at, ?2=session_id.
///
/// The `ended_at IS NULL` guard means the first recorded end time is kept if
/// the statement runs again for the same session.
pub const CLOSE_SESSION_SQL: &str = "
UPDATE sessions SET ended_at = ?1
WHERE session_id = ?2 AND ended_at IS NULL;
";
267
// ── Schema capture prepared statements ───────────────────────────────
269
/// UPSERT a server_schema row. ON CONFLICT updates the existing row.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method, ?4=payload, ?5=captured_at, ?6=schema_hash.
///
/// The conflict target `(proxy, upstream_url, method)` matches the UNIQUE
/// constraint introduced by the V3 migration, so this statement requires
/// schema version 3 or later. On conflict only `payload`, `captured_at` and
/// `schema_hash` are overwritten; the row's `id` is preserved.
pub const UPSERT_SERVER_SCHEMA_SQL: &str = "
INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(proxy, upstream_url, method) DO UPDATE SET
    payload = excluded.payload,
    captured_at = excluded.captured_at,
    schema_hash = excluded.schema_hash;
";
280
/// Fetch the current schema_hash and payload for a given proxy+upstream+method.
/// Used by the writer to detect changes before upserting.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method.
///
/// Result columns, in order: `schema_hash`, `payload`. The filter matches the
/// table's UNIQUE(proxy, upstream_url, method) constraint, so at most one row
/// is returned.
pub const GET_SCHEMA_HASH_SQL: &str = "
SELECT schema_hash, payload FROM server_schema
WHERE proxy = ?1 AND upstream_url = ?2 AND method = ?3;
";
288
/// Insert a schema change record into the append-only log.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method, ?4=change_type, ?5=item_name, ?6=old_hash, ?7=new_hash, ?8=detected_at.
///
/// Writes the `proxy` column added by the V3 migration, so this statement
/// requires schema version 3 or later. `item_name`, `old_hash` and `new_hash`
/// are nullable columns.
pub const INSERT_SCHEMA_CHANGE_SQL: &str = "
INSERT INTO schema_changes (proxy, upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);
";