Skip to main content

kernex_memory/store/
mod.rs

//! SQLite-backed persistent memory store.
//!
//! Split into focused submodules:
//! - `conversations` — conversation lifecycle (create, find, close, summaries)
//! - `messages` — message storage and full-text search
//! - `facts` — user facts, aliases, and limitations
//! - `tasks` — scheduled task CRUD and dedup
//! - `context` — context building and user profile formatting
//! - `context_helpers` — onboarding stages, system prompt composition, language detection
//!
//! Further submodules declared below but not described here: `checkpoints`,
//! `outcomes`, `sessions`, and `usage`.
10
11mod checkpoints;
12mod context;
13mod context_helpers;
14mod conversations;
15mod facts;
16mod messages;
17mod outcomes;
18mod sessions;
19mod tasks;
20mod usage;
21
22pub use checkpoints::PhaseCheckpoint;
23pub use context::{detect_language, format_user_profile};
24pub use tasks::DueTask;
25pub use usage::{UsageBreakdown, UsageSummary};
26
27use crate::error::MemoryError;
28use kernex_core::{config::MemoryConfig, shellexpand};
29use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
30use sqlx::SqlitePool;
31use std::str::FromStr;
32use tracing::info;
33
34/// How long (in minutes) before a conversation is considered idle.
35const CONVERSATION_TIMEOUT_MINUTES: i64 = 120;
36
37/// Persistent memory store backed by SQLite.
38#[derive(Clone)]
39pub struct Store {
40    pool: SqlitePool,
41    max_context_messages: usize,
42}
43
44impl Store {
45    /// Create a new store, running migrations on first use.
46    pub async fn new(config: &MemoryConfig) -> Result<Self, MemoryError> {
47        let db_path = shellexpand(&config.db_path);
48
49        // Ensure parent directory exists. On Unix, also restrict its mode to
50        // 0o700 so other local users can't enumerate or read the SQLite WAL
51        // files that may briefly contain message text en route to disk.
52        if let Some(parent) = std::path::Path::new(&db_path).parent() {
53            std::fs::create_dir_all(parent)
54                .map_err(|e| MemoryError::io("failed to create data dir", e))?;
55            tighten_unix_dir_perms(parent);
56        }
57
58        // Pre-create the SQLite file with mode 0o600 *before* sqlx connects.
59        // Otherwise sqlx's create_if_missing path creates the file under the
60        // process umask (typically 0o644) and our chmod runs only after
61        // migrations — leaving a window where another local user can read
62        // messages, facts, and audit log entries on a shared host.
63        precreate_sqlite_file(&db_path)?;
64
65        let opts = SqliteConnectOptions::from_str(&format!("sqlite:{db_path}"))
66            .map_err(|e| MemoryError::sqlite("invalid db path", e))?
67            .create_if_missing(true)
68            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
69
70        let pool = SqlitePoolOptions::new()
71            .max_connections(config.max_connections)
72            .connect_with(opts)
73            .await
74            .map_err(|e| MemoryError::sqlite("failed to connect to sqlite", e))?;
75
76        Self::run_migrations(&pool).await?;
77
78        // Belt-and-braces: re-tighten in case sqlx (or a future version of
79        // it) ever recreates the file under a relaxed mode.
80        tighten_unix_file_perms(&db_path);
81
82        info!("Memory store initialized at {db_path}");
83
84        Ok(Self {
85            pool,
86            max_context_messages: config.max_context_messages,
87        })
88    }
89
90    /// Get a reference to the underlying connection pool.
91    pub fn pool(&self) -> &SqlitePool {
92        &self.pool
93    }
94}
95
96/// Create the SQLite file at `db_path` with mode 0o600 if it doesn't already
97/// exist. `:memory:` and any non-disk URI is left alone. On non-Unix this is
98/// a best-effort `create_new` without explicit mode bits.
99fn precreate_sqlite_file(db_path: &str) -> Result<(), MemoryError> {
100    if db_path == ":memory:" || db_path.starts_with("file::memory:") {
101        return Ok(());
102    }
103    let path = std::path::Path::new(db_path);
104    if path.exists() {
105        return Ok(());
106    }
107
108    #[cfg(unix)]
109    {
110        use std::os::unix::fs::OpenOptionsExt;
111        match std::fs::OpenOptions::new()
112            .create_new(true)
113            .write(true)
114            .mode(0o600)
115            .open(path)
116        {
117            Ok(_) => Ok(()),
118            // Race with another process — the file now exists, that's fine
119            // because tighten_unix_file_perms runs after migrations.
120            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
121            Err(e) => Err(MemoryError::io("pre-create db file at 0o600", e)),
122        }
123    }
124    #[cfg(not(unix))]
125    {
126        // On Windows ACLs are inherited from the parent dir; we have no
127        // useful mode bits to set here. Touching the file early would just
128        // duplicate what sqlx does.
129        let _ = path;
130        Ok(())
131    }
132}
133
134#[cfg(unix)]
135fn tighten_unix_file_perms(path: &str) {
136    use std::os::unix::fs::PermissionsExt;
137    if let Ok(meta) = std::fs::metadata(path) {
138        let mut perms = meta.permissions();
139        perms.set_mode(0o600);
140        if let Err(e) = std::fs::set_permissions(path, perms) {
141            tracing::warn!(path = %path, "could not chmod 0600 on memory db: {e}");
142        }
143    }
144}
145
146#[cfg(not(unix))]
147fn tighten_unix_file_perms(_path: &str) {}
148
149#[cfg(unix)]
150fn tighten_unix_dir_perms(path: &std::path::Path) {
151    use std::os::unix::fs::PermissionsExt;
152    if let Ok(meta) = std::fs::metadata(path) {
153        let mut perms = meta.permissions();
154        perms.set_mode(0o700);
155        if let Err(e) = std::fs::set_permissions(path, perms) {
156            tracing::warn!(path = %path.display(), "could not chmod 0700 on memory data dir: {e}");
157        }
158    }
159}
160
161#[cfg(not(unix))]
162fn tighten_unix_dir_perms(_path: &std::path::Path) {}
163
164impl Store {
165    /// Get the database file size in bytes.
166    pub async fn db_size(&self) -> Result<u64, MemoryError> {
167        let (page_count,): (i64,) = sqlx::query_as("PRAGMA page_count")
168            .fetch_one(&self.pool)
169            .await
170            .map_err(|e| MemoryError::sqlite("pragma failed", e))?;
171
172        let (page_size,): (i64,) = sqlx::query_as("PRAGMA page_size")
173            .fetch_one(&self.pool)
174            .await
175            .map_err(|e| MemoryError::sqlite("pragma failed", e))?;
176
177        Ok((page_count * page_size) as u64)
178    }
179
180    /// Run SQL migrations, tracking which have already been applied.
181    pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<(), MemoryError> {
182        sqlx::raw_sql(
183            "CREATE TABLE IF NOT EXISTS _migrations (
184                name TEXT PRIMARY KEY,
185                applied_at TEXT NOT NULL DEFAULT (datetime('now'))
186            );",
187        )
188        .execute(pool)
189        .await
190        .map_err(|e| MemoryError::sqlite("failed to create migrations table", e))?;
191
192        // Bootstrap: if _migrations is empty but tables already exist from
193        // a pre-tracking era, mark all existing migrations as applied.
194        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _migrations")
195            .fetch_one(pool)
196            .await
197            .map_err(|e| MemoryError::sqlite("failed to count migrations", e))?;
198
199        if count.0 == 0 {
200            let has_summary: bool = sqlx::query_scalar::<_, String>(
201                "SELECT sql FROM sqlite_master WHERE type='table' AND name='conversations'",
202            )
203            .fetch_optional(pool)
204            .await
205            .ok()
206            .flatten()
207            .map(|sql| sql.contains("summary"))
208            .unwrap_or(false);
209
210            if has_summary {
211                for name in &["001_init", "002_audit_log", "003_memory_enhancement"] {
212                    sqlx::query("INSERT OR IGNORE INTO _migrations (name) VALUES (?)")
213                        .bind(name)
214                        .execute(pool)
215                        .await
216                        .map_err(|e| {
217                            MemoryError::sqlite(format!("failed to bootstrap migration {name}"), e)
218                        })?;
219                }
220            }
221        }
222
223        let migrations: &[(&str, &str)] = &[
224            ("001_init", include_str!("../../migrations/001_init.sql")),
225            (
226                "002_audit_log",
227                include_str!("../../migrations/002_audit_log.sql"),
228            ),
229            (
230                "003_memory_enhancement",
231                include_str!("../../migrations/003_memory_enhancement.sql"),
232            ),
233            (
234                "004_fts5_recall",
235                include_str!("../../migrations/004_fts5_recall.sql"),
236            ),
237            (
238                "005_scheduled_tasks",
239                include_str!("../../migrations/005_scheduled_tasks.sql"),
240            ),
241            (
242                "006_limitations",
243                include_str!("../../migrations/006_limitations.sql"),
244            ),
245            (
246                "007_task_type",
247                include_str!("../../migrations/007_task_type.sql"),
248            ),
249            (
250                "008_user_aliases",
251                include_str!("../../migrations/008_user_aliases.sql"),
252            ),
253            (
254                "009_task_retry",
255                include_str!("../../migrations/009_task_retry.sql"),
256            ),
257            (
258                "010_outcomes",
259                include_str!("../../migrations/010_outcomes.sql"),
260            ),
261            (
262                "011_project_learning",
263                include_str!("../../migrations/011_project_learning.sql"),
264            ),
265            (
266                "012_project_sessions",
267                include_str!("../../migrations/012_project_sessions.sql"),
268            ),
269            (
270                "013_multi_lessons",
271                include_str!("../../migrations/013_multi_lessons.sql"),
272            ),
273            (
274                "014_token_usage",
275                include_str!("../../migrations/014_token_usage.sql"),
276            ),
277            (
278                "015_phase_checkpoints",
279                include_str!("../../migrations/015_phase_checkpoints.sql"),
280            ),
281            (
282                "016_cache_token_breakdown",
283                include_str!("../../migrations/016_cache_token_breakdown.sql"),
284            ),
285        ];
286
287        for (name, sql) in migrations {
288            let applied: Option<(String,)> =
289                sqlx::query_as("SELECT name FROM _migrations WHERE name = ?")
290                    .bind(name)
291                    .fetch_optional(pool)
292                    .await
293                    .map_err(|e| {
294                        MemoryError::sqlite(format!("failed to check migration {name}"), e)
295                    })?;
296
297            if applied.is_some() {
298                continue;
299            }
300
301            sqlx::raw_sql(sql)
302                .execute(pool)
303                .await
304                .map_err(|e| MemoryError::sqlite(format!("migration {name} failed"), e))?;
305
306            sqlx::query("INSERT INTO _migrations (name) VALUES (?)")
307                .bind(name)
308                .execute(pool)
309                .await
310                .map_err(|e| {
311                    MemoryError::sqlite(format!("failed to record migration {name}"), e)
312                })?;
313        }
314        Ok(())
315    }
316}
317
318#[cfg(test)]
319mod tests;