Skip to main content

codemem_storage/
lib.rs

1//! codemem-storage: SQLite persistence layer for Codemem.
2//!
3//! Uses rusqlite with bundled SQLite, WAL mode, and embedded schema.
4
5use codemem_core::{CodememError, MemoryNode, MemoryType};
6use rusqlite::Connection;
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Mutex;
11
12mod backend;
13pub mod graph;
14mod graph_persistence;
15mod memory;
16mod migrations;
17mod queries;
18pub mod vector;
19
20pub use graph::GraphEngine;
21pub use graph::RawGraphMetrics;
22pub use vector::HnswIndex;
23
24/// SQLite-backed storage for Codemem memories, embeddings, and graph data.
25///
26/// Wraps `rusqlite::Connection` in a `Mutex` to satisfy `Send + Sync` bounds
27/// required by the `StorageBackend` trait.
28pub struct Storage {
29    conn: Mutex<Connection>,
30    /// Whether an explicit outer transaction is active (via `begin_transaction`).
31    /// When set, individual methods like `insert_memory` skip starting their own
32    /// transaction so that all operations participate in the outer one.
33    in_transaction: AtomicBool,
34}
35
36impl Storage {
37    /// Get a lock on the underlying connection.
38    pub(crate) fn conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, CodememError> {
39        self.conn
40            .lock()
41            .map_err(|e| CodememError::LockPoisoned(format!("Storage mutex: {e}")))
42    }
43
44    /// Apply standard pragmas to a connection.
45    ///
46    /// `cache_size_mb` and `busy_timeout_secs` override the defaults (64 MB / 5 s)
47    /// when provided — typically sourced from `StorageConfig`.
48    fn apply_pragmas(
49        conn: &Connection,
50        cache_size_mb: Option<u32>,
51        busy_timeout_secs: Option<u64>,
52    ) -> Result<(), CodememError> {
53        // WAL mode for concurrent reads
54        conn.pragma_update(None, "journal_mode", "WAL")
55            .map_err(|e| CodememError::Storage(e.to_string()))?;
56        // Cache size (negative value = KiB in SQLite)
57        let cache_kb = i64::from(cache_size_mb.unwrap_or(64)) * 1000;
58        conn.pragma_update(None, "cache_size", -cache_kb)
59            .map_err(|e| CodememError::Storage(e.to_string()))?;
60        // Foreign keys ON
61        conn.pragma_update(None, "foreign_keys", "ON")
62            .map_err(|e| CodememError::Storage(e.to_string()))?;
63        // NORMAL sync (good balance of safety vs speed)
64        conn.pragma_update(None, "synchronous", "NORMAL")
65            .map_err(|e| CodememError::Storage(e.to_string()))?;
66        // 256MB mmap for faster reads
67        conn.pragma_update(None, "mmap_size", 268435456i64)
68            .map_err(|e| CodememError::Storage(e.to_string()))?;
69        // Temp tables in memory
70        conn.pragma_update(None, "temp_store", "MEMORY")
71            .map_err(|e| CodememError::Storage(e.to_string()))?;
72        // Busy timeout
73        let timeout = busy_timeout_secs.unwrap_or(5);
74        conn.busy_timeout(std::time::Duration::from_secs(timeout))
75            .map_err(|e| CodememError::Storage(e.to_string()))?;
76        Ok(())
77    }
78
79    /// Open (or create) a Codemem database at the given path.
80    pub fn open(path: &Path) -> Result<Self, CodememError> {
81        Self::open_with_config(path, None, None)
82    }
83
84    /// Open a database with explicit storage configuration overrides.
85    pub fn open_with_config(
86        path: &Path,
87        cache_size_mb: Option<u32>,
88        busy_timeout_secs: Option<u64>,
89    ) -> Result<Self, CodememError> {
90        let conn = Connection::open(path).map_err(|e| CodememError::Storage(e.to_string()))?;
91        Self::apply_pragmas(&conn, cache_size_mb, busy_timeout_secs)?;
92        migrations::run_migrations(&conn)?;
93        Ok(Self {
94            conn: Mutex::new(conn),
95            in_transaction: AtomicBool::new(false),
96        })
97    }
98
99    /// Open an existing database without running migrations.
100    ///
101    /// Use this in lifecycle hooks (context, prompt, summarize) where the
102    /// database has already been migrated by `codemem init` or `codemem serve`,
103    /// to avoid SQLITE_BUSY race conditions with the concurrent MCP server.
104    pub fn open_without_migrations(path: &Path) -> Result<Self, CodememError> {
105        Self::open_without_migrations_with_config(path, None, None)
106    }
107
108    /// Open without migrations, with explicit storage configuration overrides.
109    pub fn open_without_migrations_with_config(
110        path: &Path,
111        cache_size_mb: Option<u32>,
112        busy_timeout_secs: Option<u64>,
113    ) -> Result<Self, CodememError> {
114        let conn = Connection::open(path).map_err(|e| CodememError::Storage(e.to_string()))?;
115        Self::apply_pragmas(&conn, cache_size_mb, busy_timeout_secs)?;
116        Ok(Self {
117            conn: Mutex::new(conn),
118            in_transaction: AtomicBool::new(false),
119        })
120    }
121
122    /// Open an in-memory database (for testing).
123    pub fn open_in_memory() -> Result<Self, CodememError> {
124        let conn =
125            Connection::open_in_memory().map_err(|e| CodememError::Storage(e.to_string()))?;
126        Self::apply_pragmas(&conn, None, None)?;
127        migrations::run_migrations(&conn)?;
128        Ok(Self {
129            conn: Mutex::new(conn),
130            in_transaction: AtomicBool::new(false),
131        })
132    }
133
134    /// Compute SHA-256 hash of content for deduplication.
135    pub fn content_hash(content: &str) -> String {
136        codemem_core::content_hash(content)
137    }
138
139    /// Check whether an outer transaction is currently active.
140    pub(crate) fn has_outer_transaction(&self) -> bool {
141        self.in_transaction.load(Ordering::Acquire)
142    }
143}
144
/// Column values of a single stored memory exactly as read from the database,
/// before conversion into a `MemoryNode` by `into_memory_node`.
pub(crate) struct MemoryRow {
    // --- identity and payload ---
    pub(crate) id: String,
    pub(crate) content: String,
    pub(crate) content_hash: String,
    /// Textual memory type; parsed into `MemoryType` during conversion.
    pub(crate) memory_type: String,

