1use parking_lot::Mutex;
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::Arc;
10
11use super::migrations::run_migrations;
12use crate::error::Result;
13use crate::types::{CompactOp, CompactReport, StorageConfig, StorageMode};
14
/// A single SQLite connection together with the configuration it was opened with.
///
/// The connection sits behind an `Arc<Mutex<_>>`; every accessor in this file
/// locks that mutex before touching the database.
pub struct Storage {
    /// Settings supplied when the storage was opened.
    config: StorageConfig,
    /// The mutex-guarded SQLite connection.
    conn: Arc<Mutex<Connection>>,
}
20
/// A fixed set of SQLite connections that are handed out in rotation.
pub struct StoragePool {
    /// Settings every pooled connection was opened with.
    config: StorageConfig,
    /// The pooled connections, each behind its own mutex.
    pool: Vec<Arc<Mutex<Connection>>>,
    /// Monotonic counter used to pick the next connection.
    next: std::sync::atomic::AtomicUsize,
}
27
28impl Storage {
29 pub fn open(config: StorageConfig) -> Result<Self> {
31 let conn = Self::create_connection(&config)?;
32
33 run_migrations(&conn)?;
35
36 Ok(Self {
37 config,
38 conn: Arc::new(Mutex::new(conn)),
39 })
40 }
41
42 pub fn open_in_memory() -> Result<Self> {
44 let config = StorageConfig {
45 db_path: ":memory:".to_string(),
46 storage_mode: StorageMode::Local,
47 cloud_uri: None,
48 encrypt_cloud: false,
49 confidence_half_life_days: 30.0,
50 auto_sync: false,
51 sync_debounce_ms: 5000,
52 };
53 Self::open(config)
54 }
55
56 fn create_connection(config: &StorageConfig) -> Result<Connection> {
58 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
59 | OpenFlags::SQLITE_OPEN_CREATE
60 | OpenFlags::SQLITE_OPEN_NO_MUTEX;
61
62 let conn = if config.db_path == ":memory:" {
63 Connection::open_in_memory()?
64 } else {
65 if let Some(parent) = Path::new(&config.db_path).parent() {
67 std::fs::create_dir_all(parent)?;
68 }
69 Connection::open_with_flags(&config.db_path, flags)?
70 };
71
72 Self::configure_pragmas(&conn, config.storage_mode)?;
74
75 Ok(conn)
76 }
77
78 fn configure_pragmas(conn: &Connection, mode: StorageMode) -> Result<()> {
83 match mode {
84 StorageMode::Local => {
85 conn.execute_batch(
87 r#"
88 PRAGMA journal_mode=WAL;
89 PRAGMA synchronous=NORMAL;
90 PRAGMA wal_autocheckpoint=1000;
91 PRAGMA busy_timeout=30000;
92 PRAGMA cache_size=-64000;
93 PRAGMA temp_store=MEMORY;
94 PRAGMA mmap_size=268435456;
95 PRAGMA foreign_keys=ON;
96 "#,
97 )?;
98 }
99 StorageMode::CloudSafe => {
100 conn.execute_batch(
102 r#"
103 PRAGMA journal_mode=DELETE;
104 PRAGMA synchronous=FULL;
105 PRAGMA busy_timeout=30000;
106 PRAGMA cache_size=-32000;
107 PRAGMA temp_store=MEMORY;
108 PRAGMA foreign_keys=ON;
109 "#,
110 )?;
111 }
112 }
113 Ok(())
114 }
115
    /// Locks the connection mutex and returns the guard.
    ///
    /// The lock is held until the returned guard is dropped.
    pub fn connection(&self) -> parking_lot::MutexGuard<'_, Connection> {
        self.conn.lock()
    }
120
121 pub fn with_connection<F, T>(&self, f: F) -> Result<T>
123 where
124 F: FnOnce(&Connection) -> Result<T>,
125 {
126 let conn = self.conn.lock();
127 f(&conn)
128 }
129
130 pub fn with_transaction<F, T>(&self, f: F) -> Result<T>
132 where
133 F: FnOnce(&Connection) -> Result<T>,
134 {
135 let mut conn = self.conn.lock();
136 let tx = conn.transaction()?;
137 let result = f(&tx)?;
138 tx.commit()?;
139 Ok(result)
140 }
141
    /// Returns the storage mode this storage was configured with.
    pub fn storage_mode(&self) -> StorageMode {
        self.config.storage_mode
    }
146
    /// Returns the configured database path (`":memory:"` for in-memory storage).
    pub fn db_path(&self) -> &str {
        &self.config.db_path
    }
