Skip to main content

do_memory_storage_redb/
lib.rs

1#![allow(clippy::excessive_nesting)]
2
3//! # Memory Storage - redb
4//!
5//! redb embedded database for fast cache layer.
6//!
7//! This crate provides:
8//! - High-performance key-value storage using redb
9//! - Zero-copy reads for fast retrieval
10//! - Async wrappers for synchronous redb operations
11//! - Episode and pattern caching
12//! - Bincode serialization for efficient storage
13//!
14//! ## Example
15//!
16//! ```no_run
17//! use do_memory_storage_redb::RedbStorage;
18//! use std::path::Path;
19//!
20//! # async fn example() -> anyhow::Result<()> {
21//! let storage = RedbStorage::new(Path::new("./memory.redb")).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use do_memory_core::{Error, Result};
27use redb::{Database, TableDefinition};
28use std::path::Path;
29use std::sync::Arc;
30use std::time::Duration;
31use tracing::info;
32
33use crate::cache::Cache as CacheTrait;
34
35mod backend_impl;
36mod cache;
37mod embeddings;
38mod embeddings_backend;
39mod embeddings_impl;
40mod episodes;
41mod episodes_queries;
42mod episodes_summaries;
43mod heuristics;
44mod patterns;
45mod persistence;
46mod recommendations;
47mod relationships;
48mod statistics;
49mod storage;
50mod storage_ops;
51mod tables;
52
53// Re-export cache types for external use
54pub use crate::cache::{
55    AdaptiveCache, AdaptiveCacheAdapter, AdaptiveCacheConfig, AdaptiveCacheMetrics, Cache,
56    CacheConfig, CacheMetrics, LRUCache,
57};
58
59pub use crate::statistics::StorageStatistics;
60pub use persistence::{
61    CachePersistence, CacheSnapshot, IncrementalUpdate, PersistedCacheEntry, PersistenceConfig,
62    PersistenceManager, PersistenceMode, PersistenceStats, PersistenceStrategy,
63};
64pub use storage::RedbQuery;
65
66// ============================================================================
67// Deserialization Limits (Security)
68// ============================================================================
69
/// Upper bound, in bytes, on a serialized episode accepted for deserialization
/// (10 MB, decimal units).
///
/// Guards against out-of-memory conditions caused by maliciously large payloads.
pub const MAX_EPISODE_SIZE: u64 = 10 * 1000 * 1000;

/// Upper bound, in bytes, on a serialized pattern accepted for deserialization
/// (1 MB, decimal units).
///
/// Keeps pattern records from exhausting resources.
pub const MAX_PATTERN_SIZE: u64 = 1000 * 1000;

/// Upper bound, in bytes, on a serialized heuristic accepted for deserialization
/// (100 KB, decimal units).
///
/// Heuristics are expected to be small, so this limit is the tightest of the set.
pub const MAX_HEURISTIC_SIZE: u64 = 100 * 1000;

