Skip to main content

kernex_memory/store/
mod.rs

1//! SQLite-backed persistent memory store.
2//!
3//! Split into focused submodules:
4//! - `conversations` — conversation lifecycle (create, find, close, summaries)
5//! - `messages` — message storage and full-text search
6//! - `facts` — user facts, aliases, and limitations
7//! - `tasks` — scheduled task CRUD and dedup
8//! - `context` — context building and user profile formatting
9//! - `context_helpers` — onboarding stages, system prompt composition, language detection
10
11mod context;
12mod context_helpers;
13mod conversations;
14mod facts;
15mod messages;
16mod outcomes;
17mod sessions;
18mod tasks;
19mod usage;
20
21pub use context::{detect_language, format_user_profile};
22pub use tasks::DueTask;
23pub use usage::UsageSummary;
24
25use kernex_core::{config::MemoryConfig, error::KernexError, shellexpand};
26use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
27use sqlx::SqlitePool;
28use std::str::FromStr;
29use tracing::info;
30
31/// How long (in minutes) before a conversation is considered idle.
32const CONVERSATION_TIMEOUT_MINUTES: i64 = 120;
33
34/// Persistent memory store backed by SQLite.
35#[derive(Clone)]
36pub struct Store {
37    pool: SqlitePool,
38    max_context_messages: usize,
39}
40
41impl Store {
42    /// Create a new store, running migrations on first use.
43    pub async fn new(config: &MemoryConfig) -> Result<Self, KernexError> {
44        let db_path = shellexpand(&config.db_path);
45
46        // Ensure parent directory exists.
47        if let Some(parent) = std::path::Path::new(&db_path).parent() {
48            std::fs::create_dir_all(parent)
49                .map_err(|e| KernexError::Store(format!("failed to create data dir: {e}")))?;
50        }
51
52        let opts = SqliteConnectOptions::from_str(&format!("sqlite:{db_path}"))
53            .map_err(|e| KernexError::Store(format!("invalid db path: {e}")))?
54            .create_if_missing(true)
55            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
56
57        let pool = SqlitePoolOptions::new()
58            .max_connections(4)
59            .connect_with(opts)
60            .await
61            .map_err(|e| KernexError::Store(format!("failed to connect to sqlite: {e}")))?;
62
63        Self::run_migrations(&pool).await?;
64
65        info!("Memory store initialized at {db_path}");
66
67        Ok(Self {
68            pool,
69            max_context_messages: config.max_context_messages,
70        })
71    }
72
73    /// Get a reference to the underlying connection pool.
74    pub fn pool(&self) -> &SqlitePool {
75        &self.pool
76    }
77
78    /// Get the database file size in bytes.
79    pub async fn db_size(&self) -> Result<u64, KernexError> {
80        let (page_count,): (i64,) = sqlx::query_as("PRAGMA page_count")
81            .fetch_one(&self.pool)
82            .await
83            .map_err(|e| KernexError::Store(format!("pragma failed: {e}")))?;
84
85        let (page_size,): (i64,) = sqlx::query_as("PRAGMA page_size")
86            .fetch_one(&self.pool)
87            .await
88            .map_err(|e| KernexError::Store(format!("pragma failed: {e}")))?;
89
90        Ok((page_count * page_size) as u64)
91    }
92
93    /// Run SQL migrations, tracking which have already been applied.
94    pub(crate) async fn run_migrations(pool: &SqlitePool) -> Result<(), KernexError> {
95        sqlx::raw_sql(
96            "CREATE TABLE IF NOT EXISTS _migrations (
97                name TEXT PRIMARY KEY,
98                applied_at TEXT NOT NULL DEFAULT (datetime('now'))
99            );",
100        )
101        .execute(pool)
102        .await
103        .map_err(|e| KernexError::Store(format!("failed to create migrations table: {e}")))?;
104
105        // Bootstrap: if _migrations is empty but tables already exist from
106        // a pre-tracking era, mark all existing migrations as applied.
107        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _migrations")
108            .fetch_one(pool)
109            .await
110            .map_err(|e| KernexError::Store(format!("failed to count migrations: {e}")))?;
111
112        if count.0 == 0 {
113            let has_summary: bool = sqlx::query_scalar::<_, String>(
114                "SELECT sql FROM sqlite_master WHERE type='table' AND name='conversations'",
115            )
116            .fetch_optional(pool)
117            .await
118            .ok()
119            .flatten()
120            .map(|sql| sql.contains("summary"))
121            .unwrap_or(false);
122
123            if has_summary {
124                for name in &["001_init", "002_audit_log", "003_memory_enhancement"] {
125                    sqlx::query("INSERT OR IGNORE INTO _migrations (name) VALUES (?)")
126                        .bind(name)
127                        .execute(pool)
128                        .await
129                        .map_err(|e| {
130                            KernexError::Store(format!("failed to bootstrap migration {name}: {e}"))
131                        })?;
132                }
133            }
134        }
135
136        let migrations: &[(&str, &str)] = &[
137            ("001_init", include_str!("../../migrations/001_init.sql")),
138            (
139                "002_audit_log",
140                include_str!("../../migrations/002_audit_log.sql"),
141            ),
142            (
143                "003_memory_enhancement",
144                include_str!("../../migrations/003_memory_enhancement.sql"),
145            ),
146            (
147                "004_fts5_recall",
148                include_str!("../../migrations/004_fts5_recall.sql"),
149            ),
150            (
151                "005_scheduled_tasks",
152                include_str!("../../migrations/005_scheduled_tasks.sql"),
153            ),
154            (
155                "006_limitations",
156                include_str!("../../migrations/006_limitations.sql"),
157            ),
158            (
159                "007_task_type",
160                include_str!("../../migrations/007_task_type.sql"),
161            ),
162            (
163                "008_user_aliases",
164                include_str!("../../migrations/008_user_aliases.sql"),
165            ),
166            (
167                "009_task_retry",
168                include_str!("../../migrations/009_task_retry.sql"),
169            ),
170            (
171                "010_outcomes",
172                include_str!("../../migrations/010_outcomes.sql"),
173            ),
174            (
175                "011_project_learning",
176                include_str!("../../migrations/011_project_learning.sql"),
177            ),
178            (
179                "012_project_sessions",
180                include_str!("../../migrations/012_project_sessions.sql"),
181            ),
182            (
183                "013_multi_lessons",
184                include_str!("../../migrations/013_multi_lessons.sql"),
185            ),
186            (
187                "014_token_usage",
188                include_str!("../../migrations/014_token_usage.sql"),
189            ),
190        ];
191
192        for (name, sql) in migrations {
193            let applied: Option<(String,)> =
194                sqlx::query_as("SELECT name FROM _migrations WHERE name = ?")
195                    .bind(name)
196                    .fetch_optional(pool)
197                    .await
198                    .map_err(|e| {
199                        KernexError::Store(format!("failed to check migration {name}: {e}"))
200                    })?;
201
202            if applied.is_some() {
203                continue;
204            }
205
206            sqlx::raw_sql(sql)
207                .execute(pool)
208                .await
209                .map_err(|e| KernexError::Store(format!("migration {name} failed: {e}")))?;
210
211            sqlx::query("INSERT INTO _migrations (name) VALUES (?)")
212                .bind(name)
213                .execute(pool)
214                .await
215                .map_err(|e| {
216                    KernexError::Store(format!("failed to record migration {name}: {e}"))
217                })?;
218        }
219        Ok(())
220    }
221}
222
223#[cfg(test)]
224mod tests;