151
152 pub fn is_in_cloud_folder(&self) -> bool {
154 let path = self.config.db_path.to_lowercase();
155 path.contains("dropbox")
156 || path.contains("onedrive")
157 || path.contains("icloud")
158 || path.contains("google drive")
159 }
160
161 pub fn storage_mode_warning(&self) -> Option<String> {
163 if self.is_in_cloud_folder() && self.config.storage_mode == StorageMode::Local {
164 Some(format!(
165 "WARNING: Database '{}' appears to be in a cloud-synced folder. \
166 WAL mode may cause corruption. Consider:\n\
167 1. Set ENGRAM_STORAGE_MODE=cloud-safe\n\
168 2. Move database to a local folder with backup sync",
169 self.config.db_path
170 ))
171 } else {
172 None
173 }
174 }
175
176 pub fn checkpoint(&self) -> Result<()> {
178 if self.config.storage_mode == StorageMode::Local {
179 let conn = self.conn.lock();
180 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
181 }
182 Ok(())
183 }
184
185 pub fn db_size(&self) -> Result<i64> {
187 let conn = self.conn.lock();
188 let size: i64 = conn.query_row(
189 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
190 [],
191 |row| row.get(0),
192 )?;
193 Ok(size)
194 }
195
196 pub fn vacuum(&self) -> Result<()> {
198 let conn = self.conn.lock();
199 conn.execute_batch("VACUUM;")?;
200 Ok(())
201 }
202
203 pub fn compact(&self, apply: bool) -> Result<CompactReport> {
210 #[cfg(unix)]
211 #[allow(clippy::unnecessary_cast)]
212 fn available_disk_bytes(path: &str) -> Option<i64> {
213 use std::ffi::CString;
214 use std::os::unix::ffi::OsStrExt;
215 let p = Path::new(path);
217 let target = match p.parent() {
218 Some(parent) if !parent.as_os_str().is_empty() => parent,
219 _ => p,
220 };
221 let cpath = CString::new(target.as_os_str().as_bytes()).ok()?;
222 unsafe {
225 let mut stat: libc::statvfs = std::mem::zeroed();
226 if libc::statvfs(cpath.as_ptr(), &mut stat) == 0 {
227 let avail = (stat.f_bavail as u64).saturating_mul(stat.f_frsize as u64);
228 Some(avail.min(i64::MAX as u64) as i64)
229 } else {
230 None
231 }
232 }
233 }
234 #[cfg(not(unix))]
235 fn available_disk_bytes(_path: &str) -> Option<i64> {
236 None
237 }
238
239 let conn = self.conn.lock();
240
241 let page_size: i64 = conn
242 .query_row("PRAGMA page_size", [], |r| r.get(0))
243 .unwrap_or(0);
244 let page_count: i64 = conn
245 .query_row("PRAGMA page_count", [], |r| r.get(0))
246 .unwrap_or(0);
247 let freelist_count: i64 = conn
248 .query_row("PRAGMA freelist_count", [], |r| r.get(0))
249 .unwrap_or(0);
250 let db_size_bytes = page_size * page_count;
251 let reclaimable_bytes = page_size * freelist_count;
252
253 let queue_complete_prunable: i64 = conn
254 .query_row(
255 "SELECT COUNT(*) FROM embedding_queue WHERE status = 'complete'",
256 [],
257 |r| r.get(0),
258 )
259 .unwrap_or(0);
260 let queue_failed_prunable: i64 = conn
261 .query_row(
262 "SELECT COUNT(*) FROM embedding_queue WHERE status = 'failed'",
263 [],
264 |r| r.get(0),
265 )
266 .unwrap_or(0);
267 let orphan_embeddings: i64 = conn
268 .query_row(
269 "SELECT COUNT(*) FROM embeddings WHERE memory_id NOT IN (SELECT id FROM memories)",
270 [],
271 |r| r.get(0),
272 )
273 .unwrap_or(0);
274
275 let sidecar = |suffix: &str| -> i64 {
276 if self.config.db_path == ":memory:" {
277 return 0;
278 }
279 std::fs::metadata(format!("{}{}", self.config.db_path, suffix))
280 .map(|m| m.len() as i64)
281 .unwrap_or(0)
282 };
283 let wal_bytes = sidecar("-wal");
284 let shm_bytes = sidecar("-shm");
285
286 let free_space = available_disk_bytes(&self.config.db_path);
287 let free_space_bytes = free_space.unwrap_or(-1);
288 let vacuum_safe = matches!(free_space, Some(free) if free >= db_size_bytes);
290
291 let mut operations = Vec::new();
292
293 let mut prune_complete = CompactOp {
294 name: "prune_complete_queue".to_string(),
295 candidates: queue_complete_prunable,
296 applied: false,
297 skipped_reason: None,
298 };
299 if apply {
300 conn.execute("DELETE FROM embedding_queue WHERE status = 'complete'", [])?;
303 prune_complete.applied = true;
304 }
305 operations.push(prune_complete);
306
307 let mut prune_failed = CompactOp {
308 name: "prune_failed_queue".to_string(),
309 candidates: queue_failed_prunable,
310 applied: false,
311 skipped_reason: None,
312 };
313 if apply {
314 conn.execute("DELETE FROM embedding_queue WHERE status = 'failed'", [])?;
315 prune_failed.applied = true;
316 }
317 operations.push(prune_failed);
318
319 let mut checkpoint = CompactOp {
320 name: "checkpoint_wal".to_string(),
321 candidates: wal_bytes,
322 applied: false,
323 skipped_reason: None,
324 };
325 if apply {
326 if self.config.storage_mode == StorageMode::Local {
327 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
328 checkpoint.applied = true;
329 } else {
330 checkpoint.skipped_reason = Some("not in local/WAL mode".to_string());
331 }
332 }
333 operations.push(checkpoint);
334
335 let mut vacuum = CompactOp {
336 name: "vacuum".to_string(),
337 candidates: reclaimable_bytes,
338 applied: false,
339 skipped_reason: None,
340 };
341 if apply {
342 if vacuum_safe {
343 conn.execute_batch("VACUUM;")?;
344 vacuum.applied = true;
345 } else {
346 vacuum.skipped_reason = Some(match free_space {
347 Some(free) => {
348 format!(
349 "insufficient free space: {free} available, need >= {db_size_bytes}"
350 )
351 }
352 None => "free space could not be determined".to_string(),
353 });
354 }
355 }
356 operations.push(vacuum);
357
358 Ok(CompactReport {
359 applied: apply,
360 db_size_bytes,
361 wal_bytes,
362 shm_bytes,
363 freelist_count,
364 reclaimable_bytes,
365 queue_complete_prunable,
366 queue_failed_prunable,
367 orphan_embeddings,
368 free_space_bytes,
369 vacuum_safe,
370 operations,
371 })
372 }
373
    /// Returns the configuration this storage was opened with.
    pub fn config(&self) -> &StorageConfig {
        &self.config
    }
