Skip to main content

zeph_memory/sqlite/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod acp_sessions;
5pub mod compression_guidelines;
6pub mod corrections;
7#[cfg(feature = "experiments")]
8pub mod experiments;
9pub mod graph_store;
10mod history;
11pub(crate) mod messages;
12pub mod overflow;
13mod skills;
14mod summaries;
15mod trust;
16
17use sqlx::SqlitePool;
18use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
19use std::str::FromStr;
20use std::time::Duration;
21
22use crate::error::MemoryError;
23
24pub use acp_sessions::{AcpSessionEvent, AcpSessionInfo};
25pub use messages::role_str;
26pub use skills::{SkillMetricsRow, SkillUsageRow, SkillVersionRow};
27pub use trust::{SkillTrustRow, SourceKind};
28
29#[derive(Debug, Clone)]
30pub struct SqliteStore {
31    pool: SqlitePool,
32}
33
34impl SqliteStore {
35    const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
36
37    /// Open (or create) the `SQLite` database and run migrations.
38    ///
39    /// Enables foreign key constraints at connection level so that
40    /// `ON DELETE CASCADE` and other FK rules are enforced.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the database cannot be opened or migrations fail.
45    pub async fn new(path: &str) -> Result<Self, MemoryError> {
46        Self::with_pool_size(path, 5).await
47    }
48
49    /// Open (or create) the `SQLite` database with a configurable connection pool size.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the database cannot be opened or migrations fail.
54    pub async fn with_pool_size(path: &str, pool_size: u32) -> Result<Self, MemoryError> {
55        let url = if path == ":memory:" {
56            "sqlite::memory:".to_string()
57        } else {
58            if let Some(parent) = std::path::Path::new(path).parent()
59                && !parent.as_os_str().is_empty()
60            {
61                std::fs::create_dir_all(parent)?;
62            }
63            format!("sqlite:{path}?mode=rwc")
64        };
65
66        let opts = SqliteConnectOptions::from_str(&url)?
67            .create_if_missing(true)
68            .foreign_keys(true)
69            .busy_timeout(Self::DEFAULT_BUSY_TIMEOUT)
70            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
71            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
72
73        let pool = SqlitePoolOptions::new()
74            .max_connections(pool_size)
75            .connect_with(opts)
76            .await?;
77
78        sqlx::migrate!("./migrations").run(&pool).await?;
79
80        Ok(Self { pool })
81    }
82
83    /// Expose the underlying pool for shared access by other stores.
84    #[must_use]
85    pub fn pool(&self) -> &SqlitePool {
86        &self.pool
87    }
88
89    /// Run all migrations on the given pool.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if any migration fails.
94    pub async fn run_migrations(pool: &SqlitePool) -> Result<(), MemoryError> {
95        sqlx::migrate!("./migrations").run(pool).await?;
96        Ok(())
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103    use tempfile::NamedTempFile;
104
105    #[tokio::test]
106    async fn wal_journal_mode_enabled_on_file_db() {
107        let file = NamedTempFile::new().expect("tempfile");
108        let path = file.path().to_str().expect("valid path");
109
110        let store = SqliteStore::new(path).await.expect("SqliteStore::new");
111
112        let mode: String = sqlx::query_scalar("PRAGMA journal_mode")
113            .fetch_one(store.pool())
114            .await
115            .expect("PRAGMA query");
116
117        assert_eq!(mode, "wal", "expected WAL journal mode, got: {mode}");
118    }
119
120    #[tokio::test]
121    async fn busy_timeout_enabled_on_file_db() {
122        let file = NamedTempFile::new().expect("tempfile");
123        let path = file.path().to_str().expect("valid path");
124
125        let store = SqliteStore::new(path).await.expect("SqliteStore::new");
126
127        let timeout_ms: i64 = sqlx::query_scalar("PRAGMA busy_timeout")
128            .fetch_one(store.pool())
129            .await
130            .expect("PRAGMA query");
131
132        assert_eq!(
133            timeout_ms,
134            i64::try_from(SqliteStore::DEFAULT_BUSY_TIMEOUT.as_millis()).unwrap(),
135            "expected busy_timeout pragma to match configured timeout"
136        );
137    }
138
139    #[tokio::test]
140    async fn creates_parent_dirs() {
141        let dir = tempfile::tempdir().expect("tempdir");
142        let deep = dir.path().join("a/b/c/zeph.db");
143        let path = deep.to_str().expect("valid path");
144        let _store = SqliteStore::new(path).await.expect("SqliteStore::new");
145        assert!(deep.exists(), "database file should exist");
146    }
147
148    #[tokio::test]
149    async fn with_pool_size_accepts_custom_size() {
150        let store = SqliteStore::with_pool_size(":memory:", 2)
151            .await
152            .expect("with_pool_size");
153        // Verify the store is operational with the custom pool size.
154        let _cid = store
155            .create_conversation()
156            .await
157            .expect("create_conversation");
158    }
159}