zeph_memory/sqlite/
mod.rs1mod acp_sessions;
5pub mod compression_guidelines;
6pub mod corrections;
7#[cfg(feature = "experiments")]
8pub mod experiments;
9pub mod graph_store;
10mod history;
11pub(crate) mod messages;
12pub mod overflow;
13pub mod preferences;
14mod skills;
15mod summaries;
16mod trust;
17
18use sqlx::SqlitePool;
19use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
20use std::str::FromStr;
21use std::time::Duration;
22
23use crate::error::MemoryError;
24
25pub use acp_sessions::{AcpSessionEvent, AcpSessionInfo};
26pub use messages::role_str;
27pub use skills::{SkillMetricsRow, SkillUsageRow, SkillVersionRow};
28pub use trust::{SkillTrustRow, SourceKind};
29
30#[derive(Debug, Clone)]
31pub struct SqliteStore {
32 pool: SqlitePool,
33}
34
35impl SqliteStore {
36 const DEFAULT_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
37
38 pub async fn new(path: &str) -> Result<Self, MemoryError> {
47 Self::with_pool_size(path, 5).await
48 }
49
50 pub async fn with_pool_size(path: &str, pool_size: u32) -> Result<Self, MemoryError> {
56 let url = if path == ":memory:" {
57 "sqlite::memory:".to_string()
58 } else {
59 if let Some(parent) = std::path::Path::new(path).parent()
60 && !parent.as_os_str().is_empty()
61 {
62 std::fs::create_dir_all(parent)?;
63 }
64 format!("sqlite:{path}?mode=rwc")
65 };
66
67 let opts = SqliteConnectOptions::from_str(&url)?
68 .create_if_missing(true)
69 .foreign_keys(true)
70 .busy_timeout(Self::DEFAULT_BUSY_TIMEOUT)
71 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
72 .synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
73
74 let pool = SqlitePoolOptions::new()
75 .max_connections(pool_size)
76 .connect_with(opts)
77 .await?;
78
79 sqlx::migrate!("./migrations").run(&pool).await?;
80
81 Ok(Self { pool })
82 }
83
84 #[must_use]
86 pub fn pool(&self) -> &SqlitePool {
87 &self.pool
88 }
89
90 pub async fn run_migrations(pool: &SqlitePool) -> Result<(), MemoryError> {
96 sqlx::migrate!("./migrations").run(pool).await?;
97 Ok(())
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Opens a store on a fresh temporary file.
    ///
    /// The temp-file handle is returned alongside the store so the file
    /// stays alive for as long as the caller holds it.
    async fn open_file_store() -> (NamedTempFile, SqliteStore) {
        let db_file = NamedTempFile::new().expect("tempfile");
        let store = {
            let db_path = db_file.path().to_str().expect("valid path");
            SqliteStore::new(db_path).await.expect("SqliteStore::new")
        };
        (db_file, store)
    }

    /// A file-backed store must report WAL as its journal mode.
    #[tokio::test]
    async fn wal_journal_mode_enabled_on_file_db() {
        let (_db_file, store) = open_file_store().await;

        let mode = sqlx::query_scalar::<_, String>("PRAGMA journal_mode")
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA query");

        assert_eq!(mode, "wal", "expected WAL journal mode, got: {mode}");
    }

    /// The busy-timeout pragma must equal the store's configured default.
    #[tokio::test]
    async fn busy_timeout_enabled_on_file_db() {
        let (_db_file, store) = open_file_store().await;

        let timeout_ms = sqlx::query_scalar::<_, i64>("PRAGMA busy_timeout")
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA query");
        let expected_ms = i64::try_from(SqliteStore::DEFAULT_BUSY_TIMEOUT.as_millis()).unwrap();

        assert_eq!(
            timeout_ms, expected_ms,
            "expected busy_timeout pragma to match configured timeout"
        );
    }

    /// Opening a database under missing directories creates them.
    #[tokio::test]
    async fn creates_parent_dirs() {
        let root = tempfile::tempdir().expect("tempdir");
        let db_path = root.path().join("a/b/c/zeph.db");

        let _store = SqliteStore::new(db_path.to_str().expect("valid path"))
            .await
            .expect("SqliteStore::new");

        assert!(db_path.exists(), "database file should exist");
    }

    /// A custom pool size yields a store that can create a conversation.
    #[tokio::test]
    async fn with_pool_size_accepts_custom_size() {
        let opened = SqliteStore::with_pool_size(":memory:", 2).await;
        let store = opened.expect("with_pool_size");

        let _conversation_id = store
            .create_conversation()
            .await
            .expect("create_conversation");
    }
}