mcpr_integrations/store/engine.rs
1//! The `Store` — public API for the storage engine.
2//!
3//! `Store` is the single entry point for the rest of mcpr to interact with
4//! persistent storage. It handles:
5//!
6//! - Opening the database and running migrations.
7//! - Spawning the background writer thread.
8//! - Providing a non-blocking `record()` method for the proxy hot path.
9//! - Graceful shutdown with guaranteed flush of pending events.
10//!
11//! # Usage
12//!
13//! ```rust,ignore
14//! let store = Store::open(StoreConfig {
15//! db_path: PathBuf::from("/tmp/mcpr.db"),
16//! mcpr_version: "0.3.0".into(),
17//! })?;
18//!
19//! // Hot path — non-blocking, fire-and-forget.
20//! store.record(StoreEvent::Request(event));
21//!
22//! // Shutdown — blocks until writer drains pending events.
23//! store.shutdown();
24//! ```
25
26use std::path::PathBuf;
27use std::thread::JoinHandle;
28
29use tokio::sync::mpsc;
30
31use super::db;
32use super::event::StoreEvent;
33use super::path;
34use super::writer;
35
36/// Channel capacity — how many events can be buffered before the hot path
37/// starts dropping them.
38///
39/// At 1,000 requests/second this is a 10-second buffer. More than enough
40/// to absorb any write latency spike from SQLite.
41const CHANNEL_CAPACITY: usize = 10_000;
42
43/// Configuration for opening the store.
44pub struct StoreConfig {
45 /// Path to the SQLite database file.
46 /// The parent directory is created automatically if it doesn't exist.
47 pub db_path: PathBuf,
48
49 /// The current mcpr binary version (e.g., "0.3.0").
50 /// Written to the `meta` table on every startup for diagnostics.
51 pub mcpr_version: String,
52}
53
54/// Handle to the storage engine.
55///
56/// Cheap to clone (sender + Arc internally). The proxy holds one, and
57/// CLI query commands can open their own read-only connections separately.
58pub struct Store {
59 /// Channel sender for the background writer. Non-blocking `try_send`.
60 tx: mpsc::Sender<StoreEvent>,
61
62 /// Join handle for the writer thread. Used for graceful shutdown.
63 writer_handle: Option<JoinHandle<()>>,
64
65 /// Database path — needed by the query engine to open read-only connections.
66 db_path: PathBuf,
67}
68
69impl Store {
70 /// Open or create the database, run migrations, and spawn the writer thread.
71 ///
72 /// This is called once on proxy startup. It:
73 /// 1. Creates the parent directory if needed.
74 /// 2. Opens a read-write connection and runs schema migrations.
75 /// 3. Spawns the background writer on a dedicated OS thread.
76 /// 4. Returns a `Store` handle for recording events.
77 ///
78 /// # Errors
79 ///
80 /// Returns an error if:
81 /// - The parent directory can't be created (permissions).
82 /// - SQLite can't open the file (disk full, corrupt file).
83 /// - Schema migrations fail (shouldn't happen on fresh DBs).
84 pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
85 // Ensure parent directory exists.
86 path::ensure_parent_dir(&config.db_path)
87 .map_err(|e| StoreError::Io(format!("failed to create db directory: {e}")))?;
88
89 // Open connection and run migrations on the current thread.
90 // This validates that the DB is usable before we hand off to the writer.
91 let conn = db::open_connection(&config.db_path)
92 .map_err(|e| StoreError::Sqlite(format!("failed to open database: {e}")))?;
93
94 db::run_migrations(&conn, &config.mcpr_version)
95 .map_err(|e| StoreError::Sqlite(format!("schema migration failed: {e}")))?;
96
97 // Create the event channel.
98 let (tx, rx) = mpsc::channel::<StoreEvent>(CHANNEL_CAPACITY);
99
100 // Spawn the writer on a dedicated OS thread.
101 // rusqlite::Connection is !Send, so it must stay on one thread.
102 // The connection is moved into the thread — nobody else writes.
103 let writer_handle = std::thread::Builder::new()
104 .name("mcpr-store-writer".into())
105 .spawn(move || {
106 writer::run_writer_loop(conn, rx);
107 })
108 .map_err(|e| StoreError::Io(format!("failed to spawn writer thread: {e}")))?;
109
110 Ok(Store {
111 tx,
112 writer_handle: Some(writer_handle),
113 db_path: config.db_path,
114 })
115 }
116
117 /// Record an event — non-blocking, fire-and-forget.
118 ///
119 /// If the channel is full (back-pressure), the event is silently dropped.
120 /// This is intentional: a busy proxy must never block on storage writes.
121 /// Dropped events are a signal that the writer can't keep up — in practice
122 /// this should never happen at normal MCP request rates.
123 pub fn record(&self, event: StoreEvent) {
124 // try_send returns Err if the channel is full or closed.
125 // We intentionally ignore both — the proxy must not block.
126 let _ = self.tx.try_send(event);
127 }
128
129 /// Get the database path for opening read-only query connections.
130 pub fn db_path(&self) -> &PathBuf {
131 &self.db_path
132 }
133
134 /// Graceful shutdown — close the channel and wait for the writer to flush.
135 ///
136 /// Call this on proxy shutdown (after stopping new requests, before exiting).
137 /// Blocks the current thread until all pending events are written to SQLite.
138 ///
139 /// After this returns, the database file is consistent and safe to read.
140 pub fn shutdown(&mut self) {
141 // Drop the sender to signal the writer that no more events are coming.
142 // The writer will drain any remaining events and exit.
143 //
144 // We replace tx with a closed channel — any subsequent record() calls
145 // will silently fail, which is correct during shutdown.
146 let (dead_tx, _) = mpsc::channel(1);
147 let old_tx = std::mem::replace(&mut self.tx, dead_tx);
148 drop(old_tx);
149
150 // Wait for the writer thread to finish.
151 if let Some(handle) = self.writer_handle.take()
152 && let Err(e) = handle.join()
153 {
154 eprintln!("mcpr-store: writer thread panicked: {e:?}");
155 }
156 }
157}
158
159impl Drop for Store {
160 fn drop(&mut self) {
161 // Best-effort shutdown if not already called.
162 // In normal usage, shutdown() is called explicitly before drop.
163 if self.writer_handle.is_some() {
164 self.shutdown();
165 }
166 }
167}
168
169/// Errors from store operations.
170#[derive(Debug)]
171pub enum StoreError {
172 /// Filesystem error (directory creation, permissions).
173 Io(String),
174 /// SQLite error (open, migration, query).
175 Sqlite(String),
176}
177
178impl std::fmt::Display for StoreError {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 match self {
181 StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
182 StoreError::Sqlite(msg) => write!(f, "store SQLite error: {msg}"),
183 }
184 }
185}
186
187impl std::error::Error for StoreError {}
188
189#[cfg(test)]
190#[allow(non_snake_case)]
191mod tests {
192 use super::*;
193 use crate::store::event::{RequestEvent, RequestStatus, SessionEvent};
194
195 #[test]
196 fn store__open_record_shutdown() {
197 let dir = tempfile::tempdir().unwrap();
198 let db_path = dir.path().join("test.db");
199
200 let mut store = Store::open(StoreConfig {
201 db_path: db_path.clone(),
202 mcpr_version: "test".into(),
203 })
204 .unwrap();
205
206 store.record(StoreEvent::Session(SessionEvent {
207 session_id: "s1".into(),
208 proxy: "test-proxy".into(),
209 started_at: 1000,
210 client_name: Some("test-client".into()),
211 client_version: Some("0.1".into()),
212 client_platform: Some("unknown".into()),
213 }));
214
215 store.record(StoreEvent::Request(RequestEvent {
216 request_id: "r1".into(),
217 ts: 1001,
218 proxy: "test-proxy".into(),
219 session_id: Some("s1".into()),
220 method: "tools/call".into(),
221 tool: Some("test_tool".into()),
222 resource_uri: None,
223 prompt_name: None,
224 latency_us: 50_000,
225 status: RequestStatus::Ok,
226 error_code: None,
227 error_msg: None,
228 bytes_in: Some(100),
229 bytes_out: Some(200),
230 }));
231
232 store.shutdown();
233
234 let conn = db::open_connection(&db_path).unwrap();
235
236 let count: i64 = conn
237 .query_row("SELECT COUNT(*) FROM requests", [], |row| row.get(0))
238 .unwrap();
239 assert_eq!(count, 1);
240
241 let count: i64 = conn
242 .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
243 .unwrap();
244 assert_eq!(count, 1);
245
246 let tool: String = conn
247 .query_row(
248 "SELECT tool FROM requests WHERE request_id = 'r1'",
249 [],
250 |row| row.get(0),
251 )
252 .unwrap();
253 assert_eq!(tool, "test_tool");
254 }
255}