mcpr-integrations 0.4.70

External integrations for mcpr: cloud event sink, API client, and SQLite request storage
Documentation
//! SQL schema definitions for the mcpr storage engine.
//!
//! All table and index definitions live here as constants. The migration runner
//! in [`super::db`] executes these on first open and on version upgrades.
//!
//! # Schema design decisions
//!
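//! - **Two core tables**: `requests` (one row per MCP request) and `sessions` (one row per
//!   MCP session). Client identity lives in `sessions` only — no denormalization.
//!   Supporting tables are `meta` (key-value metadata) and, from V2 onward,
//!   `server_schema` and `schema_changes`.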
//! - **Soft foreign keys**: `requests.session_id` references `sessions.session_id` but
//!   without a FOREIGN KEY constraint. SQLite FK enforcement requires per-connection
//!   pragmas and can cause constraint violations on ordering edge cases in async writes.
//! - **No body storage**: Request/response bodies can be MB-scale. Only metadata is stored.
//! - **Timestamps as unix milliseconds**: Sufficient resolution, avoids i64 overflow,
//!   and is the natural unit for latency math.
//! - **UUIDv7 for request_id**: Time-ordered for efficient indexing, globally unique
//!   for cloud sink correlation.

/// Current schema version. Stored in the `meta` table and checked on startup.
/// Bump this when adding migrations.
///
/// Kept as a string because `meta.value` is a TEXT column. It must equal the
/// value written by the last migration in this file (`V5_SCHEMA` sets
/// `schema_version` to `'5'`).
pub const SCHEMA_VERSION: &str = "5";

/// Initial schema: requests table, sessions table, meta table, and all indexes.
///
/// Executed as a single batch on first database creation (schema_version = 0 → 1).
///
/// Every statement uses `IF NOT EXISTS`, so running the batch against a database
/// that already has these objects leaves them unchanged. The batch does not write
/// the `schema_version` row itself; that is done by `V1_META_SEED`.
///
/// Note that the latency column is created here as `latency_ms`; `V4_SCHEMA`
/// later renames it to `latency_us`.
pub const V1_SCHEMA: &str = "
-- ── requests ──────────────────────────────────────────────────────────
-- One row per MCP request that flows through the proxy.
-- This is the primary table for all observability queries (logs, slow, stats).
CREATE TABLE IF NOT EXISTS requests (
    -- Identity
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    request_id      TEXT NOT NULL UNIQUE,    -- UUIDv7, generated by mcpr
    ts              INTEGER NOT NULL,         -- unix milliseconds (UTC)

    -- Routing
    proxy           TEXT NOT NULL,            -- proxy name from config
    session_id      TEXT,                     -- soft FK → sessions.session_id

    -- MCP Protocol
    method          TEXT NOT NULL,            -- tools/call | resources/read | initialize | etc.
    tool            TEXT,                     -- tool name for tools/call, NULL otherwise

    -- Outcome
    latency_ms      INTEGER NOT NULL,         -- wall-clock ms (renamed to latency_us in V4)
    status          TEXT NOT NULL              -- 'ok' | 'error' | 'timeout'
        CHECK (status IN ('ok', 'error', 'timeout')),
    error_code      TEXT,                     -- MCP error code when status = 'error'
    error_msg       TEXT,                     -- error message, truncated to 512 chars

    -- Size
    bytes_in        INTEGER,                  -- request payload bytes
    bytes_out       INTEGER                   -- response payload bytes
);

-- ── sessions ──────────────────────────────────────────────────────────
-- One row per MCP session, populated from the `initialize` handshake.
-- Client identity (name, version, platform) lives here — not on requests.
-- Counters (total_calls, total_errors) are incremented atomically with
-- request inserts in the same batch transaction.
CREATE TABLE IF NOT EXISTS sessions (
    -- Identity
    session_id      TEXT PRIMARY KEY,         -- MCP session ID from initialize
    proxy           TEXT NOT NULL,            -- proxy this session connected through

    -- Lifecycle
    started_at      INTEGER NOT NULL,         -- ts of initialize request (unix ms)
    last_seen_at    INTEGER NOT NULL,         -- ts of most recent request
    ended_at        INTEGER,                  -- ts of clean disconnect, NULL if active

    -- Client identity (from MCP initialize → clientInfo)
    client_name     TEXT,                     -- e.g. 'claude-desktop', 'cursor'
    client_version  TEXT,                     -- e.g. '1.2.0', '0.44.1'
    client_platform TEXT,                     -- normalized: 'claude' | 'chatgpt' | 'vscode' | etc.

    -- Counters (updated per batch flush, same transaction as request inserts)
    total_calls     INTEGER NOT NULL DEFAULT 0,
    total_errors    INTEGER NOT NULL DEFAULT 0
);

-- ── meta ──────────────────────────────────────────────────────────────
-- Key-value metadata for schema versioning and operational info.
-- Used by the migration runner and `mcpr store stats`.
CREATE TABLE IF NOT EXISTS meta (
    key     TEXT PRIMARY KEY,
    value   TEXT NOT NULL
);

-- ── indexes (requests) ───────────────────────────────────────────────
-- Optimized for the access patterns of CLI observability commands.

-- logs: ORDER BY ts DESC, WHERE proxy = ?
CREATE INDEX IF NOT EXISTS idx_requests_ts        ON requests (ts);
CREATE INDEX IF NOT EXISTS idx_requests_proxy     ON requests (proxy, ts);

-- slow: WHERE tool IS NOT NULL, ORDER BY latency_ms DESC
CREATE INDEX IF NOT EXISTS idx_requests_tool      ON requests (tool, ts) WHERE tool IS NOT NULL;

-- session drill-down: WHERE session_id = ?
CREATE INDEX IF NOT EXISTS idx_requests_session   ON requests (session_id) WHERE session_id IS NOT NULL;

-- error filtering: WHERE status = ?, ORDER BY ts
CREATE INDEX IF NOT EXISTS idx_requests_status    ON requests (status, ts);

-- slow query: WHERE proxy = ?, ORDER BY latency_ms DESC
CREATE INDEX IF NOT EXISTS idx_requests_slow      ON requests (proxy, latency_ms DESC, ts);

-- ── indexes (sessions) ───────────────────────────────────────────────

-- session list: WHERE proxy = ?, ORDER BY started_at
CREATE INDEX IF NOT EXISTS idx_sessions_proxy     ON sessions (proxy, started_at);

-- client aggregation: GROUP BY client_name, client_platform
CREATE INDEX IF NOT EXISTS idx_sessions_client    ON sessions (client_name, client_platform);

-- active sessions: WHERE ended_at IS NULL, ORDER BY last_seen_at
CREATE INDEX IF NOT EXISTS idx_sessions_active    ON sessions (proxy, last_seen_at) WHERE ended_at IS NULL;
";

/// SQL to insert the initial meta rows after schema creation.
///
/// Seeds two keys: `schema_version` = `'1'` and `created_at`. Both statements
/// use `INSERT OR IGNORE`, so rows that already exist are left untouched.
///
/// `created_at` is built from `strftime('%s', 'now')` (whole unix seconds) with
/// the literal `'000'` appended, so it is formatted as unix milliseconds but
/// only carries one-second precision.
pub const V1_META_SEED: &str = "
INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', '1');
INSERT OR IGNORE INTO meta (key, value) VALUES ('created_at', CAST(strftime('%s', 'now') AS TEXT) || '000');
";

/// V1 → V2 migration: add server_schema and schema_changes tables.
///
/// `server_schema` holds the latest snapshot per (upstream_url, method).
/// `schema_changes` is an append-only log of diffs detected by the writer.
///
/// Also creates one index per new table and finishes by setting the
/// `schema_version` meta row to `'2'`. The version bump is an `UPDATE`, so it
/// only takes effect if the `schema_version` row already exists (it is seeded
/// by `V1_META_SEED`).
pub const V2_SCHEMA: &str = "
-- ── server_schema ────────────────────────────────────────────────────
-- Latest captured snapshot per upstream server and MCP method.
-- UPSERT pattern: ON CONFLICT(upstream_url, method) DO UPDATE.
CREATE TABLE IF NOT EXISTS server_schema (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    upstream_url  TEXT NOT NULL,          -- upstream MCP server URL
    method        TEXT NOT NULL,          -- 'initialize', 'tools/list', etc.
    payload       TEXT NOT NULL,          -- full JSON of the `result` field
    captured_at   INTEGER NOT NULL,       -- unix ms when captured
    schema_hash   TEXT NOT NULL,          -- SHA-256 hex of payload
    UNIQUE(upstream_url, method)
);

-- ── schema_changes ───────────────────────────────────────────────────
-- Immutable append-only log of schema changes detected by the writer.
CREATE TABLE IF NOT EXISTS schema_changes (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    upstream_url  TEXT NOT NULL,
    method        TEXT NOT NULL,
    change_type   TEXT NOT NULL,          -- 'initial', 'stale', 'updated',
                                          -- 'tool_added', 'tool_removed', 'tool_modified', etc.
    item_name     TEXT,                   -- e.g. 'search_products', NULL for bulk changes
    old_hash      TEXT,
    new_hash      TEXT,
    detected_at   INTEGER NOT NULL        -- unix ms
);

-- ── indexes ──────────────────────────────────────────────────────────
CREATE INDEX IF NOT EXISTS idx_schema_upstream
    ON server_schema (upstream_url);
CREATE INDEX IF NOT EXISTS idx_schema_changes_upstream
    ON schema_changes (upstream_url, detected_at);

-- Bump schema version.
UPDATE meta SET value = '2' WHERE key = 'schema_version';
";

/// V2 → V3 migration: add `proxy` column to server_schema and schema_changes.
///
/// server_schema has a UNIQUE(upstream_url, method) table constraint that cannot
/// be altered in SQLite. We recreate the table with a new UNIQUE that includes proxy.
///
/// Steps, in order:
/// 1. Create `server_schema_new` with the `proxy` column and
///    `UNIQUE(proxy, upstream_url, method)`.
/// 2. Copy every existing row across, keeping its `id` and assigning
///    `proxy = 'default'`.
/// 3. Drop the old `server_schema` and rename the new table into its place.
/// 4. Add `proxy` (default `'default'`) to `schema_changes`.
/// 5. Replace the upstream-keyed indexes with proxy-scoped ones.
/// 6. Set the `schema_version` meta row to `'3'`.
///
/// NOTE(review): the batch drops a table before renaming its replacement, so it
/// presumably relies on the migration runner executing it inside a single
/// transaction — confirm against the runner.
pub const V3_SCHEMA: &str = "
-- Recreate server_schema with proxy column and updated unique constraint.
CREATE TABLE IF NOT EXISTS server_schema_new (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    proxy         TEXT NOT NULL DEFAULT 'default',
    upstream_url  TEXT NOT NULL,
    method        TEXT NOT NULL,
    payload       TEXT NOT NULL,
    captured_at   INTEGER NOT NULL,
    schema_hash   TEXT NOT NULL,
    UNIQUE(proxy, upstream_url, method)
);
INSERT OR IGNORE INTO server_schema_new (id, proxy, upstream_url, method, payload, captured_at, schema_hash)
    SELECT id, 'default', upstream_url, method, payload, captured_at, schema_hash
    FROM server_schema;
DROP TABLE IF EXISTS server_schema;
ALTER TABLE server_schema_new RENAME TO server_schema;

-- Add proxy column to schema_changes.
ALTER TABLE schema_changes ADD COLUMN proxy TEXT NOT NULL DEFAULT 'default';

-- New indexes for proxy-scoped queries.
DROP INDEX IF EXISTS idx_schema_upstream;
CREATE INDEX IF NOT EXISTS idx_schema_proxy
    ON server_schema (proxy, upstream_url);
DROP INDEX IF EXISTS idx_schema_changes_upstream;
CREATE INDEX IF NOT EXISTS idx_schema_changes_proxy
    ON schema_changes (proxy, detected_at);

-- Bump schema version.
UPDATE meta SET value = '3' WHERE key = 'schema_version';
";

/// V3 → V4 migration: rename `latency_ms` → `latency_us` and convert
/// existing values from milliseconds to microseconds for sub-ms precision.
///
/// After the rename and conversion, `idx_requests_slow` is dropped and
/// recreated on `(proxy, latency_us DESC, ts)`, and the `schema_version` meta
/// row is set to `'4'`.
///
/// This migration is not idempotent: the `UPDATE` multiplies every stored
/// latency by 1000 each time it runs, so it must be applied exactly once.
pub const V4_SCHEMA: &str = "
-- Rename latency column from ms to μs.
ALTER TABLE requests RENAME COLUMN latency_ms TO latency_us;

-- Convert existing rows: ms → μs (multiply by 1000).
UPDATE requests SET latency_us = latency_us * 1000;

-- Recreate the slow index with the new column name.
DROP INDEX IF EXISTS idx_requests_slow;
CREATE INDEX IF NOT EXISTS idx_requests_slow ON requests (proxy, latency_us DESC, ts);

-- Bump schema version.
UPDATE meta SET value = '4' WHERE key = 'schema_version';
";

/// V4 → V5 migration: add `resource_uri` and `prompt_name` columns to
/// the `requests` table. Captured by the proxy's `TargetExtractMiddleware`:
///   resources/{read,subscribe,unsubscribe} → params.uri  → resource_uri
///   prompts/get                            → params.name → prompt_name
///
/// Both columns are plain nullable `TEXT` with no default, so rows written
/// before this migration read back as NULL for them. Finishes by setting the
/// `schema_version` meta row to `'5'`.
pub const V5_SCHEMA: &str = "
ALTER TABLE requests ADD COLUMN resource_uri TEXT;
ALTER TABLE requests ADD COLUMN prompt_name TEXT;

-- Bump schema version.
UPDATE meta SET value = '5' WHERE key = 'schema_version';
";

/// SQL to insert or update the mcpr_version meta key on every startup.
///
/// Parameters: ?1 = version string to store under the `mcpr_version` key.
/// If the key already exists its value is overwritten with ?1.
pub const UPSERT_MCPR_VERSION: &str = "
INSERT INTO meta (key, value) VALUES ('mcpr_version', ?1)
    ON CONFLICT(key) DO UPDATE SET value = excluded.value;
";

// ── Prepared statement SQL ────────────────────────────────────────────
// Used by the background writer for batch inserts/updates.

/// INSERT a new request row. All parameters are positional (?1 .. ?14).
///
/// Parameters: ?1=request_id, ?2=ts, ?3=proxy, ?4=session_id, ?5=method,
/// ?6=tool, ?7=resource_uri, ?8=prompt_name, ?9=latency_us, ?10=status,
/// ?11=error_code, ?12=error_msg, ?13=bytes_in, ?14=bytes_out.
///
/// References `latency_us`, `resource_uri` and `prompt_name`, so it requires
/// the V5 schema. This is a plain `INSERT`: a duplicate `request_id` violates
/// the column's UNIQUE constraint and the statement fails.
pub const INSERT_REQUEST_SQL: &str = "
INSERT INTO requests (
    request_id, ts, proxy, session_id,
    method, tool, resource_uri, prompt_name,
    latency_us, status, error_code, error_msg,
    bytes_in, bytes_out
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14);
";

/// INSERT a new session row. Uses INSERT OR IGNORE because a reconnecting
/// client may re-send `initialize` with the same session ID.
///
/// Parameters: ?1=session_id, ?2=proxy, ?3=started_at, ?4=client_name,
/// ?5=client_version, ?6=client_platform.
///
/// ?3 is bound twice: it initializes both `started_at` and `last_seen_at`.
/// `total_calls` and `total_errors` always start at 0.
pub const INSERT_SESSION_SQL: &str = "
INSERT OR IGNORE INTO sessions (
    session_id, proxy, started_at, last_seen_at,
    client_name, client_version, client_platform,
    total_calls, total_errors
) VALUES (?1, ?2, ?3, ?3, ?4, ?5, ?6, 0, 0);
";

/// UPDATE session counters and last_seen_at. Executed in the same transaction
/// as the request INSERT to keep counters consistent.
///
/// Parameters: ?1=timestamp of the request, ?2=amount to add to `total_errors`,
/// ?3=session_id.
///
/// `last_seen_at` is set to `MAX(last_seen_at, ?1)`, so an older timestamp
/// never moves it backwards. `total_calls` is incremented by exactly 1 per
/// execution. NOTE(review): ?2 is presumably 0 or 1 per request — confirm
/// against the writer.
pub const UPDATE_SESSION_COUNTERS_SQL: &str = "
UPDATE sessions
SET last_seen_at = MAX(last_seen_at, ?1),
    total_calls  = total_calls + 1,
    total_errors = total_errors + ?2
WHERE session_id = ?3;
";

/// Mark a session as ended (clean transport close).
/// Only updates if not already ended (idempotent).
///
/// Parameters: ?1=ended_at timestamp, ?2=session_id.
///
/// The `ended_at IS NULL` guard means the first recorded end time is kept;
/// later calls for the same session change nothing.
pub const CLOSE_SESSION_SQL: &str = "
UPDATE sessions SET ended_at = ?1
WHERE session_id = ?2 AND ended_at IS NULL;
";

// ── Schema capture prepared statements ───────────────────────────────

/// UPSERT a server_schema row. ON CONFLICT updates the existing row.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method, ?4=payload, ?5=captured_at, ?6=schema_hash.
///
/// The conflict target `(proxy, upstream_url, method)` matches the UNIQUE
/// constraint introduced by `V3_SCHEMA`, so this statement requires schema
/// version 3 or later. On conflict only `payload`, `captured_at` and
/// `schema_hash` are overwritten.
pub const UPSERT_SERVER_SCHEMA_SQL: &str = "
INSERT INTO server_schema (proxy, upstream_url, method, payload, captured_at, schema_hash)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(proxy, upstream_url, method) DO UPDATE SET
    payload = excluded.payload,
    captured_at = excluded.captured_at,
    schema_hash = excluded.schema_hash;
";

/// Fetch the current schema_hash and payload for a given proxy+upstream+method.
/// Used by the writer to detect changes before upserting.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method.
///
/// Result columns, in order: `schema_hash`, `payload`. Because
/// `(proxy, upstream_url, method)` is UNIQUE in the V3 `server_schema` table,
/// the query yields at most one row.
pub const GET_SCHEMA_HASH_SQL: &str = "
SELECT schema_hash, payload FROM server_schema
WHERE proxy = ?1 AND upstream_url = ?2 AND method = ?3;
";

/// Insert a schema change record into the append-only log.
/// Parameters: ?1=proxy, ?2=upstream_url, ?3=method, ?4=change_type, ?5=item_name, ?6=old_hash, ?7=new_hash, ?8=detected_at.
///
/// `item_name`, `old_hash` and `new_hash` are nullable columns, so ?5, ?6 and
/// ?7 may be bound to NULL. The `proxy` column only exists from `V3_SCHEMA`
/// onward.
pub const INSERT_SCHEMA_CHANGE_SQL: &str = "
INSERT INTO schema_changes (proxy, upstream_url, method, change_type, item_name, old_hash, new_hash, detected_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);
";