Skip to main content

kernex_memory/store/
mod.rs

//! SQLite-backed persistent memory store.
//!
//! Split into focused submodules:
//! - `conversations` — conversation lifecycle (create, find, close, summaries)
//! - `messages` — message storage and full-text search
//! - `facts` — user facts, aliases, and limitations
//! - `tasks` — scheduled task CRUD and dedup
//! - `context` — context building and user profile formatting
//! - `context_helpers` — onboarding stages, system prompt composition, language detection
//! - `checkpoints` — phase checkpoints (`PhaseCheckpoint`)
//! - `usage` — usage reporting (`UsageSummary`, `UsageBreakdown`)
//! - `outcomes`, `sessions` — see the respective submodules
10
11mod checkpoints;
12mod context;
13mod context_helpers;
14mod conversations;
15mod facts;
16mod messages;
17mod outcomes;
18mod sessions;
19mod tasks;
20mod usage;
21
22pub use checkpoints::PhaseCheckpoint;
23pub use context::{detect_language, format_user_profile};
24pub use tasks::DueTask;
25pub use usage::{UsageBreakdown, UsageSummary};
26
27use kernex_core::{config::MemoryConfig, error::KernexError, shellexpand};
28use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
29use sqlx::SqlitePool;
30use std::str::FromStr;
31use tracing::info;
32
/// Idle threshold for conversations, in minutes (two hours): a conversation
/// becomes idle once this much time has passed.
const CONVERSATION_TIMEOUT_MINUTES: i64 = 2 * 60;
35
36/// Persistent memory store backed by SQLite.
37#[derive(Clone)]
38pub struct Store {
39    pool: SqlitePool,
40    max_context_messages: usize,
41}
42
43impl Store {
44    /// Create a new store, running migrations on first use.
45    pub async fn new(config: &MemoryConfig) -> Result<Self, KernexError> {
46        let db_path = shellexpand(&config.db_path);
47
48        // Ensure parent directory exists. On Unix, also restrict its mode to
49        // 0o700 so other local users can't enumerate or read the SQLite WAL
50        // files that may briefly contain message text en route to disk.
51        if let Some(parent) = std::path::Path::new(&db_path).parent() {
52            std::fs::create_dir_all(parent)
53                .map_err(|e| KernexError::Store(format!("failed to create data dir: {e}")))?;
54            tighten_unix_dir_perms(parent);
55        }
56
57        // Pre-create the SQLite file with mode 0o600 *before* sqlx connects.
58        // Otherwise sqlx's create_if_missing path creates the file under the
59        // process umask (typically 0o644) and our chmod runs only after
60        // migrations — leaving a window where another local user can read
61        // messages, facts, and audit log entries on a shared host.
62        precreate_sqlite_file(&db_path)?;
63
64        let opts = SqliteConnectOptions::from_str(&format!("sqlite:{db_path}"))
65            .map_err(|e| KernexError::Store(format!("invalid db path: {e}")))?
66            .create_if_missing(true)
67            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
68
69        let pool = SqlitePoolOptions::new()
70            .max_connections(config.max_connections)
71            .connect_with(opts)
72            .await
73            .map_err(|e| KernexError::Store(format!("failed to connect to sqlite: {e}")))?;
74
75        Self::run_migrations(&pool).await?;
76
77        // Belt-and-braces: re-tighten in case sqlx (or a future version of
78        // it) ever recreates the file under a relaxed mode.
79        tighten_unix_file_perms(&db_path);
80
81        info!("Memory store initialized at {db_path}");
82
83        Ok(Self {
84            pool,
85            max_context_messages: config.max_context_messages,
86        })
87    }
88
89    /// Get a reference to the underlying connection pool.
90    pub fn pool(&self) -> &SqlitePool {
91        &self.pool
92    }
93}
94
95/// Create the SQLite file at `db_path` with mode 0o600 if it doesn't already
96/// exist. `:memory:` and any non-disk URI is left alone. On non-Unix this is
97/// a best-effort `create_new` without explicit mode bits.
98fn precreate_sqlite_file(db_path: &str) -> Result<(), KernexError> {
99    if db_path == ":memory:" || db_path.starts_with("file::memory:") {
100        return Ok(());
101    }
102    let path = std::path::Path::new(db_path);
103    if path.exists() {
104        return Ok(());
105    }
106
107    #[cfg(unix)]
108    {
109        use std::os::unix::fs::OpenOptionsExt;
110        match std::fs::OpenOptions::new()
111            .create_new(true)
112            .write(true)
113            .mode(0o600)
114            .open(path)
115        {
116            Ok(_) => Ok(()),
117            // Race with another process — the file now exists, that's fine
118            // because tighten_unix_file_perms runs after migrations.
119            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
120            Err(e) => Err(KernexError::Store(format!(
121                "failed to pre-create db file at 0o600: {e}"
122            ))),
123        }
124    }
125    #[cfg(not(unix))]
126    {
127        // On Windows ACLs are inherited from the parent dir; we have no
128        // useful mode bits to set here. Touching the file early would just
129        // duplicate what sqlx does.
130        let _ = path;
131        Ok(())
132    }
133}
134
135#[cfg(unix)]
136fn tighten_unix_file_perms(path: &str) {
137    use std::os::unix::fs::PermissionsExt;
138    if let Ok(meta) = std::fs::metadata(path) {
139        let mut perms = meta.permissions();
140        perms.set_mode(0o600);
141        if let Err(e) = std::fs::set_permissions(path, perms) {
142            tracing::warn!(path = %path, "could not chmod 0600 on memory db: {e}");
143        }
144    }
145}
146
147#[cfg(not(unix))]
148fn tighten_unix_file_perms(_path: &str) {}
149
150#[cfg(unix)]
151fn tighten_unix_dir_perms(path: &std::path::Path) {
152    use std::os::unix::fs::PermissionsExt;
153    if let Ok(meta) = std::fs::metadata(path) {
154        let mut perms = meta.permissions();
155        perms.set_mode(0o700);
156        if let Err(e) = std::fs::set_permissions(path, perms) {
157            tracing::warn!(path = %path.display(), "could not chmod 0700 on memory data dir: {e}");
158        }
159    }
160}
161
162#[cfg(not(unix))]
163fn tighten_unix_dir_perms(_path: &std::path::Path) {}
164
165impl Store {
166    /// Get the database file size in bytes.
167    pub async fn db_size(&self) -> Result<u64, KernexError> {
168        let (page_count,): (i64,) = sqlx::query_as("PRAGMA page_count")
169            .fetch_one(&self.pool)
170            .await
171            .map_err(|e| KernexError::Store(format!("pragma failed: {e}")))?;
172
173        let (page_size,): (i64,) = sqlx::query_as("PRAGMA page_size")
174            .fetch_one(&self.pool)
175            .await
176            .map_err(|e| KernexError::Store(format!("pragma failed: {e}")))?;
177
178        Ok((page_count * page_size) as u64)
179    }
180
181    /// Run SQL migrations, tracking which have already been applied.
182    pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<(), KernexError> {
183        sqlx::raw_sql(
184            "CREATE TABLE IF NOT EXISTS _migrations (
185                name TEXT PRIMARY KEY,
186                applied_at TEXT NOT NULL DEFAULT (datetime('now'))
187            );",
188        )
189        .execute(pool)
190        .await
191        .map_err(|e| KernexError::Store(format!("failed to create migrations table: {e}")))?;
192
193        // Bootstrap: if _migrations is empty but tables already exist from
194        // a pre-tracking era, mark all existing migrations as applied.
195        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _migrations")
196            .fetch_one(pool)
197            .await
198            .map_err(|e| KernexError::Store(format!("failed to count migrations: {e}")))?;
199
200        if count.0 == 0 {
201            let has_summary: bool = sqlx::query_scalar::<_, String>(
202                "SELECT sql FROM sqlite_master WHERE type='table' AND name='conversations'",
203            )
204            .fetch_optional(pool)
205            .await
206            .ok()
207            .flatten()
208            .map(|sql| sql.contains("summary"))
209            .unwrap_or(false);
210
211            if has_summary {
212                for name in &["001_init", "002_audit_log", "003_memory_enhancement"] {
213                    sqlx::query("INSERT OR IGNORE INTO _migrations (name) VALUES (?)")
214                        .bind(name)
215                        .execute(pool)
216                        .await
217                        .map_err(|e| {
218                            KernexError::Store(format!("failed to bootstrap migration {name}: {e}"))
219                        })?;
220                }
221            }
222        }
223
224        let migrations: &[(&str, &str)] = &[
225            ("001_init", include_str!("../../migrations/001_init.sql")),
226            (
227                "002_audit_log",
228                include_str!("../../migrations/002_audit_log.sql"),
229            ),
230            (
231                "003_memory_enhancement",
232                include_str!("../../migrations/003_memory_enhancement.sql"),
233            ),
234            (
235                "004_fts5_recall",
236                include_str!("../../migrations/004_fts5_recall.sql"),
237            ),
238            (
239                "005_scheduled_tasks",
240                include_str!("../../migrations/005_scheduled_tasks.sql"),
241            ),
242            (
243                "006_limitations",
244                include_str!("../../migrations/006_limitations.sql"),
245            ),
246            (
247                "007_task_type",
248                include_str!("../../migrations/007_task_type.sql"),
249            ),
250            (
251                "008_user_aliases",
252                include_str!("../../migrations/008_user_aliases.sql"),
253            ),
254            (
255                "009_task_retry",
256                include_str!("../../migrations/009_task_retry.sql"),
257            ),
258            (
259                "010_outcomes",
260                include_str!("../../migrations/010_outcomes.sql"),
261            ),
262            (
263                "011_project_learning",
264                include_str!("../../migrations/011_project_learning.sql"),
265            ),
266            (
267                "012_project_sessions",
268                include_str!("../../migrations/012_project_sessions.sql"),
269            ),
270            (
271                "013_multi_lessons",
272                include_str!("../../migrations/013_multi_lessons.sql"),
273            ),
274            (
275                "014_token_usage",
276                include_str!("../../migrations/014_token_usage.sql"),
277            ),
278            (
279                "015_phase_checkpoints",
280                include_str!("../../migrations/015_phase_checkpoints.sql"),
281            ),
282            (
283                "016_cache_token_breakdown",
284                include_str!("../../migrations/016_cache_token_breakdown.sql"),
285            ),
286        ];
287
288        for (name, sql) in migrations {
289            let applied: Option<(String,)> =
290                sqlx::query_as("SELECT name FROM _migrations WHERE name = ?")
291                    .bind(name)
292                    .fetch_optional(pool)
293                    .await
294                    .map_err(|e| {
295                        KernexError::Store(format!("failed to check migration {name}: {e}"))
296                    })?;
297
298            if applied.is_some() {
299                continue;
300            }
301
302            sqlx::raw_sql(sql)
303                .execute(pool)
304                .await
305                .map_err(|e| KernexError::Store(format!("migration {name} failed: {e}")))?;
306
307            sqlx::query("INSERT INTO _migrations (name) VALUES (?)")
308                .bind(name)
309                .execute(pool)
310                .await
311                .map_err(|e| {
312                    KernexError::Store(format!("failed to record migration {name}: {e}"))
313                })?;
314        }
315        Ok(())
316    }
317}
318
319#[cfg(test)]
320mod tests;