// pulsedb/storage/mod.rs

//! Storage layer abstractions for PulseDB.
//!
//! This module provides a trait-based abstraction over the storage engine,
//! allowing different backends to be used (e.g., redb, mock for testing).
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                      PulseDB                                 │
//! │                         │                                    │
//! │                         ▼                                    │
//! │              ┌─────────────────────┐                        │
//! │              │   StorageEngine     │  ← Trait               │
//! │              └─────────────────────┘                        │
//! │                    ▲         ▲                              │
//! │                    │         │                              │
//! │         ┌─────────┴─┐   ┌───┴─────────┐                    │
//! │         │RedbStorage│   │ MockStorage │                    │
//! │         └───────────┘   └─────────────┘                    │
//! │           (prod)           (test)                          │
//! └─────────────────────────────────────────────────────────────┘
//! ```

pub mod redb;
pub mod schema;

pub use self::redb::RedbStorage;
pub use schema::{DatabaseMetadata, SCHEMA_VERSION};

use std::path::Path;

use crate::activity::Activity;
use crate::collective::Collective;
use crate::config::Config;
use crate::error::Result;
use crate::experience::{Experience, ExperienceUpdate};
use crate::insight::DerivedInsight;
use crate::relation::{ExperienceRelation, RelationType};
use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};

