pub trait StorageEngine: Send + Sync {
Show 46 methods
// Required methods
fn metadata(&self) -> &DatabaseMetadata;
fn close(self: Box<Self>) -> Result<()>;
fn path(&self) -> Option<&Path>;
fn save_collective(&self, collective: &Collective) -> Result<()>;
fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>;
fn list_collectives(&self) -> Result<Vec<Collective>>;
fn delete_collective(&self, id: CollectiveId) -> Result<bool>;
fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>;
fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>;
fn list_experience_ids_in_collective(
&self,
id: CollectiveId,
) -> Result<Vec<ExperienceId>>;
fn get_recent_experience_ids(
&self,
collective_id: CollectiveId,
limit: usize,
) -> Result<Vec<(ExperienceId, Timestamp)>>;
fn save_experience(&self, experience: &Experience) -> Result<()>;
fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>;
fn update_experience(
&self,
id: ExperienceId,
update: &ExperienceUpdate,
) -> Result<bool>;
fn delete_experience(&self, id: ExperienceId) -> Result<bool>;
fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>;
fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>;
fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>;
fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>;
fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>;
fn delete_relation(&self, id: RelationId) -> Result<bool>;
fn get_relation_ids_by_source(
&self,
experience_id: ExperienceId,
) -> Result<Vec<RelationId>>;
fn get_relation_ids_by_target(
&self,
experience_id: ExperienceId,
) -> Result<Vec<RelationId>>;
fn delete_relations_for_experience(
&self,
experience_id: ExperienceId,
) -> Result<u64>;
fn relation_exists(
&self,
source_id: ExperienceId,
target_id: ExperienceId,
relation_type: RelationType,
) -> Result<bool>;
fn save_insight(&self, insight: &DerivedInsight) -> Result<()>;
fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>;
fn delete_insight(&self, id: InsightId) -> Result<bool>;
fn list_insight_ids_in_collective(
&self,
id: CollectiveId,
) -> Result<Vec<InsightId>>;
fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>;
fn save_activity(&self, activity: &Activity) -> Result<()>;
fn get_activity(
&self,
agent_id: &str,
collective_id: CollectiveId,
) -> Result<Option<Activity>>;
fn delete_activity(
&self,
agent_id: &str,
collective_id: CollectiveId,
) -> Result<bool>;
fn list_activities_in_collective(
&self,
collective_id: CollectiveId,
) -> Result<Vec<Activity>>;
fn delete_activities_by_collective(
&self,
collective_id: CollectiveId,
) -> Result<u64>;
fn list_experience_ids_paginated(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<ExperienceId>>;
fn list_relations_in_collective(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<ExperienceRelation>>;
fn list_insight_ids_paginated(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<InsightId>>;
fn get_wal_sequence(&self) -> Result<u64>;
fn poll_watch_events(
&self,
since_seq: u64,
limit: usize,
) -> Result<(Vec<WatchEventRecord>, u64)>;
fn poll_sync_events(
&self,
since_seq: u64,
limit: usize,
) -> Result<Vec<(u64, WatchEventRecord)>>;
fn instance_id(&self) -> InstanceId;
fn save_sync_cursor(&self, cursor: &SyncCursor) -> Result<()>;
fn load_sync_cursor(
&self,
instance_id: &InstanceId,
) -> Result<Option<SyncCursor>>;
fn list_sync_cursors(&self) -> Result<Vec<SyncCursor>>;
fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>;
}Expand description
Storage engine trait for PulseDB.
This trait defines the contract that any storage backend must implement.
The primary implementation is RedbStorage, but other implementations
can be created for testing or alternative backends.
§Thread Safety
Implementations must be Send + Sync to allow the database to be shared
across threads. The engine handles internal synchronization.
§Example
use pulsedb::{Config, storage::{StorageEngine, RedbStorage}};
let config = Config::default();
let storage = RedbStorage::open(dir.path().join("test.db"), &config)?;
let metadata = storage.metadata();
println!("Schema version: {}", metadata.schema_version);
Required Methods§
Sourcefn metadata(&self) -> &DatabaseMetadata
fn metadata(&self) -> &DatabaseMetadata
Returns the database metadata.
The metadata includes schema version, embedding dimension, and timestamps.
Sourcefn close(self: Box<Self>) -> Result<()>
fn close(self: Box<Self>) -> Result<()>
Closes the storage engine, flushing any pending writes.
This method consumes the storage engine. After calling close(),
the engine cannot be used.
§Errors
Returns an error if flushing pending writes fails and the backend is able to report such failures.
Note: the current redb backend flushes on drop (infallible), so
this always returns Ok(()) for RedbStorage.
Sourcefn path(&self) -> Option<&Path>
fn path(&self) -> Option<&Path>
Returns the path to the database file, if applicable.
Some storage implementations (like in-memory) may not have a path.
Sourcefn save_collective(&self, collective: &Collective) -> Result<()>
fn save_collective(&self, collective: &Collective) -> Result<()>
Saves a collective to storage.
If a collective with the same ID already exists, it is overwritten. Each call opens and commits its own write transaction.
§Errors
Returns an error if the transaction or serialization fails.
Sourcefn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>
fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>
Retrieves a collective by ID.
Returns None if no collective with the given ID exists.
§Errors
Returns an error if the read transaction or deserialization fails.
Sourcefn list_collectives(&self) -> Result<Vec<Collective>>
fn list_collectives(&self) -> Result<Vec<Collective>>
Lists all collectives in the database.
Returns an empty vector if no collectives exist.
§Errors
Returns an error if the read transaction or deserialization fails.
Sourcefn delete_collective(&self, id: CollectiveId) -> Result<bool>
fn delete_collective(&self, id: CollectiveId) -> Result<bool>
Deletes a collective by ID.
Returns true if the collective existed and was deleted,
false if no collective with the given ID was found.
§Errors
Returns an error if the write transaction fails.
Sourcefn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>
fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>
Counts experiences belonging to a collective.
Queries the experiences_by_collective multimap index.
Returns 0 if no experiences exist for the collective.
§Errors
Returns an error if the read transaction fails.
Sourcefn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>
fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>
Deletes all experiences and related index entries for a collective.
Used for cascade deletion when a collective is removed. Cleans up:
- Experience records
- Embedding vectors
- By-collective index entries
- By-type index entries
Returns the number of experiences deleted.
§Errors
Returns an error if the write transaction fails.
Sourcefn list_experience_ids_in_collective(
&self,
id: CollectiveId,
) -> Result<Vec<ExperienceId>>
fn list_experience_ids_in_collective( &self, id: CollectiveId, ) -> Result<Vec<ExperienceId>>
Lists all experience IDs belonging to a collective.
Used to rebuild HNSW indexes from redb embeddings on startup.
Iterates the experiences_by_collective multimap index.
Sourcefn get_recent_experience_ids(
&self,
collective_id: CollectiveId,
limit: usize,
) -> Result<Vec<(ExperienceId, Timestamp)>>
fn get_recent_experience_ids( &self, collective_id: CollectiveId, limit: usize, ) -> Result<Vec<(ExperienceId, Timestamp)>>
Retrieves the most recent experience IDs in a collective.
Performs a reverse iteration on EXPERIENCES_BY_COLLECTIVE_TABLE
to get IDs ordered by timestamp descending (newest first).
The multimap values are [timestamp_be: 8 bytes][experience_id: 16 bytes],
and since timestamps are big-endian, reverse lexicographic order = newest first.
Returns (ExperienceId, Timestamp) pairs for the caller to fetch full
records and apply post-filters.
§Arguments
- collective_id — The collective to query
- limit — Maximum number of entries to return
Sourcefn save_experience(&self, experience: &Experience) -> Result<()>
fn save_experience(&self, experience: &Experience) -> Result<()>
Saves an experience and its embedding to storage.
Writes atomically to 4 tables in a single transaction:
- EXPERIENCES_TABLE — the experience record (without embedding)
- EMBEDDINGS_TABLE — the embedding vector as raw f32 bytes
- EXPERIENCES_BY_COLLECTIVE_TABLE — secondary index by collective+timestamp
- EXPERIENCES_BY_TYPE_TABLE — secondary index by collective+type
§Errors
Returns an error if the transaction or serialization fails.
Sourcefn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>
fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>
Retrieves an experience by ID, including its embedding.
Reads from both EXPERIENCES_TABLE and EMBEDDINGS_TABLE to
reconstitute the full experience with embedding.
Returns None if no experience with the given ID exists.
Sourcefn update_experience(
&self,
id: ExperienceId,
update: &ExperienceUpdate,
) -> Result<bool>
fn update_experience( &self, id: ExperienceId, update: &ExperienceUpdate, ) -> Result<bool>
Updates mutable fields of an experience.
Applies only the Some fields from the update. Immutable fields
(content, embedding, collective_id, type) are not affected.
Returns true if the experience existed and was updated,
false if not found.
Sourcefn delete_experience(&self, id: ExperienceId) -> Result<bool>
fn delete_experience(&self, id: ExperienceId) -> Result<bool>
Permanently deletes an experience and its embedding.
Removes from all 4 tables in a single transaction.
Returns true if the experience existed and was deleted,
false if not found.
Sourcefn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>
fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>
Atomically increments the applications counter for an experience.
Performs a read-modify-write in a single write transaction to prevent
lost updates under concurrent access. Uses saturating arithmetic
(caps at u32::MAX, never panics).
Returns Some(new_count) if the experience was found and updated,
None if no experience with the given ID exists.
Sourcefn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>
fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>
Saves an embedding vector to storage.
The embedding is stored as raw little-endian f32 bytes.
Sourcefn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>
fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>
Retrieves an embedding vector by experience ID.
Returns None if no embedding exists for the given ID.
Sourcefn save_relation(&self, relation: &ExperienceRelation) -> Result<()>
fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>
Saves a relation and its index entries atomically.
Writes to 3 tables in a single transaction:
- RELATIONS_TABLE — the relation record
- RELATIONS_BY_SOURCE_TABLE — index by source experience
- RELATIONS_BY_TARGET_TABLE — index by target experience
Sourcefn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>
fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>
Retrieves a relation by ID.
Returns None if no relation with the given ID exists.
Sourcefn delete_relation(&self, id: RelationId) -> Result<bool>
fn delete_relation(&self, id: RelationId) -> Result<bool>
Deletes a relation and its index entries atomically.
Returns true if the relation existed and was deleted,
false if not found.
Sourcefn get_relation_ids_by_source(
&self,
experience_id: ExperienceId,
) -> Result<Vec<RelationId>>
fn get_relation_ids_by_source( &self, experience_id: ExperienceId, ) -> Result<Vec<RelationId>>
Finds all relation IDs where the given experience is the source.
Iterates the RELATIONS_BY_SOURCE_TABLE multimap for the experience.
Sourcefn get_relation_ids_by_target(
&self,
experience_id: ExperienceId,
) -> Result<Vec<RelationId>>
fn get_relation_ids_by_target( &self, experience_id: ExperienceId, ) -> Result<Vec<RelationId>>
Finds all relation IDs where the given experience is the target.
Iterates the RELATIONS_BY_TARGET_TABLE multimap for the experience.
Sourcefn delete_relations_for_experience(
&self,
experience_id: ExperienceId,
) -> Result<u64>
fn delete_relations_for_experience( &self, experience_id: ExperienceId, ) -> Result<u64>
Deletes all relations where the given experience is source or target.
Used for cascade deletion when an experience is removed. Returns the count of deleted relations.
Sourcefn relation_exists(
&self,
source_id: ExperienceId,
target_id: ExperienceId,
relation_type: RelationType,
) -> Result<bool>
fn relation_exists( &self, source_id: ExperienceId, target_id: ExperienceId, relation_type: RelationType, ) -> Result<bool>
Checks if a relation with the same (source, target, type) already exists.
Scans the source index, loads each relation, and checks for a matching target and type. Efficient for the expected cardinality (few relations per experience).
Sourcefn save_insight(&self, insight: &DerivedInsight) -> Result<()>
fn save_insight(&self, insight: &DerivedInsight) -> Result<()>
Saves a derived insight and its index entries atomically.
Writes to 2 tables in a single transaction:
- INSIGHTS_TABLE — the insight record (with inline embedding)
- INSIGHTS_BY_COLLECTIVE_TABLE — index by collective
Sourcefn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>
fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>
Retrieves a derived insight by ID.
Returns None if no insight with the given ID exists.
Sourcefn delete_insight(&self, id: InsightId) -> Result<bool>
fn delete_insight(&self, id: InsightId) -> Result<bool>
Deletes a derived insight and its index entries atomically.
Returns true if the insight existed and was deleted,
false if not found.
Sourcefn list_insight_ids_in_collective(
&self,
id: CollectiveId,
) -> Result<Vec<InsightId>>
fn list_insight_ids_in_collective( &self, id: CollectiveId, ) -> Result<Vec<InsightId>>
Lists all insight IDs belonging to a collective.
Used to rebuild HNSW indexes from stored insights on startup.
Iterates the INSIGHTS_BY_COLLECTIVE_TABLE multimap.
Sourcefn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>
fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>
Deletes all insights belonging to a collective.
Used for cascade deletion when a collective is removed. Returns the count of deleted insights.
Sourcefn save_activity(&self, activity: &Activity) -> Result<()>
fn save_activity(&self, activity: &Activity) -> Result<()>
Saves an agent activity to storage (upsert).
If an activity for the same (collective_id, agent_id) already exists,
it is replaced. Uses the composite key encoding from schema::encode_activity_key.
Sourcefn get_activity(
&self,
agent_id: &str,
collective_id: CollectiveId,
) -> Result<Option<Activity>>
fn get_activity( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<Option<Activity>>
Retrieves an agent activity by agent ID and collective.
Returns None if no activity exists for the given pair.
Sourcefn delete_activity(
&self,
agent_id: &str,
collective_id: CollectiveId,
) -> Result<bool>
fn delete_activity( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<bool>
Deletes an agent activity.
Returns true if the activity existed and was deleted,
false if no activity was found for the given pair.
Sourcefn list_activities_in_collective(
&self,
collective_id: CollectiveId,
) -> Result<Vec<Activity>>
fn list_activities_in_collective( &self, collective_id: CollectiveId, ) -> Result<Vec<Activity>>
Lists all activities in a collective.
Iterates the ACTIVITIES_TABLE and filters entries whose key
starts with the collective’s 16-byte ID. Returns activities in
no guaranteed order.
Sourcefn delete_activities_by_collective(
&self,
collective_id: CollectiveId,
) -> Result<u64>
fn delete_activities_by_collective( &self, collective_id: CollectiveId, ) -> Result<u64>
Deletes all activities belonging to a collective.
Used for cascade deletion when a collective is removed. Returns the count of deleted activities.
Sourcefn list_experience_ids_paginated(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<ExperienceId>>
fn list_experience_ids_paginated( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<ExperienceId>>
Lists experience IDs in a collective with pagination.
Returns IDs ordered by timestamp (oldest first). Use offset to skip
previously fetched pages and limit to control page size.
Sourcefn list_relations_in_collective(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<ExperienceRelation>>
fn list_relations_in_collective( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<ExperienceRelation>>
Lists all relations in a collective with pagination.
Scans relations whose source experience belongs to the collective.
Sourcefn list_insight_ids_paginated(
&self,
collective_id: CollectiveId,
limit: usize,
offset: usize,
) -> Result<Vec<InsightId>>
fn list_insight_ids_paginated( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<InsightId>>
Lists insight IDs in a collective with pagination.
Sourcefn get_wal_sequence(&self) -> Result<u64>
fn get_wal_sequence(&self) -> Result<u64>
Returns the current WAL sequence number.
Every experience write (create, update, delete, reinforce) atomically increments the sequence. Returns 0 if no writes have occurred yet.
This is a read-only operation — the write-side logic is internal to the storage implementation to maintain transactional atomicity.
Sourcefn poll_watch_events(
&self,
since_seq: u64,
limit: usize,
) -> Result<(Vec<WatchEventRecord>, u64)>
fn poll_watch_events( &self, since_seq: u64, limit: usize, ) -> Result<(Vec<WatchEventRecord>, u64)>
Retrieves watch events with sequence numbers greater than since_seq.
Returns events in ascending sequence order and the highest sequence
number seen. If no new events exist, returns an empty vec and since_seq.
§Arguments
- since_seq — Return events with sequence > this value (0 = all events)
- limit — Maximum number of events to return per call
Sourcefn poll_sync_events(
&self,
since_seq: u64,
limit: usize,
) -> Result<Vec<(u64, WatchEventRecord)>>
Available on crate feature sync only.
fn poll_sync_events( &self, since_seq: u64, limit: usize, ) -> Result<Vec<(u64, WatchEventRecord)>>
Retrieves ALL watch events (all entity types) with their sequence numbers.
Unlike poll_watch_events() which returns records without sequences,
this method returns (sequence, record) pairs needed by the sync
pusher to construct SyncChange objects.
Sourcefn instance_id(&self) -> InstanceId
Available on crate feature sync only.
fn instance_id(&self) -> InstanceId
Returns the persistent instance ID for this database.
Generated on first open and stable across restarts. Used by the sync protocol to identify this PulseDB instance.
Sourcefn save_sync_cursor(&self, cursor: &SyncCursor) -> Result<()>
Available on crate feature sync only.
fn save_sync_cursor(&self, cursor: &SyncCursor) -> Result<()>
Saves a sync cursor for a peer instance.
Upserts the cursor in the SYNC_CURSORS_TABLE.
Sourcefn load_sync_cursor(
&self,
instance_id: &InstanceId,
) -> Result<Option<SyncCursor>>
Available on crate feature sync only.
fn load_sync_cursor( &self, instance_id: &InstanceId, ) -> Result<Option<SyncCursor>>
Loads the sync cursor for a specific peer instance.
Returns None if no cursor has been saved for this peer.
Sourcefn list_sync_cursors(&self) -> Result<Vec<SyncCursor>>
Available on crate feature sync only.
fn list_sync_cursors(&self) -> Result<Vec<SyncCursor>>
Lists all saved sync cursors.
Returns cursors for all known peer instances.
Sourcefn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>
Available on crate feature sync only.
fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>
Compacts the WAL by deleting events with sequence <= up_to_seq.
Returns the number of events deleted. This is a write operation that permanently removes old WAL entries to reclaim disk space.
§Warning
Only compact up to the minimum cursor across all peers — otherwise peers that haven’t synced yet will miss events.