do_memory_storage_redb/lib.rs
1#![allow(clippy::excessive_nesting)]
2
3//! # Memory Storage - redb
4//!
5//! redb embedded database for fast cache layer.
6//!
7//! This crate provides:
8//! - High-performance key-value storage using redb
9//! - Zero-copy reads for fast retrieval
10//! - Async wrappers for synchronous redb operations
11//! - Episode and pattern caching
12//! - Bincode serialization for efficient storage
13//!
14//! ## Example
15//!
16//! ```no_run
17//! use do_memory_storage_redb::RedbStorage;
18//! use std::path::Path;
19//!
20//! # async fn example() -> anyhow::Result<()> {
21//! let storage = RedbStorage::new(Path::new("./memory.redb")).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use do_memory_core::{Error, Result};
27use redb::{Database, TableDefinition};
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Duration;
31use tracing::info;
32
33use crate::cache::Cache as CacheTrait;
34
35mod backend_impl;
36mod cache;
37mod embeddings;
38mod embeddings_backend;
39mod embeddings_impl;
40mod episodes;
41mod episodes_queries;
42mod episodes_summaries;
43mod heuristics;
44mod patterns;
45mod persistence;
46mod recommendations;
47mod relationships;
48mod statistics;
49mod storage;
50mod storage_ops;
51mod tables;
52
53// Re-export cache types for external use
54pub use crate::cache::{
55 AdaptiveCache, AdaptiveCacheAdapter, AdaptiveCacheConfig, AdaptiveCacheMetrics, Cache,
56 CacheConfig, CacheMetrics, LRUCache,
57};
58
59pub use crate::statistics::StorageStatistics;
60pub use persistence::{
61 CachePersistence, CacheSnapshot, IncrementalUpdate, PersistedCacheEntry, PersistenceConfig,
62 PersistenceManager, PersistenceMode, PersistenceStats, PersistenceStrategy,
63};
64pub use storage::RedbQuery;
65
// ============================================================================
// Deserialization Limits (Security)
// ============================================================================
//
// Byte ceilings applied when decoding stored records, so that an oversized
// (corrupted or malicious) serialized payload cannot force an unbounded
// allocation.

/// Largest accepted serialized episode: 10 MB (10,000,000 bytes).
///
/// Guards against out-of-memory conditions caused by oversized payloads.
pub const MAX_EPISODE_SIZE: u64 = 10 * 1_000_000;

/// Largest accepted serialized pattern: 1 MB (1,000,000 bytes).
///
/// Keeps pattern decoding from exhausting resources.
pub const MAX_PATTERN_SIZE: u64 = 1_000_000;

/// Largest accepted serialized heuristic: 100 KB (100,000 bytes).
///
/// A tight bound, since heuristics are expected to be small.
pub const MAX_HEURISTIC_SIZE: u64 = 100 * 1_000;

/// Largest accepted serialized embedding: 1 MB (1,000,000 bytes).
///
/// Common embedding widths of 384-1536 dimensions at 4 bytes per `f32` need only
/// ~1.5KB-6KB, so this leaves ample headroom while still bounding allocation.
pub const MAX_EMBEDDING_SIZE: u64 = 1_000_000;
90
91// Table definitions
92pub(crate) const EPISODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("episodes");
93pub(crate) const PATTERNS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("patterns");
94pub(crate) const HEURISTICS_TABLE: TableDefinition<&str, &[u8]> =
95 TableDefinition::new("heuristics");
96pub(crate) const EMBEDDINGS_TABLE: TableDefinition<&str, &[u8]> =
97 TableDefinition::new("embeddings");
98pub(crate) const METADATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("metadata");
99pub(crate) const SUMMARIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("summaries");
100pub(crate) const RELATIONSHIPS_TABLE: TableDefinition<&str, &[u8]> =
101 TableDefinition::new("relationships");
102pub(crate) const RECOMMENDATION_SESSIONS_TABLE: TableDefinition<&str, &[u8]> =
103 TableDefinition::new("recommendation_sessions");
104pub(crate) const RECOMMENDATION_FEEDBACK_TABLE: TableDefinition<&str, &[u8]> =
105 TableDefinition::new("recommendation_feedback");
106pub(crate) const RECOMMENDATION_EPISODE_INDEX_TABLE: TableDefinition<&str, &str> =
107 TableDefinition::new("recommendation_episode_index");
108
// ============================================================================
// Schema Versioning (Automatic Cache Invalidation)
// ============================================================================

