Skip to main content

mcpr_integrations/store/
engine.rs

1//! The `Store` — public API for the storage engine.
2//!
3//! `Store` is the single entry point for the rest of mcpr to interact with
4//! persistent storage. It handles:
5//!
6//! - Opening the database and running migrations.
7//! - Spawning the background writer thread.
8//! - Providing a non-blocking `record()` method for the proxy hot path.
9//! - Graceful shutdown with guaranteed flush of pending events.
10//!
11//! # Usage
12//!
13//! ```rust,ignore
14//! let store = Store::open(StoreConfig {
15//!     db_path: PathBuf::from("/tmp/mcpr.db"),
16//!     mcpr_version: "0.3.0".into(),
17//! })?;
18//!
19//! // Hot path — non-blocking, fire-and-forget.
20//! store.record(StoreEvent::Request(event));
21//!
22//! // Shutdown — blocks until writer drains pending events.
23//! store.shutdown();
24//! ```
25
26use std::path::PathBuf;
27use std::thread::JoinHandle;
28
29use tokio::sync::mpsc;
30
31use super::db;
32use super::event::StoreEvent;
33use super::path;
34use super::writer;
35
36/// Channel capacity — how many events can be buffered before the hot path
37/// starts dropping them.
38///
39/// At 1,000 requests/second this is a 10-second buffer. More than enough
40/// to absorb any write latency spike from SQLite.
41const CHANNEL_CAPACITY: usize = 10_000;
42
43/// Configuration for opening the store.
44pub struct StoreConfig {
45    /// Path to the SQLite database file.
46    /// The parent directory is created automatically if it doesn't exist.
47    pub db_path: PathBuf,
48
49    /// The current mcpr binary version (e.g., "0.3.0").
50    /// Written to the `meta` table on every startup for diagnostics.
51    pub mcpr_version: String,
52}
53
54/// Handle to the storage engine.
55///
56/// Cheap to clone (sender + Arc internally). The proxy holds one, and
57/// CLI query commands can open their own read-only connections separately.
58pub struct Store {
59    /// Channel sender for the background writer. Non-blocking `try_send`.
60    tx: mpsc::Sender<StoreEvent>,
61
62    /// Join handle for the writer thread. Used for graceful shutdown.
63    writer_handle: Option<JoinHandle<()>>,
64
65    /// Database path — needed by the query engine to open read-only connections.
66    db_path: PathBuf,
67}
68
69impl Store {
70    /// Open or create the database, run migrations, and spawn the writer thread.
71    ///
72    /// This is called once on proxy startup. It:
73    /// 1. Creates the parent directory if needed.
74    /// 2. Opens a read-write connection and runs schema migrations.
75    /// 3. Spawns the background writer on a dedicated OS thread.
76    /// 4. Returns a `Store` handle for recording events.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if:
81    /// - The parent directory can't be created (permissions).
82    /// - SQLite can't open the file (disk full, corrupt file).
83    /// - Schema migrations fail (shouldn't happen on fresh DBs).
84    pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
85        // Ensure parent directory exists.
86        path::ensure_parent_dir(&config.db_path)
87            .map_err(|e| StoreError::Io(format!("failed to create db directory: {e}")))?;
88
89        // Open connection and run migrations on the current thread.
90        // This validates that the DB is usable before we hand off to the writer.
91        let conn = db::open_connection(&config.db_path)
92            .map_err(|e| StoreError::Sqlite(format!("failed to open database: {e}")))?;
93
94        db::run_migrations(&conn, &config.mcpr_version)
95            .map_err(|e| StoreError::Sqlite(format!("schema migration failed: {e}")))?;
96
97        // Create the event channel.
98        let (tx, rx) = mpsc::channel::<StoreEvent>(CHANNEL_CAPACITY);
99
100        // Spawn the writer on a dedicated OS thread.
101        // rusqlite::Connection is !Send, so it must stay on one thread.
102        // The connection is moved into the thread — nobody else writes.
103        let writer_handle = std::thread::Builder::new()
104            .name("mcpr-store-writer".into())
105            .spawn(move || {
106                writer::run_writer_loop(conn, rx);
107            })
108            .map_err(|e| StoreError::Io(format!("failed to spawn writer thread: {e}")))?;
109
110        Ok(Store {
111            tx,
112            writer_handle: Some(writer_handle),
113            db_path: config.db_path,
114        })
115    }
116
117    /// Record an event — non-blocking, fire-and-forget.
118    ///
119    /// If the channel is full (back-pressure), the event is silently dropped.
120    /// This is intentional: a busy proxy must never block on storage writes.
121    /// Dropped events are a signal that the writer can't keep up — in practice
122    /// this should never happen at normal MCP request rates.
123    pub fn record(&self, event: StoreEvent) {
124        // try_send returns Err if the channel is full or closed.
125        // We intentionally ignore both — the proxy must not block.
126        let _ = self.tx.try_send(event);
127    }
128
129    /// Get the database path for opening read-only query connections.
130    pub fn db_path(&self) -> &PathBuf {
131        &self.db_path
132    }
133
134    /// Graceful shutdown — close the channel and wait for the writer to flush.
135    ///
136    /// Call this on proxy shutdown (after stopping new requests, before exiting).
137    /// Blocks the current thread until all pending events are written to SQLite.
138    ///
139    /// After this returns, the database file is consistent and safe to read.
140    pub fn shutdown(&mut self) {
141        // Drop the sender to signal the writer that no more events are coming.
142        // The writer will drain any remaining events and exit.
143        //
144        // We replace tx with a closed channel — any subsequent record() calls
145        // will silently fail, which is correct during shutdown.
146        let (dead_tx, _) = mpsc::channel(1);
147        let old_tx = std::mem::replace(&mut self.tx, dead_tx);
148        drop(old_tx);
149
150        // Wait for the writer thread to finish.
151        if let Some(handle) = self.writer_handle.take()
152            && let Err(e) = handle.join()
153        {
154            eprintln!("mcpr-store: writer thread panicked: {e:?}");
155        }
156    }
157}
158
159impl Drop for Store {
160    fn drop(&mut self) {
161        // Best-effort shutdown if not already called.
162        // In normal usage, shutdown() is called explicitly before drop.
163        if self.writer_handle.is_some() {
164            self.shutdown();
165        }
166    }
167}
168
169/// Errors from store operations.
170#[derive(Debug)]
171pub enum StoreError {
172    /// Filesystem error (directory creation, permissions).
173    Io(String),
174    /// SQLite error (open, migration, query).
175    Sqlite(String),
176}
177
178impl std::fmt::Display for StoreError {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        match self {
181            StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
182            StoreError::Sqlite(msg) => write!(f, "store SQLite error: {msg}"),
183        }
184    }
185}
186
187impl std::error::Error for StoreError {}
188
189#[cfg(test)]
190#[allow(non_snake_case)]
191mod tests {
192    use super::*;
193    use crate::store::event::{RequestEvent, RequestStatus, SessionEvent};
194
195    #[test]
196    fn store__open_record_shutdown() {
197        let dir = tempfile::tempdir().unwrap();
198        let db_path = dir.path().join("test.db");
199
200        let mut store = Store::open(StoreConfig {
201            db_path: db_path.clone(),
202            mcpr_version: "test".into(),
203        })
204        .unwrap();
205
206        store.record(StoreEvent::Session(SessionEvent {
207            session_id: "s1".into(),
208            proxy: "test-proxy".into(),
209            started_at: 1000,
210            client_name: Some("test-client".into()),
211            client_version: Some("0.1".into()),
212            client_platform: Some("unknown".into()),
213        }));
214
215        store.record(StoreEvent::Request(RequestEvent {
216            request_id: "r1".into(),
217            ts: 1001,
218            proxy: "test-proxy".into(),
219            session_id: Some("s1".into()),
220            method: "tools/call".into(),
221            tool: Some("test_tool".into()),
222            latency_us: 50_000,
223            status: RequestStatus::Ok,
224            error_code: None,
225            error_msg: None,
226            bytes_in: Some(100),
227            bytes_out: Some(200),
228        }));
229
230        store.shutdown();
231
232        let conn = db::open_connection(&db_path).unwrap();
233
234        let count: i64 = conn
235            .query_row("SELECT COUNT(*) FROM requests", [], |row| row.get(0))
236            .unwrap();
237        assert_eq!(count, 1);
238
239        let count: i64 = conn
240            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))
241            .unwrap();
242        assert_eq!(count, 1);
243
244        let tool: String = conn
245            .query_row(
246                "SELECT tool FROM requests WHERE request_id = 'r1'",
247                [],
248                |row| row.get(0),
249            )
250            .unwrap();
251        assert_eq!(tool, "test_tool");
252    }
253}