378}
379
380impl StoragePool {
381 pub fn new(config: StorageConfig, pool_size: usize) -> Result<Self> {
383 let mut pool = Vec::with_capacity(pool_size);
384
385 for _ in 0..pool_size {
386 let conn = Storage::create_connection(&config)?;
387 pool.push(Arc::new(Mutex::new(conn)));
388 }
389
390 if let Some(first) = pool.first() {
392 let conn = first.lock();
393 run_migrations(&conn)?;
394 }
395
396 Ok(Self {
397 config,
398 pool,
399 next: std::sync::atomic::AtomicUsize::new(0),
400 })
401 }
402
403 pub fn get(&self) -> Arc<Mutex<Connection>> {
405 let idx = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.pool.len();
406 self.pool[idx].clone()
407 }
408
409 pub fn with_connection<F, T>(&self, f: F) -> Result<T>
411 where
412 F: FnOnce(&Connection) -> Result<T>,
413 {
414 let conn_arc = self.get();
415 let conn = conn_arc.lock();
416 f(&conn)
417 }
418
    /// Returns the configuration the pool was created with.
    pub fn config(&self) -> &StorageConfig {
        &self.config
    }
423}
424
425impl Clone for Storage {
426 fn clone(&self) -> Self {
427 Self {
428 config: self.config.clone(),
429 conn: self.conn.clone(),
430 }
431 }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with the fixed defaults shared by every test here.
    fn test_config(db_path: &str, storage_mode: StorageMode) -> StorageConfig {
        StorageConfig {
            db_path: db_path.to_string(),
            storage_mode,
            cloud_uri: None,
            encrypt_cloud: false,
            confidence_half_life_days: 30.0,
            auto_sync: false,
            sync_debounce_ms: 5000,
        }
    }

    /// Builds a `Storage` that *claims* to live at `db_path` but is backed by
    /// a throwaway in-memory connection, so path heuristics can be tested
    /// without touching the filesystem.
    fn storage_claiming_path(db_path: &str, storage_mode: StorageMode) -> Storage {
        Storage {
            config: test_config(db_path, storage_mode),
            conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
        }
    }

    #[test]
    fn test_open_in_memory() {
        let storage = Storage::open_in_memory().unwrap();
        assert_eq!(storage.db_path(), ":memory:");
    }

    #[test]
    fn test_storage_modes() {
        let storage = Storage::open(test_config(":memory:", StorageMode::Local)).unwrap();
        assert_eq!(storage.storage_mode(), StorageMode::Local);

        let storage = Storage::open(test_config(":memory:", StorageMode::CloudSafe)).unwrap();
        assert_eq!(storage.storage_mode(), StorageMode::CloudSafe);
    }

    #[test]
    fn test_cloud_folder_detection() {
        // Cloud folder + local (WAL) mode: detected and warned about.
        let synced_local =
            storage_claiming_path("/Users/test/Dropbox/memories.db", StorageMode::Local);
        assert!(synced_local.is_in_cloud_folder());
        assert!(synced_local.storage_mode_warning().is_some());

        // Cloud folder + cloud-safe mode: detected, but no warning needed.
        let synced_safe =
            storage_claiming_path("/Users/test/Dropbox/memories.db", StorageMode::CloudSafe);
        assert!(synced_safe.is_in_cloud_folder());
        assert!(synced_safe.storage_mode_warning().is_none());

        // Ordinary local path: neither detected nor warned about.
        let plain_local =
            storage_claiming_path("/var/lib/engram/memories.db", StorageMode::Local);
        assert!(!plain_local.is_in_cloud_folder());
        assert!(plain_local.storage_mode_warning().is_none());
    }
}