Skip to main content

pulsedb/storage/
mod.rs

1//! Storage layer abstractions for PulseDB.
2//!
3//! This module provides a trait-based abstraction over the storage engine,
4//! allowing different backends to be used (e.g., redb, mock for testing).
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────┐
10//! │                      PulseDB                                 │
11//! │                         │                                    │
12//! │                         ▼                                    │
13//! │              ┌─────────────────────┐                        │
14//! │              │   StorageEngine     │  ← Trait               │
15//! │              └─────────────────────┘                        │
16//! │                    ▲         ▲                              │
17//! │                    │         │                              │
18//! │         ┌─────────┴─┐   ┌───┴─────────┐                    │
19//! │         │RedbStorage│   │ MockStorage │                    │
20//! │         └───────────┘   └─────────────┘                    │
21//! │           (prod)           (test)                          │
22//! └─────────────────────────────────────────────────────────────┘
23//! ```
24
25pub mod redb;
26pub mod schema;
27
28pub use self::redb::RedbStorage;
29pub use schema::{DatabaseMetadata, SCHEMA_VERSION};
30
31use std::path::Path;
32
33use crate::activity::Activity;
34use crate::collective::Collective;
35use crate::config::Config;
36use crate::error::Result;
37use crate::experience::{Experience, ExperienceUpdate};
38use crate::insight::DerivedInsight;
39use crate::relation::{ExperienceRelation, RelationType};
40use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
41
42/// Storage engine trait for PulseDB.
43///
44/// This trait defines the contract that any storage backend must implement.
45/// The primary implementation is [`RedbStorage`], but other implementations
46/// can be created for testing or alternative backends.
47///
48/// # Thread Safety
49///
50/// Implementations must be `Send + Sync` to allow the database to be shared
51/// across threads. The engine handles internal synchronization.
52///
53/// # Example
54///
55/// ```rust
56/// # fn main() -> pulsedb::Result<()> {
57/// # let dir = tempfile::tempdir().unwrap();
58/// use pulsedb::{Config, storage::{StorageEngine, RedbStorage}};
59///
60/// let config = Config::default();
61/// let storage = RedbStorage::open(dir.path().join("test.db"), &config)?;
62/// let metadata = storage.metadata();
63/// println!("Schema version: {}", metadata.schema_version);
64/// # Ok(())
65/// # }
66/// ```
67pub trait StorageEngine: Send + Sync {
68    // =========================================================================
69    // Lifecycle
70    // =========================================================================
71
72    /// Returns the database metadata.
73    ///
74    /// The metadata includes schema version, embedding dimension, and timestamps.
75    fn metadata(&self) -> &DatabaseMetadata;
76
77    /// Closes the storage engine, flushing any pending writes.
78    ///
79    /// This method consumes the storage engine. After calling `close()`,
80    /// the engine cannot be used.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the backend supports reporting flush failures.
85    /// Note: the current redb backend flushes on drop (infallible), so
86    /// this always returns `Ok(())` for [`RedbStorage`].
87    fn close(self: Box<Self>) -> Result<()>;
88
89    /// Returns the path to the database file, if applicable.
90    ///
91    /// Some storage implementations (like in-memory) may not have a path.
92    fn path(&self) -> Option<&Path>;
93
94    // =========================================================================
95    // Collective Storage Operations
96    // =========================================================================
97
98    /// Saves a collective to storage.
99    ///
100    /// If a collective with the same ID already exists, it is overwritten.
101    /// Each call opens and commits its own write transaction.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the transaction or serialization fails.
106    fn save_collective(&self, collective: &Collective) -> Result<()>;
107
108    /// Retrieves a collective by ID.
109    ///
110    /// Returns `None` if no collective with the given ID exists.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the read transaction or deserialization fails.
115    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>;
116
117    /// Lists all collectives in the database.
118    ///
119    /// Returns an empty vector if no collectives exist.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the read transaction or deserialization fails.
124    fn list_collectives(&self) -> Result<Vec<Collective>>;
125
126    /// Deletes a collective by ID.
127    ///
128    /// Returns `true` if the collective existed and was deleted,
129    /// `false` if no collective with the given ID was found.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the write transaction fails.
134    fn delete_collective(&self, id: CollectiveId) -> Result<bool>;
135
136    // =========================================================================
137    // Experience Index Operations (for collective stats & cascade delete)
138    // =========================================================================
139
140    /// Counts experiences belonging to a collective.
141    ///
142    /// Queries the `experiences_by_collective` multimap index.
143    /// Returns 0 if no experiences exist for the collective.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the read transaction fails.
148    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>;
149
150    /// Deletes all experiences and related index entries for a collective.
151    ///
152    /// Used for cascade deletion when a collective is removed. Cleans up:
153    /// - Experience records
154    /// - Embedding vectors
155    /// - By-collective index entries
156    /// - By-type index entries
157    ///
158    /// Returns the number of experiences deleted.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the write transaction fails.
163    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>;
164
165    /// Lists all experience IDs belonging to a collective.
166    ///
167    /// Used to rebuild HNSW indexes from redb embeddings on startup.
168    /// Iterates the `experiences_by_collective` multimap index.
169    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>>;
170
171    /// Retrieves the most recent experience IDs in a collective.
172    ///
173    /// Performs a reverse iteration on `EXPERIENCES_BY_COLLECTIVE_TABLE`
174    /// to get IDs ordered by timestamp descending (newest first).
175    /// The multimap values are `[timestamp_be: 8 bytes][experience_id: 16 bytes]`,
176    /// and since timestamps are big-endian, reverse lexicographic order = newest first.
177    ///
178    /// Returns `(ExperienceId, Timestamp)` pairs for the caller to fetch full
179    /// records and apply post-filters.
180    ///
181    /// # Arguments
182    ///
183    /// * `collective_id` - The collective to query
184    /// * `limit` - Maximum number of entries to return
185    fn get_recent_experience_ids(
186        &self,
187        collective_id: CollectiveId,
188        limit: usize,
189    ) -> Result<Vec<(ExperienceId, Timestamp)>>;
190
191    // =========================================================================
192    // Experience Storage Operations
193    // =========================================================================
194
195    /// Saves an experience and its embedding to storage.
196    ///
197    /// Writes atomically to 4 tables in a single transaction:
198    /// - `EXPERIENCES_TABLE` — the experience record (without embedding)
199    /// - `EMBEDDINGS_TABLE` — the embedding vector as raw f32 bytes
200    /// - `EXPERIENCES_BY_COLLECTIVE_TABLE` — secondary index by collective+timestamp
201    /// - `EXPERIENCES_BY_TYPE_TABLE` — secondary index by collective+type
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the transaction or serialization fails.
206    fn save_experience(&self, experience: &Experience) -> Result<()>;
207
208    /// Retrieves an experience by ID, including its embedding.
209    ///
210    /// Reads from both `EXPERIENCES_TABLE` and `EMBEDDINGS_TABLE` to
211    /// reconstitute the full experience with embedding.
212    ///
213    /// Returns `None` if no experience with the given ID exists.
214    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>;
215
216    /// Updates mutable fields of an experience.
217    ///
218    /// Applies only the `Some` fields from the update. Immutable fields
219    /// (content, embedding, collective_id, type) are not affected.
220    ///
221    /// Returns `true` if the experience existed and was updated,
222    /// `false` if not found.
223    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool>;
224
225    /// Permanently deletes an experience and its embedding.
226    ///
227    /// Removes from all 4 tables in a single transaction.
228    ///
229    /// Returns `true` if the experience existed and was deleted,
230    /// `false` if not found.
231    fn delete_experience(&self, id: ExperienceId) -> Result<bool>;
232
233    /// Atomically increments the applications counter for an experience.
234    ///
235    /// Performs a read-modify-write in a single write transaction to prevent
236    /// lost updates under concurrent access. Uses saturating arithmetic
237    /// (caps at `u32::MAX`, never panics).
238    ///
239    /// Returns `Some(new_count)` if the experience was found and updated,
240    /// `None` if no experience with the given ID exists.
241    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>;
242
243    /// Saves an embedding vector to storage.
244    ///
245    /// The embedding is stored as raw little-endian f32 bytes.
246    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>;
247
248    /// Retrieves an embedding vector by experience ID.
249    ///
250    /// Returns `None` if no embedding exists for the given ID.
251    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>;
252
253    // =========================================================================
254    // Relation Storage Operations (E3-S01)
255    // =========================================================================
256
257    /// Saves a relation and its index entries atomically.
258    ///
259    /// Writes to 3 tables in a single transaction:
260    /// - `RELATIONS_TABLE` — the relation record
261    /// - `RELATIONS_BY_SOURCE_TABLE` — index by source experience
262    /// - `RELATIONS_BY_TARGET_TABLE` — index by target experience
263    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>;
264
265    /// Retrieves a relation by ID.
266    ///
267    /// Returns `None` if no relation with the given ID exists.
268    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>;
269
270    /// Deletes a relation and its index entries atomically.
271    ///
272    /// Returns `true` if the relation existed and was deleted,
273    /// `false` if not found.
274    fn delete_relation(&self, id: RelationId) -> Result<bool>;
275
276    /// Finds all relation IDs where the given experience is the source.
277    ///
278    /// Iterates the `RELATIONS_BY_SOURCE_TABLE` multimap for the experience.
279    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;
280
281    /// Finds all relation IDs where the given experience is the target.
282    ///
283    /// Iterates the `RELATIONS_BY_TARGET_TABLE` multimap for the experience.
284    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;
285
286    /// Deletes all relations where the given experience is source or target.
287    ///
288    /// Used for cascade deletion when an experience is removed.
289    /// Returns the count of deleted relations.
290    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64>;
291
292    /// Checks if a relation with the same (source, target, type) already exists.
293    ///
294    /// Scans the source index, loads each relation, and checks for a matching
295    /// target and type. Efficient for the expected cardinality (few relations
296    /// per experience).
297    fn relation_exists(
298        &self,
299        source_id: ExperienceId,
300        target_id: ExperienceId,
301        relation_type: RelationType,
302    ) -> Result<bool>;
303
304    // =========================================================================
305    // Insight Storage Operations (E3-S02)
306    // =========================================================================
307
308    /// Saves a derived insight and its index entries atomically.
309    ///
310    /// Writes to 2 tables in a single transaction:
311    /// - `INSIGHTS_TABLE` — the insight record (with inline embedding)
312    /// - `INSIGHTS_BY_COLLECTIVE_TABLE` — index by collective
313    fn save_insight(&self, insight: &DerivedInsight) -> Result<()>;
314
315    /// Retrieves a derived insight by ID.
316    ///
317    /// Returns `None` if no insight with the given ID exists.
318    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>;
319
320    /// Deletes a derived insight and its index entries atomically.
321    ///
322    /// Returns `true` if the insight existed and was deleted,
323    /// `false` if not found.
324    fn delete_insight(&self, id: InsightId) -> Result<bool>;
325
326    /// Lists all insight IDs belonging to a collective.
327    ///
328    /// Used to rebuild HNSW indexes from stored insights on startup.
329    /// Iterates the `INSIGHTS_BY_COLLECTIVE_TABLE` multimap.
330    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>>;
331
332    /// Deletes all insights belonging to a collective.
333    ///
334    /// Used for cascade deletion when a collective is removed.
335    /// Returns the count of deleted insights.
336    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>;
337
338    // =========================================================================
339    // Activity Storage Operations (E3-S03)
340    // =========================================================================
341
342    /// Saves an agent activity to storage (upsert).
343    ///
344    /// If an activity for the same `(collective_id, agent_id)` already exists,
345    /// it is replaced. Uses the composite key encoding from `schema::encode_activity_key`.
346    fn save_activity(&self, activity: &Activity) -> Result<()>;
347
348    /// Retrieves an agent activity by agent ID and collective.
349    ///
350    /// Returns `None` if no activity exists for the given pair.
351    fn get_activity(&self, agent_id: &str, collective_id: CollectiveId)
352        -> Result<Option<Activity>>;
353
354    /// Deletes an agent activity.
355    ///
356    /// Returns `true` if the activity existed and was deleted,
357    /// `false` if no activity was found for the given pair.
358    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool>;
359
360    /// Lists all activities in a collective.
361    ///
362    /// Iterates the `ACTIVITIES_TABLE` and filters entries whose key
363    /// starts with the collective's 16-byte ID. Returns activities in
364    /// no guaranteed order.
365    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>>;
366
367    /// Deletes all activities belonging to a collective.
368    ///
369    /// Used for cascade deletion when a collective is removed.
370    /// Returns the count of deleted activities.
371    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64>;
372
373    // =========================================================================
374    // Watch Event Operations (E4-S02) — Cross-Process Change Detection
375    // =========================================================================
376
377    /// Returns the current WAL sequence number.
378    ///
379    /// Every experience write (create, update, delete, reinforce) atomically
380    /// increments the sequence. Returns 0 if no writes have occurred yet.
381    ///
382    /// This is a read-only operation — the write-side logic is internal to the
383    /// storage implementation to maintain transactional atomicity.
384    fn get_wal_sequence(&self) -> Result<u64>;
385
386    /// Retrieves watch events with sequence numbers greater than `since_seq`.
387    ///
388    /// Returns events in ascending sequence order and the highest sequence
389    /// number seen. If no new events exist, returns an empty vec and `since_seq`.
390    ///
391    /// # Arguments
392    ///
393    /// * `since_seq` - Return events with sequence > this value (0 = all events)
394    /// * `limit` - Maximum number of events to return per call
395    fn poll_watch_events(
396        &self,
397        since_seq: u64,
398        limit: usize,
399    ) -> Result<(Vec<schema::WatchEventRecord>, u64)>;
400}
401
402/// Opens a storage engine at the given path.
403///
404/// This is a convenience function that creates a [`RedbStorage`] instance.
405/// For more control, use `RedbStorage::open()` directly.
406///
407/// # Arguments
408///
409/// * `path` - Path to the database file (created if it doesn't exist)
410/// * `config` - Database configuration
411///
412/// # Errors
413///
414/// Returns an error if:
415/// - The database file is corrupted
416/// - The database is locked by another process
417/// - Schema version doesn't match
418/// - Embedding dimension doesn't match (for existing databases)
419pub fn open_storage(path: impl AsRef<Path>, config: &Config) -> Result<Box<dyn StorageEngine>> {
420    let storage = RedbStorage::open(path, config)?;
421    Ok(Box::new(storage))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    #[test]
    fn test_open_storage() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config = Config::default();
        let storage = open_storage(&path, &config).unwrap();

        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );
        assert!(storage.path().is_some());

        storage.close().unwrap();
    }

    /// Reopening an existing database through `open_storage` must succeed
    /// once the previous handle has been closed, and keep its metadata.
    #[test]
    fn test_open_storage_reopens_existing_database() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let config = Config::default();

        open_storage(&path, &config).unwrap().close().unwrap();

        let reopened = open_storage(&path, &config).unwrap();
        assert_eq!(
            reopened.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );
        reopened.close().unwrap();
    }

    #[test]
    fn test_storage_engine_is_send_sync() {
        // `?Sized` so the bare trait object can be checked too.
        fn assert_send_sync<T: Send + Sync + ?Sized>() {}

        // The concrete backend...
        assert_send_sync::<RedbStorage>();
        // ...and the trait object returned by `open_storage`, which is what
        // callers actually share across threads.
        assert_send_sync::<dyn StorageEngine>();
        assert_send_sync::<Box<dyn StorageEngine>>();
    }
}