Skip to main content

kernex_memory/store/
mod.rs

//! SQLite-backed persistent memory store.
//!
//! Split into focused submodules:
//! - `conversations` — conversation lifecycle (create, find, close, summaries)
//! - `messages` — message storage and full-text search
//! - `facts` — user facts, aliases, and limitations
//! - `tasks` — scheduled task CRUD and dedup
//! - `context` — context building and user profile formatting
//! - `context_helpers` — onboarding stages, system prompt composition, language detection
//! - `checkpoints`, `observations`, `outcomes`, `sessions`, `usage` — further
//!   submodules declared below; `checkpoints` is the source of the re-exported
//!   `PhaseCheckpoint`, and `usage` of `UsageSummary` and `UsageBreakdown`.
10
11mod checkpoints;
12mod context;
13mod context_helpers;
14mod conversations;
15mod facts;
16mod messages;
17mod observations;
18mod outcomes;
19mod sessions;
20mod tasks;
21mod usage;
22
23pub use checkpoints::PhaseCheckpoint;
24pub use context::{detect_language, format_user_profile};
25pub use tasks::DueTask;
26pub use usage::{UsageBreakdown, UsageSummary};
27
28use crate::error::MemoryError;
29use kernex_core::{config::MemoryConfig, shellexpand};
30use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
31use sqlx::SqlitePool;
32use std::str::FromStr;
33use tracing::info;
34
/// Idle threshold for conversations, in minutes: once this much time has
/// passed, a conversation is considered idle (two hours).
const CONVERSATION_TIMEOUT_MINUTES: i64 = 2 * 60;
37
38/// Persistent memory store backed by SQLite.
39#[derive(Clone)]
40pub struct Store {
41    pool: SqlitePool,
42    max_context_messages: usize,
43}
44
45impl Store {
46    /// Create a new store, running migrations on first use.
47    pub async fn new(config: &MemoryConfig) -> Result<Self, MemoryError> {
48        let db_path = shellexpand(&config.db_path);
49
50        // Ensure parent directory exists. On Unix, also restrict its mode to
51        // 0o700 so other local users can't enumerate or read the SQLite WAL
52        // files that may briefly contain message text en route to disk.
53        if let Some(parent) = std::path::Path::new(&db_path).parent() {
54            std::fs::create_dir_all(parent)
55                .map_err(|e| MemoryError::io("failed to create data dir", e))?;
56            tighten_unix_dir_perms(parent);
57        }
58
59        // Pre-create the SQLite file with mode 0o600 *before* sqlx connects.
60        // Otherwise sqlx's create_if_missing path creates the file under the
61        // process umask (typically 0o644) and our chmod runs only after
62        // migrations — leaving a window where another local user can read
63        // messages, facts, and audit log entries on a shared host.
64        precreate_sqlite_file(&db_path)?;
65
66        let opts = SqliteConnectOptions::from_str(&format!("sqlite:{db_path}"))
67            .map_err(|e| MemoryError::sqlite("invalid db path", e))?
68            .create_if_missing(true)
69            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
70
71        let pool = SqlitePoolOptions::new()
72            .max_connections(config.max_connections)
73            .connect_with(opts)
74            .await
75            .map_err(|e| MemoryError::sqlite("failed to connect to sqlite", e))?;
76
77        Self::run_migrations(&pool).await?;
78
79        // Belt-and-braces: re-tighten in case sqlx (or a future version of
80        // it) ever recreates the file under a relaxed mode.
81        tighten_unix_file_perms(&db_path);
82
83        info!("Memory store initialized at {db_path}");
84
85        Ok(Self {
86            pool,
87            max_context_messages: config.max_context_messages,
88        })
89    }
90
91    /// Get a reference to the underlying connection pool.
92    pub fn pool(&self) -> &SqlitePool {
93        &self.pool
94    }
95}
96
97/// Create the SQLite file at `db_path` with mode 0o600 if it doesn't already
98/// exist. `:memory:` and any non-disk URI is left alone. On non-Unix this is
99/// a best-effort `create_new` without explicit mode bits.
100fn precreate_sqlite_file(db_path: &str) -> Result<(), MemoryError> {
101    if db_path == ":memory:" || db_path.starts_with("file::memory:") {
102        return Ok(());
103    }
104    let path = std::path::Path::new(db_path);
105    if path.exists() {
106        return Ok(());
107    }
108
109    #[cfg(unix)]
110    {
111        use std::os::unix::fs::OpenOptionsExt;
112        match std::fs::OpenOptions::new()
113            .create_new(true)
114            .write(true)
115            .mode(0o600)
116            .open(path)
117        {
118            Ok(_) => Ok(()),
119            // Race with another process — the file now exists, that's fine
120            // because tighten_unix_file_perms runs after migrations.
121            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
122            Err(e) => Err(MemoryError::io("pre-create db file at 0o600", e)),
123        }
124    }
125    #[cfg(not(unix))]
126    {
127        // On Windows ACLs are inherited from the parent dir; we have no
128        // useful mode bits to set here. Touching the file early would just
129        // duplicate what sqlx does.
130        let _ = path;
131        Ok(())
132    }
133}
134
135#[cfg(unix)]
136fn tighten_unix_file_perms(path: &str) {
137    use std::os::unix::fs::PermissionsExt;
138    if let Ok(meta) = std::fs::metadata(path) {
139        let mut perms = meta.permissions();
140        perms.set_mode(0o600);
141        if let Err(e) = std::fs::set_permissions(path, perms) {
142            tracing::warn!(path = %path, "could not chmod 0600 on memory db: {e}");
143        }
144    }
145}
146
147#[cfg(not(unix))]
148fn tighten_unix_file_perms(_path: &str) {}
149
150#[cfg(unix)]
151fn tighten_unix_dir_perms(path: &std::path::Path) {
152    use std::os::unix::fs::PermissionsExt;
153    if let Ok(meta) = std::fs::metadata(path) {
154        let mut perms = meta.permissions();
155        perms.set_mode(0o700);
156        if let Err(e) = std::fs::set_permissions(path, perms) {
157            tracing::warn!(path = %path.display(), "could not chmod 0700 on memory data dir: {e}");
158        }
159    }
160}
161
162#[cfg(not(unix))]
163fn tighten_unix_dir_perms(_path: &std::path::Path) {}
164
165impl Store {
166    /// Get the database file size in bytes.
167    pub async fn db_size(&self) -> Result<u64, MemoryError> {
168        let (page_count,): (i64,) = sqlx::query_as("PRAGMA page_count")
169            .fetch_one(&self.pool)
170            .await
171            .map_err(|e| MemoryError::sqlite("pragma failed", e))?;
172
173        let (page_size,): (i64,) = sqlx::query_as("PRAGMA page_size")
174            .fetch_one(&self.pool)
175            .await
176            .map_err(|e| MemoryError::sqlite("pragma failed", e))?;
177
178        Ok((page_count * page_size) as u64)
179    }
180
181    /// Run SQL migrations, tracking which have already been applied.
182    pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<(), MemoryError> {
183        sqlx::raw_sql(
184            "CREATE TABLE IF NOT EXISTS _migrations (
185                name TEXT PRIMARY KEY,
186                applied_at TEXT NOT NULL DEFAULT (datetime('now'))
187            );",
188        )
189        .execute(pool)
190        .await
191        .map_err(|e| MemoryError::sqlite("failed to create migrations table", e))?;
192
193        // Bootstrap: if _migrations is empty but tables already exist from
194        // a pre-tracking era, mark all existing migrations as applied.
195        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _migrations")
196            .fetch_one(pool)
197            .await
198            .map_err(|e| MemoryError::sqlite("failed to count migrations", e))?;
199
200        if count.0 == 0 {
201            let has_summary: bool = sqlx::query_scalar::<_, String>(
202                "SELECT sql FROM sqlite_master WHERE type='table' AND name='conversations'",
203            )
204            .fetch_optional(pool)
205            .await
206            .ok()
207            .flatten()
208            .map(|sql| sql.contains("summary"))
209            .unwrap_or(false);
210
211            if has_summary {
212                for name in &["001_init", "002_audit_log", "003_memory_enhancement"] {
213                    sqlx::query("INSERT OR IGNORE INTO _migrations (name) VALUES (?)")
214                        .bind(name)
215                        .execute(pool)
216                        .await
217                        .map_err(|e| {
218                            MemoryError::sqlite(format!("failed to bootstrap migration {name}"), e)
219                        })?;
220                }
221            }
222        }
223
224        let migrations: &[(&str, &str)] = &[
225            ("001_init", include_str!("../../migrations/001_init.sql")),
226            (
227                "002_audit_log",
228                include_str!("../../migrations/002_audit_log.sql"),
229            ),
230            (
231                "003_memory_enhancement",
232                include_str!("../../migrations/003_memory_enhancement.sql"),
233            ),
234            (
235                "004_fts5_recall",
236                include_str!("../../migrations/004_fts5_recall.sql"),
237            ),
238            (
239                "005_scheduled_tasks",
240                include_str!("../../migrations/005_scheduled_tasks.sql"),
241            ),
242            (
243                "006_limitations",
244                include_str!("../../migrations/006_limitations.sql"),
245            ),
246            (
247                "007_task_type",
248                include_str!("../../migrations/007_task_type.sql"),
249            ),
250            (
251                "008_user_aliases",
252                include_str!("../../migrations/008_user_aliases.sql"),
253            ),
254            (
255                "009_task_retry",
256                include_str!("../../migrations/009_task_retry.sql"),
257            ),
258            (
259                "010_outcomes",
260                include_str!("../../migrations/010_outcomes.sql"),
261            ),
262            (
263                "011_project_learning",
264                include_str!("../../migrations/011_project_learning.sql"),
265            ),
266            (
267                "012_project_sessions",
268                include_str!("../../migrations/012_project_sessions.sql"),
269            ),
270            (
271                "013_multi_lessons",
272                include_str!("../../migrations/013_multi_lessons.sql"),
273            ),
274            (
275                "014_token_usage",
276                include_str!("../../migrations/014_token_usage.sql"),
277            ),
278            (
279                "015_phase_checkpoints",
280                include_str!("../../migrations/015_phase_checkpoints.sql"),
281            ),
282            (
283                "016_cache_token_breakdown",
284                include_str!("../../migrations/016_cache_token_breakdown.sql"),
285            ),
286            (
287                "017_soft_delete",
288                include_str!("../../migrations/017_soft_delete.sql"),
289            ),
290            (
291                "018_observations",
292                include_str!("../../migrations/018_observations.sql"),
293            ),
294        ];
295
296        // Fast-path: fetch the applied-migrations set in a single SELECT
297        // and check membership in memory. The previous shape issued one
298        // `SELECT name FROM _migrations WHERE name = ?` round-trip per
299        // migration (17 of them at time of writing), which dominated the
300        // cold-open cost of `Store::new` on warm caches. One SELECT is
301        // O(N) network IO instead of O(N²); the in-memory check is free.
302        let applied_rows: Vec<(String,)> = sqlx::query_as("SELECT name FROM _migrations")
303            .fetch_all(pool)
304            .await
305            .map_err(|e| MemoryError::sqlite("failed to load applied migrations", e))?;
306        let applied: std::collections::HashSet<String> =
307            applied_rows.into_iter().map(|(name,)| name).collect();
308
309        for (name, sql) in migrations {
310            if applied.contains(*name) {
311                continue;
312            }
313
314            sqlx::raw_sql(sql)
315                .execute(pool)
316                .await
317                .map_err(|e| MemoryError::sqlite(format!("migration {name} failed"), e))?;
318
319            sqlx::query("INSERT INTO _migrations (name) VALUES (?)")
320                .bind(name)
321                .execute(pool)
322                .await
323                .map_err(|e| {
324                    MemoryError::sqlite(format!("failed to record migration {name}"), e)
325                })?;
326        }
327        Ok(())
328    }
329}
330
331#[cfg(test)]
332mod tests;