42/// Storage engine trait for PulseDB.
43///
44/// This trait defines the contract that any storage backend must implement.
45/// The primary implementation is [`RedbStorage`], but other implementations
46/// can be created for testing or alternative backends.
47///
48/// # Thread Safety
49///
50/// Implementations must be `Send + Sync` to allow the database to be shared
51/// across threads. The engine handles internal synchronization.
52///
53/// # Example
54///
55/// ```rust
56/// # fn main() -> pulsedb::Result<()> {
57/// # let dir = tempfile::tempdir().unwrap();
58/// use pulsedb::{Config, storage::{StorageEngine, RedbStorage}};
59///
60/// let config = Config::default();
61/// let storage = RedbStorage::open(dir.path().join("test.db"), &config)?;
62/// let metadata = storage.metadata();
63/// println!("Schema version: {}", metadata.schema_version);
64/// # Ok(())
65/// # }
66/// ```
67pub trait StorageEngine: Send + Sync {
68    // =========================================================================
69    // Lifecycle
70    // =========================================================================
71
72    /// Returns the database metadata.
73    ///
74    /// The metadata includes schema version, embedding dimension, and timestamps.
75    fn metadata(&self) -> &DatabaseMetadata;
76
77    /// Closes the storage engine, flushing any pending writes.
78    ///
79    /// This method consumes the storage engine. After calling `close()`,
80    /// the engine cannot be used.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the backend supports reporting flush failures.
85    /// Note: the current redb backend flushes on drop (infallible), so
86    /// this always returns `Ok(())` for [`RedbStorage`].
87    fn close(self: Box<Self>) -> Result<()>;
88
89    /// Returns the path to the database file, if applicable.
90    ///
91    /// Some storage implementations (like in-memory) may not have a path.
92    fn path(&self) -> Option<&Path>;
93
94    // =========================================================================
95    // Collective Storage Operations
96    // =========================================================================
97
98    /// Saves a collective to storage.
99    ///
100    /// If a collective with the same ID already exists, it is overwritten.
101    /// Each call opens and commits its own write transaction.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the transaction or serialization fails.
106    fn save_collective(&self, collective: &Collective) -> Result<()>;
107
108    /// Retrieves a collective by ID.
109    ///
110    /// Returns `None` if no collective with the given ID exists.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the read transaction or deserialization fails.
115    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>;
116
117    /// Lists all collectives in the database.
118    ///
119    /// Returns an empty vector if no collectives exist.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the read transaction or deserialization fails.
124    fn list_collectives(&self) -> Result<Vec<Collective>>;
125
126    /// Deletes a collective by ID.
127    ///
128    /// Returns `true` if the collective existed and was deleted,
129    /// `false` if no collective with the given ID was found.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the write transaction fails.
134    fn delete_collective(&self, id: CollectiveId) -> Result<bool>;
135
136    // =========================================================================
137    // Experience Index Operations (for collective stats & cascade delete)
138    // =========================================================================
139
140    /// Counts experiences belonging to a collective.
141    ///
142    /// Queries the `experiences_by_collective` multimap index.
143    /// Returns 0 if no experiences exist for the collective.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the read transaction fails.
148    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>;
149
150    /// Deletes all experiences and related index entries for a collective.
151    ///
152    /// Used for cascade deletion when a collective is removed. Cleans up:
153    /// - Experience records
154    /// - Embedding vectors
155    /// - By-collective index entries
156    /// - By-type index entries
157    ///
158    /// Returns the number of experiences deleted.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the write transaction fails.
163    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>;
164
165    /// Lists all experience IDs belonging to a collective.
166    ///
167    /// Used to rebuild HNSW indexes from redb embeddings on startup.
168    /// Iterates the `experiences_by_collective` multimap index.
169    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>>;
170
171    /// Retrieves the most recent experience IDs in a collective.
172    ///
173    /// Performs a reverse iteration on `EXPERIENCES_BY_COLLECTIVE_TABLE`
174    /// to get IDs ordered by timestamp descending (newest first).
175    /// The multimap values are `[timestamp_be: 8 bytes][experience_id: 16 bytes]`,
176    /// and since timestamps are big-endian, reverse lexicographic order = newest first.
177    ///
178    /// Returns `(ExperienceId, Timestamp)` pairs for the caller to fetch full
179    /// records and apply post-filters.
180    ///
181    /// # Arguments
182    ///
183    /// * `collective_id` - The collective to query
184    /// * `limit` - Maximum number of entries to return
185    fn get_recent_experience_ids(
186        &self,
187        collective_id: CollectiveId,
188        limit: usize,
189    ) -> Result<Vec<(ExperienceId, Timestamp)>>;
190
191    // =========================================================================
192    // Experience Storage Operations
193    // =========================================================================
194
195    /// Saves an experience and its embedding to storage.
196    ///
197    /// Writes atomically to 4 tables in a single transaction:
198    /// - `EXPERIENCES_TABLE` — the experience record (without embedding)
199    /// - `EMBEDDINGS_TABLE` — the embedding vector as raw f32 bytes
200    /// - `EXPERIENCES_BY_COLLECTIVE_TABLE` — secondary index by collective+timestamp
201    /// - `EXPERIENCES_BY_TYPE_TABLE` — secondary index by collective+type
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the transaction or serialization fails.
206    fn save_experience(&self, experience: &Experience) -> Result<()>;
207
208    /// Retrieves an experience by ID, including its embedding.
209    ///
210    /// Reads from both `EXPERIENCES_TABLE` and `EMBEDDINGS_TABLE` to
211    /// reconstitute the full experience with embedding.
212    ///
213    /// Returns `None` if no experience with the given ID exists.
214    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>;
215
216    /// Updates mutable fields of an experience.
217    ///
218    /// Applies only the `Some` fields from the update. Immutable fields
219    /// (content, embedding, collective_id, type) are not affected.
220    ///
221    /// Returns `true` if the experience existed and was updated,
222    /// `false` if not found.
223    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool>;
224
225    /// Permanently deletes an experience and its embedding.
226    ///
227    /// Removes from all 4 tables in a single transaction.
228    ///
229    /// Returns `true` if the experience existed and was deleted,
230    /// `false` if not found.
231    fn delete_experience(&self, id: ExperienceId) -> Result<bool>;
232
233    /// Atomically increments the applications counter for an experience.
234    ///
235    /// Performs a read-modify-write in a single write transaction to prevent
236    /// lost updates under concurrent access. Uses saturating arithmetic
237    /// (caps at `u32::MAX`, never panics).
238    ///
239    /// Returns `Some(new_count)` if the experience was found and updated,
240    /// `None` if no experience with the given ID exists.
241    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>;
242
243    /// Saves an embedding vector to storage.
244    ///
245    /// The embedding is stored as raw little-endian f32 bytes.
246    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>;
247
248    /// Retrieves an embedding vector by experience ID.
249    ///
250    /// Returns `None` if no embedding exists for the given ID.
251    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>;
252
253    // =========================================================================
254    // Relation Storage Operations (E3-S01)
255    // =========================================================================
256
257    /// Saves a relation and its index entries atomically.
258    ///
259    /// Writes to 3 tables in a single transaction:
260    /// - `RELATIONS_TABLE` — the relation record
261    /// - `RELATIONS_BY_SOURCE_TABLE` — index by source experience
262    /// - `RELATIONS_BY_TARGET_TABLE` — index by target experience
263    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>;
264
265    /// Retrieves a relation by ID.
266    ///
267    /// Returns `None` if no relation with the given ID exists.
268    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>;
269
270    /// Deletes a relation and its index entries atomically.
271    ///
272    /// Returns `true` if the relation existed and was deleted,
273    /// `false` if not found.
274    fn delete_relation(&self, id: RelationId) -> Result<bool>;
275
276    /// Finds all relation IDs where the given experience is the source.
277    ///
278    /// Iterates the `RELATIONS_BY_SOURCE_TABLE` multimap for the experience.
279    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;
280
281    /// Finds all relation IDs where the given experience is the target.
282    ///
283    /// Iterates the `RELATIONS_BY_TARGET_TABLE` multimap for the experience.
284    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>>;
285
286    /// Deletes all relations where the given experience is source or target.
287    ///
288    /// Used for cascade deletion when an experience is removed.
289    /// Returns the count of deleted relations.
290    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64>;
291
292    /// Checks if a relation with the same (source, target, type) already exists.
293    ///
294    /// Scans the source index, loads each relation, and checks for a matching
295    /// target and type. Efficient for the expected cardinality (few relations
296    /// per experience).
297    fn relation_exists(
298        &self,
299        source_id: ExperienceId,
300        target_id: ExperienceId,
301        relation_type: RelationType,
302    ) -> Result<bool>;
303
304    // =========================================================================
305    // Insight Storage Operations (E3-S02)
306    // =========================================================================
307
308    /// Saves a derived insight and its index entries atomically.
309    ///
310    /// Writes to 2 tables in a single transaction:
311    /// - `INSIGHTS_TABLE` — the insight record (with inline embedding)
312    /// - `INSIGHTS_BY_COLLECTIVE_TABLE` — index by collective
313    fn save_insight(&self, insight: &DerivedInsight) -> Result<()>;
314
315    /// Retrieves a derived insight by ID.
316    ///
317    /// Returns `None` if no insight with the given ID exists.
318    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>;
319
320    /// Deletes a derived insight and its index entries atomically.
321    ///
322    /// Returns `true` if the insight existed and was deleted,
323    /// `false` if not found.
324    fn delete_insight(&self, id: InsightId) -> Result<bool>;
325
326    /// Lists all insight IDs belonging to a collective.
327    ///
328    /// Used to rebuild HNSW indexes from stored insights on startup.
329    /// Iterates the `INSIGHTS_BY_COLLECTIVE_TABLE` multimap.
330    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>>;
331
332    /// Deletes all insights belonging to a collective.
333    ///
334    /// Used for cascade deletion when a collective is removed.
335    /// Returns the count of deleted insights.
336    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>;
337
338    // =========================================================================
339    // Activity Storage Operations (E3-S03)
340    // =========================================================================
341
342    /// Saves an agent activity to storage (upsert).
343    ///
344    /// If an activity for the same `(collective_id, agent_id)` already exists,
345    /// it is replaced. Uses the composite key encoding from `schema::encode_activity_key`.
346    fn save_activity(&self, activity: &Activity) -> Result<()>;
347
348    /// Retrieves an agent activity by agent ID and collective.
349    ///
350    /// Returns `None` if no activity exists for the given pair.
351    fn get_activity(&self, agent_id: &str, collective_id: CollectiveId)
352        -> Result<Option<Activity>>;
353
354    /// Deletes an agent activity.
355    ///
356    /// Returns `true` if the activity existed and was deleted,
357    /// `false` if no activity was found for the given pair.
358    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool>;
359
360    /// Lists all activities in a collective.
361    ///
362    /// Iterates the `ACTIVITIES_TABLE` and filters entries whose key
363    /// starts with the collective's 16-byte ID. Returns activities in
364    /// no guaranteed order.
365    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>>;
366
367    /// Deletes all activities belonging to a collective.
368    ///
369    /// Used for cascade deletion when a collective is removed.
370    /// Returns the count of deleted activities.
371    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64>;
372
373    // =========================================================================
374    // Paginated List Operations (PulseVision)
375    // =========================================================================
376
377    /// Lists experience IDs in a collective with pagination.
378    ///
379    /// Returns IDs ordered by timestamp (oldest first). Use `offset` to skip
380    /// previously fetched pages and `limit` to control page size.
381    fn list_experience_ids_paginated(
382        &self,
383        collective_id: CollectiveId,
384        limit: usize,
385        offset: usize,
386    ) -> Result<Vec<ExperienceId>>;
387
388    /// Lists all relations in a collective with pagination.
389    ///
390    /// Scans relations whose source experience belongs to the collective.
391    fn list_relations_in_collective(
392        &self,
393        collective_id: CollectiveId,
394        limit: usize,
395        offset: usize,
396    ) -> Result<Vec<crate::relation::ExperienceRelation>>;
397
398    /// Lists insight IDs in a collective with pagination.
399    fn list_insight_ids_paginated(
400        &self,
401        collective_id: CollectiveId,
402        limit: usize,
403        offset: usize,
404    ) -> Result<Vec<InsightId>>;
405
406    // =========================================================================
407    // Watch Event Operations (E4-S02) — Cross-Process Change Detection
408    // =========================================================================
409
410    /// Returns the current WAL sequence number.
411    ///
412    /// Every experience write (create, update, delete, reinforce) atomically
413    /// increments the sequence. Returns 0 if no writes have occurred yet.
414    ///
415    /// This is a read-only operation — the write-side logic is internal to the
416    /// storage implementation to maintain transactional atomicity.
417    fn get_wal_sequence(&self) -> Result<u64>;
418
419    /// Retrieves watch events with sequence numbers greater than `since_seq`.
420    ///
421    /// Returns events in ascending sequence order and the highest sequence
422    /// number seen. If no new events exist, returns an empty vec and `since_seq`.
423    ///
424    /// # Arguments
425    ///
426    /// * `since_seq` - Return events with sequence > this value (0 = all events)
427    /// * `limit` - Maximum number of events to return per call
428    fn poll_watch_events(
429        &self,
430        since_seq: u64,
431        limit: usize,
432    ) -> Result<(Vec<schema::WatchEventRecord>, u64)>;
433
434    // =========================================================================
435    // Sync Operations (feature: sync)
436    // =========================================================================
437
438    /// Retrieves ALL watch events (all entity types) with their sequence numbers.
439    ///
440    /// Unlike `poll_watch_events()` which returns records without sequences,
441    /// this method returns `(sequence, record)` pairs needed by the sync
442    /// pusher to construct `SyncChange` objects.
443    #[cfg(feature = "sync")]
444    fn poll_sync_events(
445        &self,
446        since_seq: u64,
447        limit: usize,
448    ) -> Result<Vec<(u64, schema::WatchEventRecord)>>;
449
450    /// Returns the persistent instance ID for this database.
451    ///
452    /// Generated on first open and stable across restarts.
453    /// Used by the sync protocol to identify this PulseDB instance.
454    #[cfg(feature = "sync")]
455    fn instance_id(&self) -> crate::sync::InstanceId;
456
457    /// Saves a sync cursor for a peer instance.
458    ///
459    /// Upserts the cursor in the `SYNC_CURSORS_TABLE`.
460    #[cfg(feature = "sync")]
461    fn save_sync_cursor(&self, cursor: &crate::sync::SyncCursor) -> Result<()>;
462
463    /// Loads the sync cursor for a specific peer instance.
464    ///
465    /// Returns `None` if no cursor has been saved for this peer.
466    #[cfg(feature = "sync")]
467    fn load_sync_cursor(
468        &self,
469        instance_id: &crate::sync::InstanceId,
470    ) -> Result<Option<crate::sync::SyncCursor>>;
471
472    /// Lists all saved sync cursors.
473    ///
474    /// Returns cursors for all known peer instances.
475    #[cfg(feature = "sync")]
476    fn list_sync_cursors(&self) -> Result<Vec<crate::sync::SyncCursor>>;
477
478    /// Compacts the WAL by deleting events with sequence <= `up_to_seq`.
479    ///
480    /// Returns the number of events deleted. This is a write operation
481    /// that permanently removes old WAL entries to reclaim disk space.
482    ///
483    /// # Safety
484    ///
485    /// Only compact up to the minimum cursor across all peers — otherwise
486    /// peers that haven't synced yet will miss events.
487    #[cfg(feature = "sync")]
488    fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>;
489}
490
491/// Opens a storage engine at the given path.
492///
493/// This is a convenience function that creates a [`RedbStorage`] instance.
494/// For more control, use `RedbStorage::open()` directly.
495///
496/// # Arguments
497///
498/// * `path` - Path to the database file (created if it doesn't exist)
499/// * `config` - Database configuration
500///
501/// # Errors
502///
503/// Returns an error if:
504/// - The database file is corrupted
505/// - The database is locked by another process
506/// - Schema version doesn't match
507/// - Embedding dimension doesn't match (for existing databases)
508pub fn open_storage(path: impl AsRef<Path>, config: &Config) -> Result<Box<dyn StorageEngine>> {
509    let storage = RedbStorage::open(path, config)?;
510    Ok(Box::new(storage))
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// Opening a fresh database through `open_storage` yields an engine whose
    /// metadata and path are populated and which closes cleanly.
    #[test]
    fn test_open_storage() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config = Config::default();
        let storage = open_storage(&path, &config).unwrap();

        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );
        assert!(storage.path().is_some());

        storage.close().unwrap();
    }

    /// Compile-time check that both the concrete backend and the trait object
    /// can be shared across threads.
    #[test]
    fn test_storage_engine_is_send_sync() {
        // `?Sized` lets the helper accept the unsized `dyn StorageEngine` too.
        fn assert_send_sync<T: Send + Sync + ?Sized>() {}
        assert_send_sync::<RedbStorage>();
        // `open_storage` hands out `Box<dyn StorageEngine>`, so the trait
        // object itself must satisfy the bounds, not just `RedbStorage`.
        assert_send_sync::<dyn StorageEngine>();
    }
}