zeph_memory/store/mod.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed relational store for all persistent agent data.
5//!
6//! [`DbStore`] (aliased as [`SqliteStore`]) wraps a [`zeph_db::DbPool`] and provides
7//! typed sub-store modules for every data domain:
8//!
9//! | Module | Contents |
10//! |--------|----------|
11//! | `messages` | Conversation messages, role strings, metadata |
12//! | `summaries` | Compression summaries per conversation |
13//! | `persona` | Long-lived user attributes ([`PersonaFactRow`]) |
14//! | `trajectory` | Goal trajectory entries ([`TrajectoryEntryRow`]) |
15//! | `memory_tree` | Hierarchical note consolidation tree ([`MemoryTreeRow`]) |
16//! | `session_digest` | Per-session digest records |
17//! | `experiments` | A/B experiment results and session summaries |
18//! | `corrections` | User-issued corrections stored for fine-tuning |
19//! | `graph_store` | Entity/edge adjacency tables for the knowledge graph |
20//! | `overflow` | Context-overflow handling metadata |
21//! | `preferences` | User preference key-value store |
22//! | `skills` | Skill metrics, usage, and version records |
23//! | `trust` | Skill trust scores by source |
24//! | `acp_sessions` | ACP protocol session events |
25//! | `mem_scenes` | Scene segmentation records |
26//! | `compression_guidelines` | LLM compression policy guidelines |
27//! | `admission_training` | A-MAC admission training data |
28//! | `channel_preferences` | Per-channel UX preferences (e.g. last active provider) |
29//! | `agent_sessions` | Fleet session lifecycle records ([`AgentSessionRow`]) |
30
31mod acp_sessions;
32pub mod admission_training;
33pub mod agent_sessions;
34pub mod channel_preferences;
35pub mod compression_guidelines;
36pub mod corrections;
37pub mod experiments;
38pub mod graph_store;
39mod history;
40mod mem_scenes;
41pub mod memory_tree;
42pub(crate) mod messages;
43pub mod overflow;
44pub mod persona;
45pub mod preferences;
46pub mod retrieval_failures;
47pub mod session_digest;
48mod skills;
49mod summaries;
50pub mod trajectory;
51mod trust;
52
53#[allow(unused_imports)]
54use zeph_db::sql;
55use zeph_db::{DbConfig, DbPool};
56
57use crate::error::MemoryError;
58
59pub use acp_sessions::{AcpSessionEvent, AcpSessionInfo};
60pub use agent_sessions::{AgentSessionRow, SessionChannel, SessionKind, SessionStatus};
61pub use memory_tree::MemoryTreeRow;
62pub use messages::role_str;
63pub use persona::PersonaFactRow;
64pub use skills::{SkillMetricsRow, SkillUsageRow, SkillVersionRow};
65pub use trajectory::{NewTrajectoryEntry, TrajectoryEntryRow};
66pub use trust::{SkillTrustRow, SourceKind};
67
68/// Backward-compatible type alias. Prefer [`DbStore`] in new code.
69pub type SqliteStore = DbStore;
70
71/// Primary relational data store backed by a [`DbPool`].
72///
73/// Opening a `DbStore` runs all pending `SQLite` migrations automatically.
74///
75/// # Examples
76///
77/// ```rust,no_run
78/// # async fn example() -> Result<(), zeph_memory::MemoryError> {
79/// use zeph_memory::store::DbStore;
80///
81/// let store = DbStore::new(":memory:").await?;
82/// let cid = store.create_conversation().await?;
83/// # Ok(())
84/// # }
85/// ```
86#[derive(Debug, Clone)]
87pub struct DbStore {
88 pool: DbPool,
89}
90
91impl DbStore {
92 /// Open (or create) the database and run migrations.
93 ///
94 /// # Errors
95 ///
96 /// Returns an error if the database cannot be opened or migrations fail.
97 pub async fn new(path: &str) -> Result<Self, MemoryError> {
98 Self::with_pool_size(path, 5).await
99 }
100
101 /// Open (or create) the database with a configurable connection pool size.
102 ///
103 /// # Errors
104 ///
105 /// Returns an error if the database cannot be opened or migrations fail.
106 pub async fn with_pool_size(path: &str, pool_size: u32) -> Result<Self, MemoryError> {
107 let pool = DbConfig {
108 url: path.to_string(),
109 max_connections: pool_size,
110 pool_size,
111 }
112 .connect()
113 .await?;
114
115 Ok(Self { pool })
116 }
117
118 /// Create a store from an already-open pool (no migrations run).
119 ///
120 /// Use this when the pool was obtained from an existing store (e.g. the main
121 /// agent memory store) to avoid redundant migration runs.
122 #[must_use]
123 pub fn from_pool(pool: DbPool) -> Self {
124 Self { pool }
125 }
126
127 /// Expose the underlying pool for shared access by other stores.
128 #[must_use]
129 pub fn pool(&self) -> &DbPool {
130 &self.pool
131 }
132
133 /// Run all migrations on the given pool.
134 ///
135 /// # Errors
136 ///
137 /// Returns an error if any migration fails.
138 pub async fn run_migrations(pool: &DbPool) -> Result<(), MemoryError> {
139 zeph_db::run_migrations(pool).await?;
140 Ok(())
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Expected `PRAGMA busy_timeout` in milliseconds (5 seconds), mirroring the
    /// `DbConfig` busy-timeout default.
    const DEFAULT_BUSY_TIMEOUT_MS: i64 = 5_000;

    /// Opens a store on a fresh temporary file.
    ///
    /// The temp file handle is returned alongside the store so the file stays
    /// on disk for as long as the test holds it.
    async fn open_file_store() -> (NamedTempFile, DbStore) {
        let file = NamedTempFile::new().expect("tempfile");
        let store = DbStore::new(file.path().to_str().expect("valid path"))
            .await
            .expect("DbStore::new");
        (file, store)
    }

    /// A file-backed database must report WAL journal mode.
    #[tokio::test]
    async fn wal_journal_mode_enabled_on_file_db() {
        let (_file, store) = open_file_store().await;

        let mode: String = zeph_db::query_scalar(sql!("PRAGMA journal_mode"))
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA query");

        assert_eq!(mode, "wal", "expected WAL journal mode, got: {mode}");
    }

    /// A file-backed database must report the default busy timeout.
    #[tokio::test]
    async fn busy_timeout_enabled_on_file_db() {
        let (_file, store) = open_file_store().await;

        let timeout_ms: i64 = zeph_db::query_scalar(sql!("PRAGMA busy_timeout"))
            .fetch_one(store.pool())
            .await
            .expect("PRAGMA query");

        assert_eq!(
            timeout_ms, DEFAULT_BUSY_TIMEOUT_MS,
            "expected busy_timeout pragma to match configured timeout"
        );
    }

    /// Opening a store under directories that do not exist yet must leave a
    /// database file at the requested path.
    #[tokio::test]
    async fn creates_parent_dirs() {
        let root = tempfile::tempdir().expect("tempdir");
        let db_path = root.path().join("a/b/c/zeph.db");

        let _store = DbStore::new(db_path.to_str().expect("valid path"))
            .await
            .expect("DbStore::new");

        assert!(db_path.exists(), "database file should exist");
    }

    /// A store opened with a non-default pool size must still be usable.
    #[tokio::test]
    async fn with_pool_size_accepts_custom_size() {
        let store = DbStore::with_pool_size(":memory:", 2)
            .await
            .expect("with_pool_size");

        // Creating a conversation proves the store is operational with the custom pool size.
        let _conversation_id = store
            .create_conversation()
            .await
            .expect("create_conversation");
    }
}
205}