pulsedb/storage/mod.rs
1//! Storage layer abstractions for PulseDB.
2//!
3//! This module provides a trait-based abstraction over the storage engine,
4//! allowing different backends to be used (e.g., redb, mock for testing).
5//!
6//! # Architecture
7//!
8//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                           PulseDB                           │
//! │                              │                              │
//! │                              ▼                              │
//! │                   ┌─────────────────────┐                   │
//! │                   │    StorageEngine    │  ← Trait          │
//! │                   └─────────────────────┘                   │
//! │                      ▲             ▲                        │
//! │                      │             │                        │
//! │            ┌─────────┴─┐       ┌───┴─────────┐              │
//! │            │RedbStorage│       │ MockStorage │              │
//! │            └───────────┘       └─────────────┘              │
//! │               (prod)               (test)                   │
//! └─────────────────────────────────────────────────────────────┘
23//! ```
24
25pub mod redb;
26pub mod schema;
27
28pub use self::redb::RedbStorage;
29pub use schema::{DatabaseMetadata, SCHEMA_VERSION};
30
31use std::path::Path;
32
33use crate::activity::Activity;
34use crate::collective::Collective;
35use crate::config::Config;
36use crate::error::Result;
37use crate::experience::{Experience, ExperienceUpdate};
38use crate::insight::DerivedInsight;
39use crate::relation::{ExperienceRelation, RelationType};
40use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
41
/// Storage engine trait for PulseDB.
///
/// This trait defines the contract that any storage backend must implement.
/// The primary implementation is [`RedbStorage`], but other implementations
/// can be created for testing or alternative backends.
///
/// Methods are grouped by the records they manage: lifecycle, collectives,
/// per-collective experience indexes, experiences and embeddings, relations,
/// insights, agent activities, and watch events.
///
/// # Thread Safety
///
/// The `Send + Sync` supertrait bounds require every implementation to be
/// shareable across threads. Every operation other than `close` takes
/// `&self`, so each implementation is responsible for its own internal
/// synchronization.
///
/// # Example
///
/// ```rust
/// # fn main() -> pulsedb::Result<()> {
/// # let dir = tempfile::tempdir().unwrap();
/// use pulsedb::{Config, storage::{StorageEngine, RedbStorage}};
///
/// let config = Config::default();
/// let storage = RedbStorage::open(dir.path().join("test.db"), &config)?;
/// let metadata = storage.metadata();
/// println!("Schema version: {}", metadata.schema_version);
/// # Ok(())
/// # }
/// ```
pub trait StorageEngine: Send + Sync {
    // =========================================================================
    // Lifecycle
    // =========================================================================

    /// Returns the database metadata.
    ///
    /// The metadata includes the schema version, embedding dimension, and
    /// timestamps. The returned reference borrows from the engine.
    fn metadata(&self) -> &DatabaseMetadata;

    /// Closes the storage engine, flushing any pending writes.
    ///
    /// Takes `self: Box<Self>` so it can be called on a `Box<dyn StorageEngine>`
    /// (as returned by [`open_storage`]). The engine is consumed and cannot
    /// be used afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend detects and reports a failure while
    /// flushing. Note: the current redb backend flushes on drop (infallible),
    /// so this always returns `Ok(())` for [`RedbStorage`].
    fn close(self: Box<Self>) -> Result<()>;

    /// Returns the path to the database file, if applicable.
    ///
    /// Implementations without a backing file (for example, an in-memory
    /// store) return `None`.
    fn path(&self) -> Option<&Path>;

    // =========================================================================
    // Collective Storage Operations
    // =========================================================================

    /// Saves a collective to storage.
    ///
    /// If a collective with the same ID already exists, it is overwritten.
    /// Each call opens and commits its own write transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction or serialization fails.
    fn save_collective(&self, collective: &Collective) -> Result<()>;

    /// Retrieves a collective by ID.
    ///
    /// Returns `None` if no collective with the given ID exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction or deserialization fails.
    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>;

    /// Lists all collectives in the database.
    ///
    /// Returns an empty vector if no collectives exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction or deserialization fails.
    fn list_collectives(&self) -> Result<Vec<Collective>>;

    /// Deletes a collective by ID.
    ///
    /// Returns `true` if the collective existed and was deleted,
    /// `false` if no collective with the given ID was found.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    // NOTE(review): cascade cleanup appears to be exposed separately through the
    // `delete_*_by_collective` methods below; confirm whether implementations of
    // this method remove only the collective record itself.
    fn delete_collective(&self, id: CollectiveId) -> Result<bool>;

    // =========================================================================
    // Experience Index Operations (for collective stats & cascade delete)
    // =========================================================================

    /// Counts experiences belonging to a collective.
    ///
    /// Queries the `experiences_by_collective` multimap index.
    /// Returns 0 if no experiences exist for the collective.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction fails.
    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>;

    /// Deletes all experiences and related index entries for a collective.
    ///
    /// Used for cascade deletion when a collective is removed. Cleans up:
    /// - Experience records
    /// - Embedding vectors
    /// - By-collective index entries
    /// - By-type index entries
    ///
    /// Returns the number of experiences deleted.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>;

