mcpr_integrations/sinks/
sqlite_sink.rs1use mcpr_core::event::{EventSink, ProxyEvent, SchemaVersionCreatedEvent};
10
11use crate::store::engine::Store;
12use crate::store::event::{
13 RequestEvent as StoreRequestEvent, RequestStatus, SchemaVersionEvent,
14 SessionEvent as StoreSessionEvent, StoreEvent,
15};
16
17pub struct SqliteSink {
21 store: Store,
22}
23
24impl SqliteSink {
25 pub fn new(store: Store) -> Self {
26 Self { store }
27 }
28
29 pub fn shutdown(&mut self) {
31 self.store.shutdown();
32 }
33}
34
35impl EventSink for SqliteSink {
36 fn on_event(&self, event: &ProxyEvent) {
37 match event {
38 ProxyEvent::Request(e) => {
39 let status = if e.error_code.is_some() || e.status >= 500 {
40 RequestStatus::Error
41 } else {
42 RequestStatus::Ok
43 };
44
45 self.store.record(StoreEvent::Request(StoreRequestEvent {
46 request_id: e.id.clone(),
47 ts: e.ts,
48 proxy: e.proxy.clone(),
49 session_id: e.session_id.clone(),
50 method: e.mcp_method.clone().unwrap_or_else(|| e.method.clone()),
51 tool: e.tool.clone(),
52 resource_uri: e.resource_uri.clone(),
53 prompt_name: e.prompt_name.clone(),
54 latency_us: e.latency_us as i64,
55 status,
56 error_code: e.error_code.clone(),
57 error_msg: e.error_msg.clone(),
58 bytes_in: e.request_size.map(|s| s as i64),
59 bytes_out: e.response_size.map(|s| s as i64),
60 }));
61 }
62 ProxyEvent::SessionStart(e) => {
63 self.store.record(StoreEvent::Session(StoreSessionEvent {
64 session_id: e.session_id.clone(),
65 proxy: e.proxy.clone(),
66 started_at: e.ts,
67 client_name: e.client_name.clone(),
68 client_version: e.client_version.clone(),
69 client_platform: e.client_platform.clone(),
70 }));
71 }
72 ProxyEvent::SessionEnd(e) => {
73 self.store.record(StoreEvent::SessionClosed {
74 session_id: e.session_id.clone(),
75 ended_at: e.ts,
76 });
77 }
78 ProxyEvent::Heartbeat(_) => {
79 }
81 ProxyEvent::SchemaVersionCreated(e) => {
82 self.store.record(map_schema_version(e));
83 }
84 }
85 }
86
87 fn name(&self) -> &'static str {
88 "sqlite"
89 }
90}
91
92fn map_schema_version(e: &SchemaVersionCreatedEvent) -> StoreEvent {
93 StoreEvent::SchemaVersion(SchemaVersionEvent {
94 ts: e.ts,
95 proxy: e.upstream_id.clone(),
96 upstream_url: e.upstream_url.clone(),
97 method: e.method.clone(),
98 payload: e.payload.to_string(),
99 content_hash: e.content_hash.clone(),
100 })
101}
102
103#[cfg(test)]
104#[allow(non_snake_case)]
105mod tests {
106 use super::*;
107 use serde_json::json;
108
109 #[test]
110 fn map_schema_version__copies_fields_correctly() {
111 let event = SchemaVersionCreatedEvent {
112 ts: 1_700_000_000_000,
113 upstream_id: "api".into(),
114 upstream_url: "http://localhost:9000".into(),
115 method: "tools/list".into(),
116 version: 3,
117 version_id: "abc123def4567890".into(),
118 content_hash: "abc123def4567890cafebabe".into(),
119 payload: json!({"tools": [{"name": "search"}]}),
120 };
121
122 let StoreEvent::SchemaVersion(sv) = map_schema_version(&event) else {
123 panic!("expected StoreEvent::SchemaVersion");
124 };
125 assert_eq!(sv.ts, 1_700_000_000_000);
126 assert_eq!(sv.proxy, "api");
127 assert_eq!(sv.upstream_url, "http://localhost:9000");
128 assert_eq!(sv.method, "tools/list");
129 assert_eq!(sv.content_hash, "abc123def4567890cafebabe");
130 assert!(sv.payload.contains("search"));
131 }
132
133 #[test]
134 fn map_schema_version__upstream_id_maps_to_proxy_column() {
135 let event = SchemaVersionCreatedEvent {
136 ts: 0,
137 upstream_id: "proxy-alpha".into(),
138 upstream_url: "".into(),
139 method: "initialize".into(),
140 version: 1,
141 version_id: "0".into(),
142 content_hash: "0".into(),
143 payload: json!({}),
144 };
145 let StoreEvent::SchemaVersion(sv) = map_schema_version(&event) else {
146 panic!();
147 };
148 assert_eq!(sv.proxy, "proxy-alpha");
149 }
150}