/// Upper bound, in bytes, on a serialized embedding accepted for deserialization
/// (1 MB, decimal units).
///
/// Deliberately generous: common embedding widths of 384–1536 `f32` dimensions
/// occupy only about 1.5 KB–6 KB at 4 bytes per dimension.
pub const MAX_EMBEDDING_SIZE: u64 = 1000 * 1000;
90
91// Table definitions
92pub(crate) const EPISODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("episodes");
93pub(crate) const PATTERNS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("patterns");
94pub(crate) const HEURISTICS_TABLE: TableDefinition<&str, &[u8]> =
95    TableDefinition::new("heuristics");
96pub(crate) const EMBEDDINGS_TABLE: TableDefinition<&str, &[u8]> =
97    TableDefinition::new("embeddings");
98pub(crate) const METADATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("metadata");
99pub(crate) const SUMMARIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("summaries");
100pub(crate) const RELATIONSHIPS_TABLE: TableDefinition<&str, &[u8]> =
101    TableDefinition::new("relationships");
102pub(crate) const RECOMMENDATION_SESSIONS_TABLE: TableDefinition<&str, &[u8]> =
103    TableDefinition::new("recommendation_sessions");
104pub(crate) const RECOMMENDATION_FEEDBACK_TABLE: TableDefinition<&str, &[u8]> =
105    TableDefinition::new("recommendation_feedback");
106pub(crate) const RECOMMENDATION_EPISODE_INDEX_TABLE: TableDefinition<&str, &str> =
107    TableDefinition::new("recommendation_episode_index");
108
109// ============================================================================
110// Schema Versioning (Automatic Cache Invalidation)
111// ============================================================================
112
/// Schema version of the data persisted in the redb cache.
///
/// This version is stored in the database and checked on startup. Whenever the
/// on-disk layout of any cached type changes, bump this constant so that stale
/// cached data is invalidated automatically.
///
/// ## When to increment
/// - Adding/removing fields from `Episode`, `Pattern`, `Heuristic`, or any other cached type
/// - Changing the serialization format or encoding used for cached values
/// - Any other backward-incompatible change to cached data structures
///
/// NOTE(review): the crate-level docs describe bincode serialization; confirm
/// which encoder is actually in use before naming one here.
///
/// ## Version history
/// - v1: Initial version (pre-versioning)
/// - v2: Added `checkpoints` field to `Episode` (ADR-044 Feature 3)
pub(crate) const SCHEMA_VERSION: u64 = 2;
128
129pub(crate) const SCHEMA_VERSION_TABLE: TableDefinition<&str, u64> =
130    TableDefinition::new("schema_version");
131
132// ============================================================================
133// Timeout Helper Functions
134// ============================================================================
135
136/// Timeout duration for database operations (10 seconds)
137const DB_OPERATION_TIMEOUT: Duration = Duration::from_secs(10);
138
139/// Execute a spawn_blocking operation with timeout
140pub(crate) async fn with_db_timeout<T, F>(operation: F) -> crate::Result<T>
141where
142    F: FnOnce() -> crate::Result<T> + Send + 'static,
143    T: Send + 'static,
144{
145    // spawn_blocking returns Result<T, JoinError>
146    // timeout wraps that, so we get Result<Result<T, JoinError>, Elapsed>
147    match tokio::time::timeout(DB_OPERATION_TIMEOUT, tokio::task::spawn_blocking(operation)).await {
148        Ok(Ok(result)) => result, // Inner Ok is JoinError, outer Ok is timeout success
149        Ok(Err(join_err)) => Err(Error::Storage(format!("Task join error: {}", join_err))),
150        Err(_) => Err(Error::Storage(format!(
151            "Database operation timed out after {:?}",
152            DB_OPERATION_TIMEOUT
153        ))),
154    }
155}
156
157/// redb storage backend for fast caching
158pub struct RedbStorage {
159    pub(crate) db: Arc<Database>,
160    pub(crate) cache: Box<dyn CacheTrait>,
161}
162
163impl RedbStorage {
164    /// Create a new redb storage instance with default adaptive cache
165    ///
166    /// Uses `AdaptiveCacheAdapter` by default for intelligent TTL adjustment.
167    ///
168    /// # Arguments
169    ///
170    /// * `path` - Path to the redb database file
171    ///
172    /// # Example
173    ///
174    /// ```no_run
175    /// # use do_memory_storage_redb::RedbStorage;
176    /// # use std::path::Path;
177    /// # async fn example() -> anyhow::Result<()> {
178    /// let storage = RedbStorage::new(Path::new("./memory.redb")).await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub async fn new(path: &Path) -> Result<Self> {
183        Self::new_with_adaptive_config(path, AdaptiveCacheConfig::default()).await
184    }
185
186    /// Create a new redb storage instance with custom cache configuration
187    ///
188    /// Uses the legacy `LRUCache` implementation. For adaptive TTL features,
189    /// use `new_with_adaptive_config` instead.
190    ///
191    /// # Arguments
192    ///
193    /// * `path` - Path to the redb database file
194    /// * `cache_config` - Cache configuration settings
195    ///
196    /// # Example
197    ///
198    /// ```no_run
199    /// # use do_memory_storage_redb::{RedbStorage, CacheConfig};
200    /// # use std::path::Path;
201    /// # async fn example() -> anyhow::Result<()> {
202    /// let config = CacheConfig {
203    ///     max_size: 500,
204    ///     default_ttl_secs: 1800,
205    ///     cleanup_interval_secs: 600,
206    ///     enable_background_cleanup: true,
207    /// };
208    /// let storage = RedbStorage::new_with_cache_config(Path::new("./memory.redb"), config).await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn new_with_cache_config(path: &Path, cache_config: CacheConfig) -> Result<Self> {
213        info!("Opening redb database at {}", path.display());
214
215        // Use spawn_blocking for synchronous redb initialization with timeout
216        let path_buf = path.to_path_buf();
217        let db = with_db_timeout(move || {
218            Database::create(&path_buf)
219                .map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
220        })
221        .await?;
222
223        let cache: Box<dyn CacheTrait> = Box::new(LRUCache::new(cache_config));
224        let storage = Self {
225            db: Arc::new(db),
226            cache,
227        };
228
229        // Initialize tables
230        storage.initialize_tables().await?;
231
232        info!("Successfully opened redb database with LRU cache");
233        Ok(storage)
234    }
235
236    /// Create a new redb storage instance with adaptive cache configuration
237    ///
238    /// Uses `AdaptiveCacheAdapter` for intelligent TTL adjustment based on
239    /// access patterns. Frequently accessed items get longer TTL, rarely
240    /// accessed items get shorter TTL.
241    ///
242    /// # Arguments
243    ///
244    /// * `path` - Path to the redb database file
245    /// * `config` - Adaptive cache configuration settings
246    ///
247    /// # Example
248    ///
249    /// ```no_run
250    /// # use do_memory_storage_redb::{RedbStorage, AdaptiveCacheConfig};
251    /// # use std::path::Path;
252    /// # use std::time::Duration;
253    /// # async fn example() -> anyhow::Result<()> {
254    /// let config = AdaptiveCacheConfig {
255    ///     max_size: 1000,
256    ///     default_ttl: Duration::from_secs(1800),
257    ///     min_ttl: Duration::from_secs(300),
258    ///     max_ttl: Duration::from_secs(7200),
259    ///     ..Default::default()
260    /// };
261    /// let storage = RedbStorage::new_with_adaptive_config(Path::new("./memory.redb"), config).await?;
262    /// # Ok(())
263    /// # }
264    /// ```
265    pub async fn new_with_adaptive_config(
266        path: &Path,
267        config: AdaptiveCacheConfig,
268    ) -> Result<Self> {
269        info!("Opening redb database at {}", path.display());
270
271        // Use spawn_blocking for synchronous redb initialization with timeout
272        let path_buf = path.to_path_buf();
273        let db = with_db_timeout(move || {
274            Database::create(&path_buf)
275                .map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
276        })
277        .await?;
278
279        let cache: Box<dyn CacheTrait> = Box::new(AdaptiveCacheAdapter::new(config));
280        let storage = Self {
281            db: Arc::new(db),
282            cache,
283        };
284
285        // Initialize tables
286        storage.initialize_tables().await?;
287
288        info!("Successfully opened redb database with adaptive cache");
289        Ok(storage)
290    }
291}