    /// Lists all experience IDs belonging to a collective.
    ///
    /// Used to rebuild HNSW indexes from redb embeddings on startup.
    /// Iterates the `experiences_by_collective` multimap index.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>>;

    /// Retrieves the most recent experience IDs in a collective.
    ///
    /// Performs a reverse iteration on `EXPERIENCES_BY_COLLECTIVE_TABLE`
    /// to get IDs ordered by timestamp descending (newest first).
    /// The multimap values are `[timestamp_be: 8 bytes][experience_id: 16 bytes]`.
    /// Because the timestamp prefix is big-endian, reverse lexicographic
    /// order of the values is newest-first order.
    ///
    /// Returns `(ExperienceId, Timestamp)` pairs so the caller can fetch full
    /// records and apply post-filters.
    ///
    /// # Arguments
    ///
    /// * `collective_id` - The collective to query
    /// * `limit` - Maximum number of entries to return
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_recent_experience_ids(
        &self,
        collective_id: CollectiveId,
        limit: usize,
    ) -> Result<Vec<(ExperienceId, Timestamp)>>;

    // =========================================================================
    // Experience Storage Operations
    // =========================================================================

    /// Saves an experience and its embedding to storage.
    ///
    /// Writes atomically to 4 tables in a single transaction:
    /// - `EXPERIENCES_TABLE` — the experience record (without embedding)
    /// - `EMBEDDINGS_TABLE` — the embedding vector as raw f32 bytes
    /// - `EXPERIENCES_BY_COLLECTIVE_TABLE` — secondary index by collective+timestamp
    /// - `EXPERIENCES_BY_TYPE_TABLE` — secondary index by collective+type
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction or serialization fails.
    fn save_experience(&self, experience: &Experience) -> Result<()>;

    /// Retrieves an experience by ID, including its embedding.
    ///
    /// Reads from both `EXPERIENCES_TABLE` and `EMBEDDINGS_TABLE` to
    /// reconstitute the full experience with its embedding.
    ///
    /// Returns `None` if no experience with the given ID exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>;

    /// Updates mutable fields of an experience.
    ///
    /// Applies only the `Some` fields from the update. Immutable fields
    /// (content, embedding, collective_id, type) are not affected.
    ///
    /// Returns `true` if the experience existed and was updated,
    /// `false` if not found.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read-modify-write fails.
    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool>;

    /// Permanently deletes an experience and its embedding.
    ///
    /// Removes the experience from all 4 tables in a single transaction.
    ///
    /// Returns `true` if the experience existed and was deleted,
    /// `false` if not found.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_experience(&self, id: ExperienceId) -> Result<bool>;

    /// Atomically increments the applications counter for an experience.
    ///
    /// Performs a read-modify-write in a single write transaction to prevent
    /// lost updates under concurrent access. Uses saturating arithmetic
    /// (caps at `u32::MAX`, never panics).
    ///
    /// Returns `Some(new_count)` if the experience was found and updated,
    /// `None` if no experience with the given ID exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>;

    /// Saves an embedding vector to storage.
    ///
    /// The embedding is stored as raw little-endian f32 bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying write operation fails.
    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>;

    /// Retrieves an embedding vector by experience ID.
    ///
    /// Returns `None` if no embedding exists for the given ID.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>;

    // =========================================================================
    // Relation Storage Operations (E3-S01)
    // =========================================================================

    /// Saves a relation and its index entries atomically.
    ///
    /// Writes to 3 tables in a single transaction:
    /// - `RELATIONS_TABLE` — the relation record
    /// - `RELATIONS_BY_SOURCE_TABLE` — index by source experience
    /// - `RELATIONS_BY_TARGET_TABLE` — index by target experience
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction or serialization fails.
    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>;

    /// Retrieves a relation by ID.
    ///
    /// Returns `None` if no relation with the given ID exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>;

    /// Deletes a relation and its index entries atomically.
    ///
    /// Returns `true` if the relation existed and was deleted,
    /// `false` if not found.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_relation(&self, id: RelationId) -> Result<bool>;

    /// Finds all relation IDs where the given experience is the source.
    ///
    /// Iterates the `RELATIONS_BY_SOURCE_TABLE` multimap for the experience.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;

    /// Finds all relation IDs where the given experience is the target.
    ///
    /// Iterates the `RELATIONS_BY_TARGET_TABLE` multimap for the experience.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;

    /// Deletes all relations where the given experience is source or target.
    ///
    /// Used for cascade deletion when an experience is removed.
    /// Returns the count of deleted relations.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64>;

    /// Checks if a relation with the same (source, target, type) already exists.
    ///
    /// Scans the source index, loads each relation, and checks for a matching
    /// target and type. This is linear in the number of relations originating
    /// from `source_id`, which is efficient for the expected cardinality
    /// (few relations per experience).
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn relation_exists(
        &self,
        source_id: ExperienceId,
        target_id: ExperienceId,
        relation_type: RelationType,
    ) -> Result<bool>;

    // =========================================================================
    // Insight Storage Operations (E3-S02)
    // =========================================================================

