Skip to main content

zeph_memory/sqlite/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod acp_sessions;
5pub mod corrections;
6#[cfg(feature = "experiments")]
7pub mod experiments;
8pub mod graph_store;
9mod history;
10pub(crate) mod messages;
11mod skills;
12mod summaries;
13mod trust;
14
15use sqlx::SqlitePool;
16use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
17use std::str::FromStr;
18use std::time::Duration;
19
20use crate::error::MemoryError;
21
22pub use acp_sessions::{AcpSessionEvent, AcpSessionInfo};
23pub use messages::role_str;
24pub use skills::{SkillMetricsRow, SkillUsageRow, SkillVersionRow};
25pub use trust::{SkillTrustRow, SourceKind};
26
27#[derive(Debug, Clone)]
28pub struct SqliteStore {
29    pool: SqlitePool,
30}
31
32impl SqliteStore {
33    const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
34
35    /// Open (or create) the `SQLite` database and run migrations.
36    ///
37    /// Enables foreign key constraints at connection level so that
38    /// `ON DELETE CASCADE` and other FK rules are enforced.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if the database cannot be opened or migrations fail.
43    pub async fn new(path: &str) -> Result<Self, MemoryError> {
44        Self::with_pool_size(path, 5).await
45    }
46
47    /// Open (or create) the `SQLite` database with a configurable connection pool size.
48    ///
49    /// # Errors
50    ///
51    /// Returns an error if the database cannot be opened or migrations fail.
52    pub async fn with_pool_size(path: &str, pool_size: u32) -> Result<Self, MemoryError> {
53        let url = if path == ":memory:" {
54            "sqlite::memory:".to_string()
55        } else {
56            if let Some(parent) = std::path::Path::new(path).parent()
57                && !parent.as_os_str().is_empty()
58            {
59                std::fs::create_dir_all(parent)?;
60            }
61            format!("sqlite:{path}?mode=rwc")
62        };
63
64        let opts = SqliteConnectOptions::from_str(&url)?
65            .create_if_missing(true)
66            .foreign_keys(true)
67            .busy_timeout(Self::DEFAULT_BUSY_TIMEOUT)
68            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
69            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
70
71        let pool = SqlitePoolOptions::new()
72            .max_connections(pool_size)
73            .connect_with(opts)
74            .await?;
75
76        sqlx::migrate!("./migrations").run(&pool).await?;
77
78        Ok(Self { pool })
79    }
80
81    /// Expose the underlying pool for shared access by other stores.
82    #[must_use]
83    pub fn pool(&self) -> &SqlitePool {
84        &self.pool
85    }
86
87    /// Run all migrations on the given pool.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if any migration fails.
92    pub async fn run_migrations(pool: &SqlitePool) -> Result<(), MemoryError> {
93        sqlx::migrate!("./migrations").run(pool).await?;
94        Ok(())
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101    use tempfile::NamedTempFile;
102
103    #[tokio::test]
104    async fn wal_journal_mode_enabled_on_file_db() {
105        let file = NamedTempFile::new().expect("tempfile");
106        let path = file.path().to_str().expect("valid path");
107
108        let store = SqliteStore::new(path).await.expect("SqliteStore::new");
109
110        let mode: String = sqlx::query_scalar("PRAGMA journal_mode")
111            .fetch_one(store.pool())
112            .await
113            .expect("PRAGMA query");
114
115        assert_eq!(mode, "wal", "expected WAL journal mode, got: {mode}");
116    }
117
118    #[tokio::test]
119    async fn busy_timeout_enabled_on_file_db() {
120        let file = NamedTempFile::new().expect("tempfile");
121        let path = file.path().to_str().expect("valid path");
122
123        let store = SqliteStore::new(path).await.expect("SqliteStore::new");
124
125        let timeout_ms: i64 = sqlx::query_scalar("PRAGMA busy_timeout")
126            .fetch_one(store.pool())
127            .await
128            .expect("PRAGMA query");
129
130        assert_eq!(
131            timeout_ms,
132            i64::try_from(SqliteStore::DEFAULT_BUSY_TIMEOUT.as_millis()).unwrap(),
133            "expected busy_timeout pragma to match configured timeout"
134        );
135    }
136
137    #[tokio::test]
138    async fn creates_parent_dirs() {
139        let dir = tempfile::tempdir().expect("tempdir");
140        let deep = dir.path().join("a/b/c/zeph.db");
141        let path = deep.to_str().expect("valid path");
142        let _store = SqliteStore::new(path).await.expect("SqliteStore::new");
143        assert!(deep.exists(), "database file should exist");
144    }
145
146    #[tokio::test]
147    async fn with_pool_size_accepts_custom_size() {
148        let store = SqliteStore::with_pool_size(":memory:", 2)
149            .await
150            .expect("with_pool_size");
151        // Verify the store is operational with the custom pool size.
152        let _cid = store
153            .create_conversation()
154            .await
155            .expect("create_conversation");
156    }
157}