/// On-disk schema version of the redb cache.
///
/// The value is persisted in the database and checked at startup. Raising it
/// makes data cached under an older layout stale, so it gets invalidated
/// automatically instead of being misread.
///
/// ## Bump this when
/// - fields are added to or removed from `Episode`, `Pattern`, `Heuristic`, or
///   any other cached type;
/// - the serialization format, or the schema it encodes, changes;
/// - any other backward-incompatible change is made to cached data structures.
///
/// ## History
/// - v1: initial release (before versioning existed)
/// - v2: `checkpoints` field added to `Episode` (ADR-044 Feature 3)
pub(crate) const SCHEMA_VERSION: u64 = 2;
128
129pub(crate) const SCHEMA_VERSION_TABLE: TableDefinition<&str, u64> =
130 TableDefinition::new("schema_version");
131
132// ============================================================================
133// Timeout Helper Functions
134// ============================================================================
135
136/// Timeout duration for database operations (10 seconds)
137const DB_OPERATION_TIMEOUT: Duration = Duration::from_secs(10);
138
139/// Execute a spawn_blocking operation with timeout
140pub(crate) async fn with_db_timeout<T, F>(operation: F) -> crate::Result<T>
141where
142 F: FnOnce() -> crate::Result<T> + Send + 'static,
143 T: Send + 'static,
144{
145 // spawn_blocking returns Result<T, JoinError>
146 // timeout wraps that, so we get Result<Result<T, JoinError>, Elapsed>
147 match tokio::time::timeout(DB_OPERATION_TIMEOUT, tokio::task::spawn_blocking(operation)).await {
148 Ok(Ok(result)) => result, // Inner Ok is JoinError, outer Ok is timeout success
149 Ok(Err(join_err)) => Err(Error::Storage(format!("Task join error: {}", join_err))),
150 Err(_) => Err(Error::Storage(format!(
151 "Database operation timed out after {:?}",
152 DB_OPERATION_TIMEOUT
153 ))),
154 }
155}
156
157/// redb storage backend for fast caching
158pub struct RedbStorage {
159 pub(crate) db: Arc<Database>,
160 pub(crate) cache: Box<dyn CacheTrait>,
161}
162
163impl RedbStorage {
164 /// Create a new redb storage instance with default adaptive cache
165 ///
166 /// Uses `AdaptiveCacheAdapter` by default for intelligent TTL adjustment.
167 ///
168 /// # Arguments
169 ///
170 /// * `path` - Path to the redb database file
171 ///
172 /// # Example
173 ///
174 /// ```no_run
175 /// # use do_memory_storage_redb::RedbStorage;
176 /// # use std::path::Path;
177 /// # async fn example() -> anyhow::Result<()> {
178 /// let storage = RedbStorage::new(Path::new("./memory.redb")).await?;
179 /// # Ok(())
180 /// # }
181 /// ```
182 pub async fn new(path: &Path) -> Result<Self> {
183 Self::new_with_adaptive_config(path, AdaptiveCacheConfig::default()).await
184 }
185
186 /// Create a new redb storage instance with custom cache configuration
187 ///
188 /// Uses the legacy `LRUCache` implementation. For adaptive TTL features,
189 /// use `new_with_adaptive_config` instead.
190 ///
191 /// # Arguments
192 ///
193 /// * `path` - Path to the redb database file
194 /// * `cache_config` - Cache configuration settings
195 ///
196 /// # Example
197 ///
198 /// ```no_run
199 /// # use do_memory_storage_redb::{RedbStorage, CacheConfig};
200 /// # use std::path::Path;
201 /// # async fn example() -> anyhow::Result<()> {
202 /// let config = CacheConfig {
203 /// max_size: 500,
204 /// default_ttl_secs: 1800,
205 /// cleanup_interval_secs: 600,
206 /// enable_background_cleanup: true,
207 /// };
208 /// let storage = RedbStorage::new_with_cache_config(Path::new("./memory.redb"), config).await?;
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub async fn new_with_cache_config(path: &Path, cache_config: CacheConfig) -> Result<Self> {
213 info!("Opening redb database at {}", path.display());
214
215 // Use spawn_blocking for synchronous redb initialization with timeout
216 let path_buf = path.to_path_buf();
217 let db = with_db_timeout(move || {
218 Database::create(&path_buf)
219 .map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
220 })
221 .await?;
222
223 let cache: Box<dyn CacheTrait> = Box::new(LRUCache::new(cache_config));
224 let storage = Self {
225 db: Arc::new(db),
226 cache,
227 };
228
229 // Initialize tables
230 storage.initialize_tables().await?;
231
232 info!("Successfully opened redb database with LRU cache");
233 Ok(storage)
234 }
235
236 /// Create a new redb storage instance with adaptive cache configuration
237 ///
238 /// Uses `AdaptiveCacheAdapter` for intelligent TTL adjustment based on
239 /// access patterns. Frequently accessed items get longer TTL, rarely
240 /// accessed items get shorter TTL.
241 ///
242 /// # Arguments
243 ///
244 /// * `path` - Path to the redb database file
245 /// * `config` - Adaptive cache configuration settings
246 ///
247 /// # Example
248 ///
249 /// ```no_run
250 /// # use do_memory_storage_redb::{RedbStorage, AdaptiveCacheConfig};
251 /// # use std::path::Path;
252 /// # use std::time::Duration;
253 /// # async fn example() -> anyhow::Result<()> {
254 /// let config = AdaptiveCacheConfig {
255 /// max_size: 1000,
256 /// default_ttl: Duration::from_secs(1800),
257 /// min_ttl: Duration::from_secs(300),
258 /// max_ttl: Duration::from_secs(7200),
259 /// ..Default::default()
260 /// };
261 /// let storage = RedbStorage::new_with_adaptive_config(Path::new("./memory.redb"), config).await?;
262 /// # Ok(())
263 /// # }
264 /// ```
265 pub async fn new_with_adaptive_config(
266 path: &Path,
267 config: AdaptiveCacheConfig,
268 ) -> Result<Self> {
269 info!("Opening redb database at {}", path.display());
270
271 // Use spawn_blocking for synchronous redb initialization with timeout
272 let path_buf = path.to_path_buf();
273 let db = with_db_timeout(move || {
274 Database::create(&path_buf)
275 .map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
276 })
277 .await?;
278
279 let cache: Box<dyn CacheTrait> = Box::new(AdaptiveCacheAdapter::new(config));
280 let storage = Self {
281 db: Arc::new(db),
282 cache,
283 };
284
285 // Initialize tables
286 storage.initialize_tables().await?;
287
288 info!("Successfully opened redb database with adaptive cache");
289 Ok(storage)
290 }
291}