Skip to main content

engram/storage/
connection.rs

1//! Database connection management with WAL mode support (RML-874)
2//!
3//! Implements SQLite connection pooling with configurable storage modes
4//! for both local (WAL) and cloud-safe (DELETE journal) operation.
5
6use parking_lot::Mutex;
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::Arc;
10
11use super::migrations::run_migrations;
12use crate::error::Result;
13use crate::types::{CompactOp, CompactReport, StorageConfig, StorageMode};
14
15/// Storage engine wrapping SQLite with connection pooling
16pub struct Storage {
17    config: StorageConfig,
18    conn: Arc<Mutex<Connection>>,
19}
20
21/// Connection pool for concurrent access
22pub struct StoragePool {
23    config: StorageConfig,
24    pool: Vec<Arc<Mutex<Connection>>>,
25    next: std::sync::atomic::AtomicUsize,
26}
27
28impl Storage {
29    /// Open or create a database with the given configuration
30    pub fn open(config: StorageConfig) -> Result<Self> {
31        let conn = Self::create_connection(&config)?;
32
33        // Run migrations
34        run_migrations(&conn)?;
35
36        Ok(Self {
37            config,
38            conn: Arc::new(Mutex::new(conn)),
39        })
40    }
41
42    /// Open with default configuration (in-memory for testing)
43    pub fn open_in_memory() -> Result<Self> {
44        let config = StorageConfig {
45            db_path: ":memory:".to_string(),
46            storage_mode: StorageMode::Local,
47            cloud_uri: None,
48            encrypt_cloud: false,
49            confidence_half_life_days: 30.0,
50            auto_sync: false,
51            sync_debounce_ms: 5000,
52        };
53        Self::open(config)
54    }
55
56    /// Create a new connection with appropriate pragmas
57    fn create_connection(config: &StorageConfig) -> Result<Connection> {
58        let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
59            | OpenFlags::SQLITE_OPEN_CREATE
60            | OpenFlags::SQLITE_OPEN_NO_MUTEX;
61
62        let conn = if config.db_path == ":memory:" {
63            Connection::open_in_memory()?
64        } else {
65            // Ensure parent directory exists
66            if let Some(parent) = Path::new(&config.db_path).parent() {
67                std::fs::create_dir_all(parent)?;
68            }
69            Connection::open_with_flags(&config.db_path, flags)?
70        };
71
72        // Configure based on storage mode (RML-874, RML-900)
73        Self::configure_pragmas(&conn, config.storage_mode)?;
74
75        Ok(conn)
76    }
77
78    /// Configure SQLite pragmas based on storage mode
79    ///
80    /// Local mode (RML-874): WAL for performance and crash recovery
81    /// Cloud-safe mode (RML-900): DELETE journal for cloud sync compatibility
82    fn configure_pragmas(conn: &Connection, mode: StorageMode) -> Result<()> {
83        match mode {
84            StorageMode::Local => {
85                // WAL mode for better concurrency and crash recovery
86                conn.execute_batch(
87                    r#"
88                    PRAGMA journal_mode=WAL;
89                    PRAGMA synchronous=NORMAL;
90                    PRAGMA wal_autocheckpoint=1000;
91                    PRAGMA busy_timeout=30000;
92                    PRAGMA cache_size=-64000;
93                    PRAGMA temp_store=MEMORY;
94                    PRAGMA mmap_size=268435456;
95                    PRAGMA foreign_keys=ON;
96                    "#,
97                )?;
98            }
99            StorageMode::CloudSafe => {
100                // Single-file mode for cloud sync (Dropbox, OneDrive, iCloud)
101                conn.execute_batch(
102                    r#"
103                    PRAGMA journal_mode=DELETE;
104                    PRAGMA synchronous=FULL;
105                    PRAGMA busy_timeout=30000;
106                    PRAGMA cache_size=-32000;
107                    PRAGMA temp_store=MEMORY;
108                    PRAGMA foreign_keys=ON;
109                    "#,
110                )?;
111            }
112        }
113        Ok(())
114    }
115
116    /// Get a reference to the connection (for single-threaded use)
117    pub fn connection(&self) -> parking_lot::MutexGuard<'_, Connection> {
118        self.conn.lock()
119    }
120
121    /// Execute a function with the connection
122    pub fn with_connection<F, T>(&self, f: F) -> Result<T>
123    where
124        F: FnOnce(&Connection) -> Result<T>,
125    {
126        let conn = self.conn.lock();
127        f(&conn)
128    }
129
130    /// Execute a function with a transaction
131    pub fn with_transaction<F, T>(&self, f: F) -> Result<T>
132    where
133        F: FnOnce(&Connection) -> Result<T>,
134    {
135        let mut conn = self.conn.lock();
136        let tx = conn.transaction()?;
137        let result = f(&tx)?;
138        tx.commit()?;
139        Ok(result)
140    }
141
142    /// Get current storage mode
143    pub fn storage_mode(&self) -> StorageMode {
144        self.config.storage_mode
145    }
146
147    /// Get database path
148    pub fn db_path(&self) -> &str {
149        &self.config.db_path
150    }
151
152    /// Check if database is in a cloud-synced folder
153    pub fn is_in_cloud_folder(&self) -> bool {
154        let path = self.config.db_path.to_lowercase();
155        path.contains("dropbox")
156            || path.contains("onedrive")
157            || path.contains("icloud")
158            || path.contains("google drive")
159    }
160
161    /// Get warning if storage mode doesn't match folder type
162    pub fn storage_mode_warning(&self) -> Option<String> {
163        if self.is_in_cloud_folder() && self.config.storage_mode == StorageMode::Local {
164            Some(format!(
165                "WARNING: Database '{}' appears to be in a cloud-synced folder. \
166                WAL mode may cause corruption. Consider:\n\
167                1. Set ENGRAM_STORAGE_MODE=cloud-safe\n\
168                2. Move database to a local folder with backup sync",
169                self.config.db_path
170            ))
171        } else {
172            None
173        }
174    }
175
176    /// Checkpoint WAL file (for local mode)
177    pub fn checkpoint(&self) -> Result<()> {
178        if self.config.storage_mode == StorageMode::Local {
179            let conn = self.conn.lock();
180            conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
181        }
182        Ok(())
183    }
184
185    /// Get database size in bytes
186    pub fn db_size(&self) -> Result<i64> {
187        let conn = self.conn.lock();
188        let size: i64 = conn.query_row(
189            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
190            [],
191            |row| row.get(0),
192        )?;
193        Ok(size)
194    }
195
196    /// Vacuum the database to reclaim space
197    pub fn vacuum(&self) -> Result<()> {
198        let conn = self.conn.lock();
199        conn.execute_batch("VACUUM;")?;
200        Ok(())
201    }
202
203    /// Build a storage compaction report (issue #22).
204    ///
205    /// With `apply = false` (dry-run) nothing is mutated. With `apply = true`,
206    /// completed/failed embedding-queue rows are pruned, the WAL is
207    /// checkpointed, and the database is VACUUMed — but VACUUM runs only when
208    /// there is enough free disk space for the rewrite.
209    pub fn compact(&self, apply: bool) -> Result<CompactReport> {
210        #[cfg(unix)]
211        #[allow(clippy::unnecessary_cast)]
212        fn available_disk_bytes(path: &str) -> Option<i64> {
213            use std::ffi::CString;
214            use std::os::unix::ffi::OsStrExt;
215            // Stat the parent dir so this works even before the file exists.
216            let p = Path::new(path);
217            let target = match p.parent() {
218                Some(parent) if !parent.as_os_str().is_empty() => parent,
219                _ => p,
220            };
221            let cpath = CString::new(target.as_os_str().as_bytes()).ok()?;
222            // SAFETY: `cpath` is a valid NUL-terminated path; `stat` is zeroed
223            // before being filled by statvfs(3).
224            unsafe {
225                let mut stat: libc::statvfs = std::mem::zeroed();
226                if libc::statvfs(cpath.as_ptr(), &mut stat) == 0 {
227                    let avail = (stat.f_bavail as u64).saturating_mul(stat.f_frsize as u64);
228                    Some(avail.min(i64::MAX as u64) as i64)
229                } else {
230                    None
231                }
232            }
233        }
234        #[cfg(not(unix))]
235        fn available_disk_bytes(_path: &str) -> Option<i64> {
236            None
237        }
238
239        let conn = self.conn.lock();
240
241        let page_size: i64 = conn
242            .query_row("PRAGMA page_size", [], |r| r.get(0))
243            .unwrap_or(0);
244        let page_count: i64 = conn
245            .query_row("PRAGMA page_count", [], |r| r.get(0))
246            .unwrap_or(0);
247        let freelist_count: i64 = conn
248            .query_row("PRAGMA freelist_count", [], |r| r.get(0))
249            .unwrap_or(0);
250        let db_size_bytes = page_size * page_count;
251        let reclaimable_bytes = page_size * freelist_count;
252
253        let queue_complete_prunable: i64 = conn
254            .query_row(
255                "SELECT COUNT(*) FROM embedding_queue WHERE status = 'complete'",
256                [],
257                |r| r.get(0),
258            )
259            .unwrap_or(0);
260        let queue_failed_prunable: i64 = conn
261            .query_row(
262                "SELECT COUNT(*) FROM embedding_queue WHERE status = 'failed'",
263                [],
264                |r| r.get(0),
265            )
266            .unwrap_or(0);
267        let orphan_embeddings: i64 = conn
268            .query_row(
269                "SELECT COUNT(*) FROM embeddings WHERE memory_id NOT IN (SELECT id FROM memories)",
270                [],
271                |r| r.get(0),
272            )
273            .unwrap_or(0);
274
275        let sidecar = |suffix: &str| -> i64 {
276            if self.config.db_path == ":memory:" {
277                return 0;
278            }
279            std::fs::metadata(format!("{}{}", self.config.db_path, suffix))
280                .map(|m| m.len() as i64)
281                .unwrap_or(0)
282        };
283        let wal_bytes = sidecar("-wal");
284        let shm_bytes = sidecar("-shm");
285
286        let free_space = available_disk_bytes(&self.config.db_path);
287        let free_space_bytes = free_space.unwrap_or(-1);
288        // VACUUM rewrites the DB into a temp file, needing ~db_size extra bytes.
289        let vacuum_safe = matches!(free_space, Some(free) if free >= db_size_bytes);
290
291        let mut operations = Vec::new();
292
293        let mut prune_complete = CompactOp {
294            name: "prune_complete_queue".to_string(),
295            candidates: queue_complete_prunable,
296            applied: false,
297            skipped_reason: None,
298        };
299        if apply {
300            // DELETE of zero rows is a harmless no-op; running it unconditionally
301            // in apply mode lets the operation report as applied rather than dry-run.
302            conn.execute("DELETE FROM embedding_queue WHERE status = 'complete'", [])?;
303            prune_complete.applied = true;
304        }
305        operations.push(prune_complete);
306
307        let mut prune_failed = CompactOp {
308            name: "prune_failed_queue".to_string(),
309            candidates: queue_failed_prunable,
310            applied: false,
311            skipped_reason: None,
312        };
313        if apply {
314            conn.execute("DELETE FROM embedding_queue WHERE status = 'failed'", [])?;
315            prune_failed.applied = true;
316        }
317        operations.push(prune_failed);
318
319        let mut checkpoint = CompactOp {
320            name: "checkpoint_wal".to_string(),
321            candidates: wal_bytes,
322            applied: false,
323            skipped_reason: None,
324        };
325        if apply {
326            if self.config.storage_mode == StorageMode::Local {
327                conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
328                checkpoint.applied = true;
329            } else {
330                checkpoint.skipped_reason = Some("not in local/WAL mode".to_string());
331            }
332        }
333        operations.push(checkpoint);
334
335        let mut vacuum = CompactOp {
336            name: "vacuum".to_string(),
337            candidates: reclaimable_bytes,
338            applied: false,
339            skipped_reason: None,
340        };
341        if apply {
342            if vacuum_safe {
343                conn.execute_batch("VACUUM;")?;
344                vacuum.applied = true;
345            } else {
346                vacuum.skipped_reason = Some(match free_space {
347                    Some(free) => {
348                        format!(
349                            "insufficient free space: {free} available, need >= {db_size_bytes}"
350                        )
351                    }
352                    None => "free space could not be determined".to_string(),
353                });
354            }
355        }
356        operations.push(vacuum);
357
358        Ok(CompactReport {
359            applied: apply,
360            db_size_bytes,
361            wal_bytes,
362            shm_bytes,
363            freelist_count,
364            reclaimable_bytes,
365            queue_complete_prunable,
366            queue_failed_prunable,
367            orphan_embeddings,
368            free_space_bytes,
369            vacuum_safe,
370            operations,
371        })
372    }
373
374    /// Get configuration
375    pub fn config(&self) -> &StorageConfig {
376        &self.config
377    }
378}
379
380impl StoragePool {
381    /// Create a connection pool with the specified size
382    pub fn new(config: StorageConfig, pool_size: usize) -> Result<Self> {
383        let mut pool = Vec::with_capacity(pool_size);
384
385        for _ in 0..pool_size {
386            let conn = Storage::create_connection(&config)?;
387            pool.push(Arc::new(Mutex::new(conn)));
388        }
389
390        // Run migrations on first connection
391        if let Some(first) = pool.first() {
392            let conn = first.lock();
393            run_migrations(&conn)?;
394        }
395
396        Ok(Self {
397            config,
398            pool,
399            next: std::sync::atomic::AtomicUsize::new(0),
400        })
401    }
402
403    /// Get a connection from the pool (round-robin)
404    pub fn get(&self) -> Arc<Mutex<Connection>> {
405        let idx = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.pool.len();
406        self.pool[idx].clone()
407    }
408
409    /// Execute a function with a connection from the pool
410    pub fn with_connection<F, T>(&self, f: F) -> Result<T>
411    where
412        F: FnOnce(&Connection) -> Result<T>,
413    {
414        let conn_arc = self.get();
415        let conn = conn_arc.lock();
416        f(&conn)
417    }
418
419    /// Get configuration
420    pub fn config(&self) -> &StorageConfig {
421        &self.config
422    }
423}
424
425impl Clone for Storage {
426    fn clone(&self) -> Self {
427        Self {
428            config: self.config.clone(),
429            conn: self.conn.clone(),
430        }
431    }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a config for `db_path` in `storage_mode`, using the defaults
    /// shared by all tests in this module.
    fn test_config(db_path: &str, storage_mode: StorageMode) -> StorageConfig {
        StorageConfig {
            db_path: db_path.to_string(),
            storage_mode,
            cloud_uri: None,
            encrypt_cloud: false,
            confidence_half_life_days: 30.0,
            auto_sync: false,
            sync_debounce_ms: 5000,
        }
    }

    /// Pair `config` with an in-memory connection without opening
    /// `config.db_path`, so path-based helpers can be tested against paths
    /// that do not exist on this machine.
    fn storage_with_config(config: StorageConfig) -> Storage {
        Storage {
            config,
            conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
        }
    }

    #[test]
    fn test_open_in_memory() {
        let storage = Storage::open_in_memory().unwrap();
        assert_eq!(storage.db_path(), ":memory:");
    }

    #[test]
    fn test_storage_modes() {
        // Local mode
        let storage = Storage::open(test_config(":memory:", StorageMode::Local)).unwrap();
        assert_eq!(storage.storage_mode(), StorageMode::Local);

        // Cloud-safe mode
        let storage = Storage::open(test_config(":memory:", StorageMode::CloudSafe)).unwrap();
        assert_eq!(storage.storage_mode(), StorageMode::CloudSafe);
    }

    #[test]
    fn test_cloud_folder_detection() {
        let dropbox = "/Users/test/Dropbox/memories.db";

        // Local (WAL) mode inside a synced folder is detected and warned about.
        let storage = storage_with_config(test_config(dropbox, StorageMode::Local));
        assert!(storage.is_in_cloud_folder());
        assert!(storage.storage_mode_warning().is_some());

        // Cloud-safe mode in the same folder is detected but not warned about.
        let storage = storage_with_config(test_config(dropbox, StorageMode::CloudSafe));
        assert!(storage.is_in_cloud_folder());
        assert!(storage.storage_mode_warning().is_none());

        // An ordinary local path is not flagged.
        let storage =
            storage_with_config(test_config("/var/lib/engram/memories.db", StorageMode::Local));
        assert!(!storage.is_in_cloud_folder());
        assert!(storage.storage_mode_warning().is_none());
    }

    #[test]
    fn test_compact_dry_run_mutates_nothing() {
        let storage = Storage::open_in_memory().unwrap();
        let report = storage.compact(false).unwrap();
        assert!(!report.applied);
        assert_eq!(report.operations.len(), 4);
        assert!(report.operations.iter().all(|op| !op.applied));
    }
}