Skip to main content

mcpr_integrations/sinks/
sqlite_sink.rs

1//! SQLite event sink — writes proxy events to the local SQLite store.
2//!
3//! Converts `ProxyEvent` variants into store operations:
4//! - `Request` → insert into `requests` table + update session counters
5//! - `SessionStart` → insert into `sessions` table
6//! - `SessionEnd` → update `ended_at` on the session
7//! - `Heartbeat` → ignored (not stored locally)
8
9use mcpr_core::event::{EventSink, ProxyEvent, SchemaVersionCreatedEvent};
10
11use crate::store::engine::Store;
12use crate::store::event::{
13    RequestEvent as StoreRequestEvent, RequestStatus, SchemaVersionEvent,
14    SessionEvent as StoreSessionEvent, StoreEvent,
15};
16
17/// Event sink that writes to the SQLite store.
18///
19/// Wraps the existing `Store` and converts `ProxyEvent` → `StoreEvent`.
20pub struct SqliteSink {
21    store: Store,
22}
23
24impl SqliteSink {
25    pub fn new(store: Store) -> Self {
26        Self { store }
27    }
28
29    /// Shutdown the underlying store (flush pending writes).
30    pub fn shutdown(&mut self) {
31        self.store.shutdown();
32    }
33}
34
35impl EventSink for SqliteSink {
36    fn on_event(&self, event: &ProxyEvent) {
37        match event {
38            ProxyEvent::Request(e) => {
39                let status = if e.error_code.is_some() || e.status >= 500 {
40                    RequestStatus::Error
41                } else {
42                    RequestStatus::Ok
43                };
44
45                self.store.record(StoreEvent::Request(StoreRequestEvent {
46                    request_id: e.id.clone(),
47                    ts: e.ts,
48                    proxy: e.proxy.clone(),
49                    session_id: e.session_id.clone(),
50                    method: e.mcp_method.clone().unwrap_or_else(|| e.method.clone()),
51                    tool: e.tool.clone(),
52                    resource_uri: e.resource_uri.clone(),
53                    prompt_name: e.prompt_name.clone(),
54                    latency_us: e.latency_us as i64,
55                    status,
56                    error_code: e.error_code.clone(),
57                    error_msg: e.error_msg.clone(),
58                    bytes_in: e.request_size.map(|s| s as i64),
59                    bytes_out: e.response_size.map(|s| s as i64),
60                }));
61            }
62            ProxyEvent::SessionStart(e) => {
63                self.store.record(StoreEvent::Session(StoreSessionEvent {
64                    session_id: e.session_id.clone(),
65                    proxy: e.proxy.clone(),
66                    started_at: e.ts,
67                    client_name: e.client_name.clone(),
68                    client_version: e.client_version.clone(),
69                    client_platform: e.client_platform.clone(),
70                }));
71            }
72            ProxyEvent::SessionEnd(e) => {
73                self.store.record(StoreEvent::SessionClosed {
74                    session_id: e.session_id.clone(),
75                    ended_at: e.ts,
76                });
77            }
78            ProxyEvent::Heartbeat(_) => {
79                // Heartbeats are not stored locally.
80            }
81            ProxyEvent::SchemaVersionCreated(e) => {
82                self.store.record(map_schema_version(e));
83            }
84        }
85    }
86
87    fn name(&self) -> &'static str {
88        "sqlite"
89    }
90}
91
92fn map_schema_version(e: &SchemaVersionCreatedEvent) -> StoreEvent {
93    StoreEvent::SchemaVersion(SchemaVersionEvent {
94        ts: e.ts,
95        proxy: e.upstream_id.clone(),
96        upstream_url: e.upstream_url.clone(),
97        method: e.method.clone(),
98        payload: e.payload.to_string(),
99        content_hash: e.content_hash.clone(),
100    })
101}
102
103#[cfg(test)]
104#[allow(non_snake_case)]
105mod tests {
106    use super::*;
107    use serde_json::json;
108
109    #[test]
110    fn map_schema_version__copies_fields_correctly() {
111        let event = SchemaVersionCreatedEvent {
112            ts: 1_700_000_000_000,
113            upstream_id: "api".into(),
114            upstream_url: "http://localhost:9000".into(),
115            method: "tools/list".into(),
116            version: 3,
117            version_id: "abc123def4567890".into(),
118            content_hash: "abc123def4567890cafebabe".into(),
119            payload: json!({"tools": [{"name": "search"}]}),
120        };
121
122        let StoreEvent::SchemaVersion(sv) = map_schema_version(&event) else {
123            panic!("expected StoreEvent::SchemaVersion");
124        };
125        assert_eq!(sv.ts, 1_700_000_000_000);
126        assert_eq!(sv.proxy, "api");
127        assert_eq!(sv.upstream_url, "http://localhost:9000");
128        assert_eq!(sv.method, "tools/list");
129        assert_eq!(sv.content_hash, "abc123def4567890cafebabe");
130        assert!(sv.payload.contains("search"));
131    }
132
133    #[test]
134    fn map_schema_version__upstream_id_maps_to_proxy_column() {
135        let event = SchemaVersionCreatedEvent {
136            ts: 0,
137            upstream_id: "proxy-alpha".into(),
138            upstream_url: "".into(),
139            method: "initialize".into(),
140            version: 1,
141            version_id: "0".into(),
142            content_hash: "0".into(),
143            payload: json!({}),
144        };
145        let StoreEvent::SchemaVersion(sv) = map_schema_version(&event) else {
146            panic!();
147        };
148        assert_eq!(sv.proxy, "proxy-alpha");
149    }
150}