    /// Saves a derived insight and its index entries atomically.
    ///
    /// Writes to 2 tables in a single transaction:
    /// - `INSIGHTS_TABLE` — the insight record (with inline embedding)
    /// - `INSIGHTS_BY_COLLECTIVE_TABLE` — index by collective
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction or serialization fails.
    fn save_insight(&self, insight: &DerivedInsight) -> Result<()>;

    /// Retrieves a derived insight by ID.
    ///
    /// Returns `None` if no insight with the given ID exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>;

    /// Deletes a derived insight and its index entries atomically.
    ///
    /// Returns `true` if the insight existed and was deleted,
    /// `false` if not found.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_insight(&self, id: InsightId) -> Result<bool>;

    /// Lists all insight IDs belonging to a collective.
    ///
    /// Used to rebuild HNSW indexes from stored insights on startup.
    /// Iterates the `INSIGHTS_BY_COLLECTIVE_TABLE` multimap.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>>;

    /// Deletes all insights belonging to a collective.
    ///
    /// Used for cascade deletion when a collective is removed.
    /// Returns the count of deleted insights.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>;

    // =========================================================================
    // Activity Storage Operations (E3-S03)
    // =========================================================================

    /// Saves an agent activity to storage (upsert).
    ///
    /// If an activity for the same `(collective_id, agent_id)` already exists,
    /// it is replaced. Uses the composite key encoding from
    /// `schema::encode_activity_key`.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction or serialization fails.
    fn save_activity(&self, activity: &Activity) -> Result<()>;

    /// Retrieves an agent activity by agent ID and collective.
    ///
    /// Returns `None` if no activity exists for the given pair.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_activity(&self, agent_id: &str, collective_id: CollectiveId)
        -> Result<Option<Activity>>;

    /// Deletes an agent activity.
    ///
    /// Returns `true` if the activity existed and was deleted,
    /// `false` if no activity was found for the given pair.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool>;

    /// Lists all activities in a collective.
    ///
    /// Iterates the `ACTIVITIES_TABLE` and keeps the entries whose key
    /// starts with the collective's 16-byte ID. Returns activities in
    /// no guaranteed order.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>>;

    /// Deletes all activities belonging to a collective.
    ///
    /// Used for cascade deletion when a collective is removed.
    /// Returns the count of deleted activities.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64>;

    // =========================================================================
    // Watch Event Operations (E4-S02) — Cross-Process Change Detection
    // =========================================================================

    /// Returns the current WAL sequence number.
    ///
    /// Every experience write (create, update, delete, reinforce) atomically
    /// increments the sequence. Returns 0 if no writes have occurred yet.
    ///
    /// This is a read-only operation. The write-side logic is internal to the
    /// storage implementation so that it commits in the same transaction as
    /// the write it records.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn get_wal_sequence(&self) -> Result<u64>;

    /// Retrieves watch events with sequence numbers greater than `since_seq`.
    ///
    /// Returns events in ascending sequence order, together with the highest
    /// sequence number seen. If there are no new events, returns an empty
    /// vec and `since_seq` unchanged.
    ///
    /// # Arguments
    ///
    /// * `since_seq` - Return events with sequence > this value (0 = all events)
    /// * `limit` - Maximum number of events to return per call
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read operation fails.
    fn poll_watch_events(
        &self,
        since_seq: u64,
        limit: usize,
    ) -> Result<(Vec<schema::WatchEventRecord>, u64)>;
}
401
402/// Opens a storage engine at the given path.
403///
404/// This is a convenience function that creates a [`RedbStorage`] instance.
405/// For more control, use `RedbStorage::open()` directly.
406///
407/// # Arguments
408///
409/// * `path` - Path to the database file (created if it doesn't exist)
410/// * `config` - Database configuration
411///
412/// # Errors
413///
414/// Returns an error if:
415/// - The database file is corrupted
416/// - The database is locked by another process
417/// - Schema version doesn't match
418/// - Embedding dimension doesn't match (for existing databases)
419pub fn open_storage(path: impl AsRef<Path>, config: &Config) -> Result<Box<dyn StorageEngine>> {
420 let storage = RedbStorage::open(path, config)?;
421 Ok(Box::new(storage))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// `open_storage` yields a usable engine with the default embedding
    /// dimension and a backing file path, and the engine closes cleanly.
    #[test]
    fn test_open_storage() {
        // Keep `dir` alive for the whole test; dropping it deletes the directory.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config = Config::default();
        let storage = open_storage(&path, &config).unwrap();

        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );
        assert!(storage.path().is_some());

        storage.close().unwrap();
    }

    /// Compile-time check that the concrete backend, the bare trait object,
    /// and the boxed trait object returned by `open_storage` are all
    /// `Send + Sync`, so callers can share them across threads.
    #[test]
    fn test_storage_engine_is_send_sync() {
        // `?Sized` lets the helper accept the unsized `dyn StorageEngine`.
        fn assert_send_sync<T: Send + Sync + ?Sized>() {}
        assert_send_sync::<RedbStorage>();
        assert_send_sync::<dyn StorageEngine>();
        assert_send_sync::<Box<dyn StorageEngine>>();
    }
}