    // --- scoring ---
    pub(crate) importance: f64,
    pub(crate) confidence: f64,
    /// Stored as a signed integer; narrowed to `u32` during conversion.
    pub(crate) access_count: i64,

    // --- serialized collections ---
    /// JSON-encoded array of tag strings.
    pub(crate) tags: String,
    /// JSON-encoded object mapping metadata keys to JSON values.
    pub(crate) metadata: String,

    // --- optional scoping ---
    pub(crate) namespace: Option<String>,
    pub(crate) session_id: Option<String>,

    // --- timestamps, as Unix seconds ---
    pub(crate) created_at: i64,
    pub(crate) updated_at: i64,
    pub(crate) last_accessed_at: i64,
}
162
163impl MemoryRow {
164    pub(crate) fn into_memory_node(self) -> Result<MemoryNode, CodememError> {
165        let memory_type: MemoryType = self.memory_type.parse()?;
166        let tags: Vec<String> = serde_json::from_str(&self.tags).unwrap_or_else(|e| {
167            tracing::warn!(id = %self.id, error = %e, "Malformed tags JSON for memory");
168            Vec::new()
169        });
170        let metadata: HashMap<String, serde_json::Value> = serde_json::from_str(&self.metadata)
171            .unwrap_or_else(|e| {
172                tracing::warn!(id = %self.id, error = %e, "Malformed metadata JSON for memory");
173                HashMap::new()
174            });
175
176        let created_at = chrono::DateTime::from_timestamp(self.created_at, 0)
177            .unwrap_or_else(|| {
178                tracing::warn!(id = %self.id, ts = self.created_at, "Invalid created_at timestamp");
179                chrono::DateTime::<chrono::Utc>::default()
180            })
181            .with_timezone(&chrono::Utc);
182        let updated_at = chrono::DateTime::from_timestamp(self.updated_at, 0)
183            .unwrap_or_else(|| {
184                tracing::warn!(id = %self.id, ts = self.updated_at, "Invalid updated_at timestamp");
185                chrono::DateTime::<chrono::Utc>::default()
186            })
187            .with_timezone(&chrono::Utc);
188        let last_accessed_at = chrono::DateTime::from_timestamp(self.last_accessed_at, 0)
189            .unwrap_or_else(|| {
190                tracing::warn!(id = %self.id, ts = self.last_accessed_at, "Invalid last_accessed_at timestamp");
191                chrono::DateTime::<chrono::Utc>::default()
192            })
193            .with_timezone(&chrono::Utc);
194
195        Ok(MemoryNode {
196            id: self.id,
197            content: self.content,
198            memory_type,
199            importance: self.importance,
200            confidence: self.confidence,
201            access_count: u32::try_from(self.access_count).unwrap_or(u32::MAX),
202            content_hash: self.content_hash,
203            tags,
204            metadata,
205            namespace: self.namespace,
206            session_id: self.session_id,
207            created_at,
208            updated_at,
209            last_accessed_at,
210        })
211    }
212}