

Trait StorageEngine 

Source
pub trait StorageEngine: Send + Sync {
    // Required methods
    fn metadata(&self) -> &DatabaseMetadata;
    fn close(self: Box<Self>) -> Result<()>;
    fn path(&self) -> Option<&Path>;
    fn save_collective(&self, collective: &Collective) -> Result<()>;
    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>;
    fn list_collectives(&self) -> Result<Vec<Collective>>;
    fn delete_collective(&self, id: CollectiveId) -> Result<bool>;
    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>;
    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>;
    fn list_experience_ids_in_collective(
        &self,
        id: CollectiveId,
    ) -> Result<Vec<ExperienceId>>;
    fn get_recent_experience_ids(
        &self,
        collective_id: CollectiveId,
        limit: usize,
    ) -> Result<Vec<(ExperienceId, Timestamp)>>;
    fn save_experience(&self, experience: &Experience) -> Result<()>;
    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>;
    fn update_experience(
        &self,
        id: ExperienceId,
        update: &ExperienceUpdate,
    ) -> Result<bool>;
    fn delete_experience(&self, id: ExperienceId) -> Result<bool>;
    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>;
    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>;
    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>;
    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>;
    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>;
    fn delete_relation(&self, id: RelationId) -> Result<bool>;
    fn get_relation_ids_by_source(
        &self,
        experience_id: ExperienceId,
    ) -> Result<Vec<RelationId>>;
    fn get_relation_ids_by_target(
        &self,
        experience_id: ExperienceId,
    ) -> Result<Vec<RelationId>>;
    fn delete_relations_for_experience(
        &self,
        experience_id: ExperienceId,
    ) -> Result<u64>;
    fn relation_exists(
        &self,
        source_id: ExperienceId,
        target_id: ExperienceId,
        relation_type: RelationType,
    ) -> Result<bool>;
    fn save_insight(&self, insight: &DerivedInsight) -> Result<()>;
    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>;
    fn delete_insight(&self, id: InsightId) -> Result<bool>;
    fn list_insight_ids_in_collective(
        &self,
        id: CollectiveId,
    ) -> Result<Vec<InsightId>>;
    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>;
    fn save_activity(&self, activity: &Activity) -> Result<()>;
    fn get_activity(
        &self,
        agent_id: &str,
        collective_id: CollectiveId,
    ) -> Result<Option<Activity>>;
    fn delete_activity(
        &self,
        agent_id: &str,
        collective_id: CollectiveId,
    ) -> Result<bool>;
    fn list_activities_in_collective(
        &self,
        collective_id: CollectiveId,
    ) -> Result<Vec<Activity>>;
    fn delete_activities_by_collective(
        &self,
        collective_id: CollectiveId,
    ) -> Result<u64>;
    fn list_experience_ids_paginated(
        &self,
        collective_id: CollectiveId,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<ExperienceId>>;
    fn list_relations_in_collective(
        &self,
        collective_id: CollectiveId,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<ExperienceRelation>>;
    fn list_insight_ids_paginated(
        &self,
        collective_id: CollectiveId,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<InsightId>>;
    fn get_wal_sequence(&self) -> Result<u64>;
    fn poll_watch_events(
        &self,
        since_seq: u64,
        limit: usize,
    ) -> Result<(Vec<WatchEventRecord>, u64)>;
    fn poll_sync_events(
        &self,
        since_seq: u64,
        limit: usize,
    ) -> Result<Vec<(u64, WatchEventRecord)>>;
    fn instance_id(&self) -> InstanceId;
    fn save_sync_cursor(&self, cursor: &SyncCursor) -> Result<()>;
    fn load_sync_cursor(
        &self,
        instance_id: &InstanceId,
    ) -> Result<Option<SyncCursor>>;
    fn list_sync_cursors(&self) -> Result<Vec<SyncCursor>>;
    fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>;
}

Storage engine trait for PulseDB.

This trait defines the contract that any storage backend must implement. The primary implementation is RedbStorage, but other implementations can be created for testing or alternative backends.

§Thread Safety

Implementations must be Send + Sync so that the database can be shared across threads. All methods take &self, so each engine is responsible for its own internal synchronization.
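As a rough illustration of what the Send + Sync bound enables, the sketch below shares one trait object across several threads. The Counter trait and InMemoryCounter type are hypothetical stand-ins invented for this example and are not part of PulseDB; only the pattern (a `&self` API with synchronization hidden inside the implementation) is the point.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a storage trait; not PulseDB's real API.
trait Counter: Send + Sync {
    fn bump(&self);
    fn get(&self) -> u64;
}

// The implementation synchronizes internally, so callers only need `&self`.
struct InMemoryCounter {
    value: Mutex<u64>,
}

impl Counter for InMemoryCounter {
    fn bump(&self) {
        *self.value.lock().unwrap() += 1;
    }

    fn get(&self) -> u64 {
        *self.value.lock().unwrap()
    }
}

// Spawns `threads` workers that each call `bump` `per_thread` times,
// then returns the final counter value.
fn bump_from_threads(threads: usize, per_thread: usize) -> u64 {
    let engine: Arc<dyn Counter> = Arc::new(InMemoryCounter { value: Mutex::new(0) });
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let engine = Arc::clone(&engine);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    engine.bump();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    engine.get()
}

fn main() {
    println!("total = {}", bump_from_threads(4, 100));
}
```

Without the Send + Sync supertraits, `Arc<dyn Counter>` could not be moved into `thread::spawn`.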

§Example

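use pulsedb::{Config, storage::{StorageEngine, RedbStorage}};

// `dir` is assumed to be a temporary directory handle created earlier
// (for example by tempfile::tempdir()); its setup is not shown here.
let config = Config::default();
let storage = RedbStorage::open(dir.path().join("test.db"), &config)?;
let metadata = storage.metadata();
println!("Schema version: {}", metadata.schema_version);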

Required Methods§

Source

fn metadata(&self) -> &DatabaseMetadata

Returns the database metadata.

The metadata includes schema version, embedding dimension, and timestamps.

Source

fn close(self: Box<Self>) -> Result<()>

Closes the storage engine, flushing any pending writes.

This method consumes the storage engine. After calling close(), the engine cannot be used.

§Errors

Returns an error if flushing fails and the backend is able to report it. Note: the current redb backend flushes on drop, which is infallible, so close() always returns Ok(()) for RedbStorage.
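The `self: Box<Self>` receiver lets close() consume a boxed trait object while keeping the trait usable as `dyn`. A minimal sketch, using a hypothetical Engine trait and MemoryEngine type rather than PulseDB's own types, and `String` as a placeholder error type:

```rust
// Hypothetical stand-in trait; only the `close` receiver mirrors the
// signature documented above.
trait Engine {
    fn name(&self) -> &str;
    fn close(self: Box<Self>) -> Result<(), String>;
}

struct MemoryEngine;

impl Engine for MemoryEngine {
    fn name(&self) -> &str {
        "memory"
    }

    fn close(self: Box<Self>) -> Result<(), String> {
        // Nothing to flush in this sketch, so closing cannot fail.
        Ok(())
    }
}

// Opens a boxed engine, reads from it, then consumes it with `close`.
fn open_and_close() -> Result<String, String> {
    let engine: Box<dyn Engine> = Box::new(MemoryEngine);
    let name = engine.name().to_string();
    engine.close()?; // `engine` is moved here
    // engine.name(); // would not compile: use of moved value
    Ok(name)
}

fn main() {
    println!("{:?}", open_and_close());
}
```

Because the box is moved into close(), the compiler rejects any later use of the engine, which matches the "cannot be used after close()" contract.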

Source

fn path(&self) -> Option<&Path>

Returns the path to the database file, if applicable.

Some storage implementations (like in-memory) may not have a path.

Source

fn save_collective(&self, collective: &Collective) -> Result<()>

Saves a collective to storage.

If a collective with the same ID already exists, it is overwritten. Each call opens and commits its own write transaction.

§Errors

Returns an error if the transaction or serialization fails.

Source

fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>

Retrieves a collective by ID.

Returns None if no collective with the given ID exists.

§Errors

Returns an error if the read transaction or deserialization fails.

Source

fn list_collectives(&self) -> Result<Vec<Collective>>

Lists all collectives in the database.

Returns an empty vector if no collectives exist.

§Errors

Returns an error if the read transaction or deserialization fails.

Source

fn delete_collective(&self, id: CollectiveId) -> Result<bool>

Deletes a collective by ID.

Returns true if the collective existed and was deleted, false if no collective with the given ID was found.

§Errors

Returns an error if the write transaction fails.

Source

fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64>

Counts experiences belonging to a collective.

Queries the experiences_by_collective multimap index. Returns 0 if no experiences exist for the collective.

§Errors

Returns an error if the read transaction fails.

Source

fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64>

Deletes all experiences and related index entries for a collective.

Used for cascade deletion when a collective is removed. Cleans up:

  • Experience records
  • Embedding vectors
  • By-collective index entries
  • By-type index entries

Returns the number of experiences deleted.

§Errors

Returns an error if the write transaction fails.

Source

fn list_experience_ids_in_collective( &self, id: CollectiveId, ) -> Result<Vec<ExperienceId>>

Lists all experience IDs belonging to a collective.

Used to rebuild HNSW indexes from redb embeddings on startup. Iterates the experiences_by_collective multimap index.

Source

fn get_recent_experience_ids( &self, collective_id: CollectiveId, limit: usize, ) -> Result<Vec<(ExperienceId, Timestamp)>>

Retrieves the most recent experience IDs in a collective.

Performs a reverse iteration on EXPERIENCES_BY_COLLECTIVE_TABLE to get IDs ordered by timestamp descending (newest first). The multimap values are [timestamp_be: 8 bytes][experience_id: 16 bytes], and since timestamps are big-endian, reverse lexicographic order = newest first.

Returns (ExperienceId, Timestamp) pairs for the caller to fetch full records and apply post-filters.
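The ordering argument can be checked with a small, self-contained sketch. It assumes the timestamp is an unsigned 64-bit integer (the real Timestamp type is not shown here) and uses a sorted Vec in place of the redb multimap; the helper names are invented for illustration.

```rust
// Builds a 24-byte index value: [timestamp_be: 8 bytes][experience_id: 16 bytes].
// The u64 timestamp is an assumption made for this sketch.
fn encode_value(timestamp: u64, id: [u8; 16]) -> [u8; 24] {
    let mut out = [0u8; 24];
    out[..8].copy_from_slice(&timestamp.to_be_bytes());
    out[8..].copy_from_slice(&id);
    out
}

// Splits a 24-byte value back into (timestamp, experience_id).
fn decode_value(value: &[u8; 24]) -> (u64, [u8; 16]) {
    let mut ts = [0u8; 8];
    ts.copy_from_slice(&value[..8]);
    let mut id = [0u8; 16];
    id.copy_from_slice(&value[8..]);
    (u64::from_be_bytes(ts), id)
}

// Sorts values in lexicographic byte order (as a sorted index would keep
// them) and walks them in reverse to get the newest entries first.
fn newest_first(mut values: Vec<[u8; 24]>, limit: usize) -> Vec<(u64, [u8; 16])> {
    values.sort();
    values.iter().rev().take(limit).map(decode_value).collect()
}

fn main() {
    let values = vec![
        encode_value(255, [1; 16]),
        encode_value(256, [2; 16]),
        encode_value(20, [3; 16]),
    ];
    for (ts, id) in newest_first(values, 2) {
        println!("ts={} id[0]={}", ts, id[0]);
    }
}
```

The timestamps 255 and 256 are chosen deliberately: with a little-endian encoding their byte order would disagree with their numeric order, which is why the big-endian prefix matters.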

§Arguments

  • collective_id - The collective to query
  • limit - Maximum number of entries to return

Source

fn save_experience(&self, experience: &Experience) -> Result<()>

Saves an experience and its embedding to storage.

Writes atomically to 4 tables in a single transaction:

  • EXPERIENCES_TABLE — the experience record (without embedding)
  • EMBEDDINGS_TABLE — the embedding vector as raw f32 bytes
  • EXPERIENCES_BY_COLLECTIVE_TABLE — secondary index by collective+timestamp
  • EXPERIENCES_BY_TYPE_TABLE — secondary index by collective+type

§Errors

Returns an error if the transaction or serialization fails.

Source

fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>

Retrieves an experience by ID, including its embedding.

Reads from both EXPERIENCES_TABLE and EMBEDDINGS_TABLE to reconstitute the full experience with embedding.

Returns None if no experience with the given ID exists.

Source

fn update_experience( &self, id: ExperienceId, update: &ExperienceUpdate, ) -> Result<bool>

Updates mutable fields of an experience.

Applies only the Some fields from the update. Immutable fields (content, embedding, collective_id, type) are not affected.

Returns true if the experience existed and was updated, false if not found.
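The "apply only the Some fields" behaviour is a common partial-update pattern. The sketch below shows it with hypothetical Record and RecordUpdate types; the field names are illustrative and are not taken from Experience or ExperienceUpdate.

```rust
// Hypothetical record with one immutable field (`content`) and two
// mutable ones. Field names are assumptions for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    content: String,
    importance: f32,
    tags: Vec<String>,
}

// Each `None` means "leave this field unchanged".
#[derive(Debug, Default)]
struct RecordUpdate {
    importance: Option<f32>,
    tags: Option<Vec<String>>,
}

// Copies only the fields that are `Some` in `update` onto `record`.
fn apply_update(record: &mut Record, update: &RecordUpdate) {
    if let Some(importance) = update.importance {
        record.importance = importance;
    }
    if let Some(tags) = &update.tags {
        record.tags = tags.clone();
    }
}

fn main() {
    let mut record = Record {
        content: "original".to_string(),
        importance: 0.5,
        tags: vec!["a".to_string()],
    };
    let update = RecordUpdate { importance: Some(0.9), ..Default::default() };
    apply_update(&mut record, &update);
    println!("{:?}", record);
}
```

Since the update type has no field for the immutable data, there is no way for a caller to change it by accident.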

Source

fn delete_experience(&self, id: ExperienceId) -> Result<bool>

Permanently deletes an experience and its embedding.

Removes from all 4 tables in a single transaction.

Returns true if the experience existed and was deleted, false if not found.

Source

fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>>

Atomically increments the applications counter for an experience.

Performs a read-modify-write in a single write transaction to prevent lost updates under concurrent access. Uses saturating arithmetic (caps at u32::MAX, never panics).

Returns Some(new_count) if the experience was found and updated, None if no experience with the given ID exists.
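The return contract and the saturating behaviour can be sketched with an in-memory map. Here a Mutex stands in for the single write transaction, and the Counters type and u64 IDs are assumptions made for this example only.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// In-memory stand-in: holding the lock for the whole read-modify-write
// plays the role of the single write transaction.
struct Counters {
    applications: Mutex<HashMap<u64, u32>>,
}

impl Counters {
    fn with_entries(entries: &[(u64, u32)]) -> Self {
        Counters {
            applications: Mutex::new(entries.iter().copied().collect()),
        }
    }

    // Returns Some(new_count) if `id` exists, None otherwise.
    fn reinforce(&self, id: u64) -> Option<u32> {
        let mut map = self.applications.lock().unwrap();
        let count = map.get_mut(&id)?;
        // saturating_add stops at u32::MAX instead of overflowing.
        *count = count.saturating_add(1);
        Some(*count)
    }
}

fn main() {
    let counters = Counters::with_entries(&[(1, 0), (2, u32::MAX)]);
    println!("{:?}", counters.reinforce(1));
    println!("{:?}", counters.reinforce(2));
    println!("{:?}", counters.reinforce(3));
}
```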

Source

fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()>

Saves an embedding vector to storage.

The embedding is stored as raw little-endian f32 bytes.
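One way to produce and read back such a byte layout with the standard library is sketched below. The helper names are invented for this example, and the actual backend may serialize differently in detail.

```rust
// Serializes each f32 as 4 little-endian bytes, back to back.
fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
    embedding.iter().flat_map(|v| v.to_le_bytes()).collect()
}

// Parses the bytes back into f32 values. Returns None if the length is
// not a multiple of 4, which would indicate a truncated or corrupt value.
fn embedding_from_bytes(bytes: &[u8]) -> Option<Vec<f32>> {
    if bytes.len() % 4 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(4)
            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect(),
    )
}

fn main() {
    let embedding = [1.0f32, -0.5, 0.25];
    let bytes = embedding_to_bytes(&embedding);
    println!("{} bytes", bytes.len());
    println!("{:?}", embedding_from_bytes(&bytes));
}
```

An embedding of dimension d therefore occupies exactly 4 * d bytes.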

Source

fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>>

Retrieves an embedding vector by experience ID.

Returns None if no embedding exists for the given ID.

Source

fn save_relation(&self, relation: &ExperienceRelation) -> Result<()>

Saves a relation and its index entries atomically.

Writes to 3 tables in a single transaction:

  • RELATIONS_TABLE — the relation record
  • RELATIONS_BY_SOURCE_TABLE — index by source experience
  • RELATIONS_BY_TARGET_TABLE — index by target experience

Source

fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>

Retrieves a relation by ID.

Returns None if no relation with the given ID exists.

Source

fn delete_relation(&self, id: RelationId) -> Result<bool>

Deletes a relation and its index entries atomically.

Returns true if the relation existed and was deleted, false if not found.

Source

fn get_relation_ids_by_source( &self, experience_id: ExperienceId, ) -> Result<Vec<RelationId>>

Finds all relation IDs where the given experience is the source.

Iterates the RELATIONS_BY_SOURCE_TABLE multimap for the experience.

Source

fn get_relation_ids_by_target( &self, experience_id: ExperienceId, ) -> Result<Vec<RelationId>>

Finds all relation IDs where the given experience is the target.

Iterates the RELATIONS_BY_TARGET_TABLE multimap for the experience.

Source

fn delete_relations_for_experience( &self, experience_id: ExperienceId, ) -> Result<u64>

Deletes all relations where the given experience is source or target.

Used for cascade deletion when an experience is removed. Returns the count of deleted relations.

Source

fn relation_exists( &self, source_id: ExperienceId, target_id: ExperienceId, relation_type: RelationType, ) -> Result<bool>

Checks if a relation with the same (source, target, type) already exists.

Scans the source index, loads each relation, and checks for a matching target and type. Efficient for the expected cardinality (few relations per experience).
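The scan described above might look roughly like the following, with two HashMaps standing in for the source index and the relations table. The Store type, the u64 IDs, and the Kind variants are all hypothetical.

```rust
use std::collections::HashMap;

// Hypothetical relation types for this sketch.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Kind {
    Supports,
    Contradicts,
}

struct Relation {
    target: u64,
    kind: Kind,
}

struct Store {
    by_source: HashMap<u64, Vec<u64>>, // source experience -> relation IDs
    relations: HashMap<u64, Relation>, // relation ID -> record
}

impl Store {
    fn new() -> Self {
        Store { by_source: HashMap::new(), relations: HashMap::new() }
    }

    // Writes the record and its source-index entry together.
    fn save(&mut self, id: u64, source: u64, target: u64, kind: Kind) {
        self.relations.insert(id, Relation { target, kind });
        self.by_source.entry(source).or_default().push(id);
    }

    // Scans the source index, loads each relation, and compares target and type.
    fn relation_exists(&self, source: u64, target: u64, kind: Kind) -> bool {
        self.by_source
            .get(&source)
            .into_iter()
            .flatten()
            .filter_map(|id| self.relations.get(id))
            .any(|r| r.target == target && r.kind == kind)
    }
}

fn sample_store() -> Store {
    let mut store = Store::new();
    store.save(100, 1, 2, Kind::Supports);
    store.save(101, 1, 3, Kind::Contradicts);
    store
}

fn main() {
    let store = sample_store();
    println!("{}", store.relation_exists(1, 2, Kind::Supports));
}
```

The cost is linear in the number of relations leaving the source experience, which is why the approach suits small per-experience fan-out.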

Source

fn save_insight(&self, insight: &DerivedInsight) -> Result<()>

Saves a derived insight and its index entries atomically.

Writes to 2 tables in a single transaction:

  • INSIGHTS_TABLE — the insight record (with inline embedding)
  • INSIGHTS_BY_COLLECTIVE_TABLE — index by collective

Source

fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>

Retrieves a derived insight by ID.

Returns None if no insight with the given ID exists.

Source

fn delete_insight(&self, id: InsightId) -> Result<bool>

Deletes a derived insight and its index entries atomically.

Returns true if the insight existed and was deleted, false if not found.

Source

fn list_insight_ids_in_collective( &self, id: CollectiveId, ) -> Result<Vec<InsightId>>

Lists all insight IDs belonging to a collective.

Used to rebuild HNSW indexes from stored insights on startup. Iterates the INSIGHTS_BY_COLLECTIVE_TABLE multimap.

Source

fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64>

Deletes all insights belonging to a collective.

Used for cascade deletion when a collective is removed. Returns the count of deleted insights.

Source

fn save_activity(&self, activity: &Activity) -> Result<()>

Saves an agent activity to storage (upsert).

If an activity for the same (collective_id, agent_id) already exists, it is replaced. Uses the composite key encoding from schema::encode_activity_key.

Source

fn get_activity( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<Option<Activity>>

Retrieves an agent activity by agent ID and collective.

Returns None if no activity exists for the given pair.

Source

fn delete_activity( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<bool>

Deletes an agent activity.

Returns true if the activity existed and was deleted, false if no activity was found for the given pair.

Source

fn list_activities_in_collective( &self, collective_id: CollectiveId, ) -> Result<Vec<Activity>>

Lists all activities in a collective.

Iterates the ACTIVITIES_TABLE and filters entries whose key starts with the collective’s 16-byte ID. Returns activities in no guaranteed order.
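A prefix filter of this kind can be sketched with a BTreeMap as the table. The key layout used here (16-byte collective ID followed by the agent ID bytes) is an assumption for illustration; the real layout is whatever schema::encode_activity_key produces.

```rust
use std::collections::BTreeMap;

// Assumed key layout for this sketch: [collective_id: 16 bytes][agent_id bytes].
fn activity_key(collective: [u8; 16], agent_id: &str) -> Vec<u8> {
    let mut key = collective.to_vec();
    key.extend_from_slice(agent_id.as_bytes());
    key
}

// Returns the values of all entries whose key starts with the collective ID.
fn activities_in_collective(
    table: &BTreeMap<Vec<u8>, String>,
    collective: [u8; 16],
) -> Vec<String> {
    table
        .iter()
        .filter(|(key, _)| key.starts_with(&collective))
        .map(|(_, value)| value.clone())
        .collect()
}

fn main() {
    let (a, b) = ([1u8; 16], [2u8; 16]);
    let mut table = BTreeMap::new();
    table.insert(activity_key(a, "agent-1"), "indexing".to_string());
    table.insert(activity_key(b, "agent-1"), "idle".to_string());
    table.insert(activity_key(a, "agent-2"), "reviewing".to_string());
    println!("{:?}", activities_in_collective(&table, a));
}
```

Callers that need a stable order should sort the result themselves, since none is guaranteed.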

Source

fn delete_activities_by_collective( &self, collective_id: CollectiveId, ) -> Result<u64>

Deletes all activities belonging to a collective.

Used for cascade deletion when a collective is removed. Returns the count of deleted activities.

Source

fn list_experience_ids_paginated( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<ExperienceId>>

Lists experience IDs in a collective with pagination.

Returns IDs ordered by timestamp (oldest first). Use offset to skip previously fetched pages and limit to control page size.
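A caller typically advances offset by the number of items received until an empty page comes back. The sketch below shows that loop against a slice; list_ids_paginated is a stand-in with the same limit/offset shape, not the trait method itself.

```rust
// Stand-in for a paginated listing over IDs already sorted oldest first.
fn list_ids_paginated(all: &[u32], limit: usize, offset: usize) -> Vec<u32> {
    all.iter().skip(offset).take(limit).copied().collect()
}

// Fetches page after page until an empty page signals the end.
fn collect_all_pages(all: &[u32], page_size: usize) -> Vec<u32> {
    let mut out = Vec::new();
    let mut offset = 0;
    loop {
        let page = list_ids_paginated(all, page_size, offset);
        if page.is_empty() {
            break;
        }
        offset += page.len();
        out.extend(page);
    }
    out
}

fn main() {
    let ids = [1, 2, 3, 4, 5, 6, 7];
    println!("{:?}", list_ids_paginated(&ids, 3, 3));
    println!("{:?}", collect_all_pages(&ids, 3));
}
```

Offset-based paging can skip or repeat items if records are inserted or deleted between calls, so it is best suited to snapshots or quiescent data.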

Source

fn list_relations_in_collective( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<ExperienceRelation>>

Lists all relations in a collective with pagination.

Scans relations whose source experience belongs to the collective.

Source

fn list_insight_ids_paginated( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<InsightId>>

Lists insight IDs in a collective with pagination.

Source

fn get_wal_sequence(&self) -> Result<u64>

Returns the current WAL sequence number.

Every experience write (create, update, delete, reinforce) atomically increments the sequence. Returns 0 if no writes have occurred yet.

This is a read-only operation — the write-side logic is internal to the storage implementation to maintain transactional atomicity.

Source

fn poll_watch_events( &self, since_seq: u64, limit: usize, ) -> Result<(Vec<WatchEventRecord>, u64)>

Retrieves watch events with sequence numbers greater than since_seq.

Returns events in ascending sequence order and the highest sequence number seen. If no new events exist, returns an empty vec and since_seq.
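The returned sequence number is meant to be fed back in as since_seq on the next call. A minimal sketch of that loop, using a slice of (sequence, event) pairs as a stand-in for the WAL and string labels in place of WatchEventRecord:

```rust
// Stand-in poll over a log sorted by ascending sequence number.
// Returns up to `limit` events with seq > since_seq, plus the highest
// sequence seen (or `since_seq` itself if nothing new was found).
fn poll(
    log: &[(u64, &'static str)],
    since_seq: u64,
    limit: usize,
) -> (Vec<&'static str>, u64) {
    let mut last = since_seq;
    let mut events = Vec::new();
    for (seq, event) in log.iter().filter(|(seq, _)| *seq > since_seq).take(limit) {
        events.push(*event);
        last = *seq;
    }
    (events, last)
}

// Polls in batches, advancing the cursor each time, until caught up.
fn drain(log: &[(u64, &'static str)], batch: usize) -> Vec<&'static str> {
    let mut out = Vec::new();
    let mut cursor = 0;
    loop {
        let (events, next) = poll(log, cursor, batch);
        if events.is_empty() {
            break;
        }
        out.extend(events);
        cursor = next;
    }
    out
}

fn main() {
    let log = [(1, "created"), (2, "updated"), (5, "deleted")];
    println!("{:?}", poll(&log, 0, 2));
    println!("{:?}", drain(&log, 2));
}
```

Gaps in the sequence (here 3 and 4) are harmless, because the cursor tracks the highest sequence seen rather than a count of events.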

§Arguments

  • since_seq - Return events with sequence > this value (0 = all events)
  • limit - Maximum number of events to return per call

Source

fn poll_sync_events( &self, since_seq: u64, limit: usize, ) -> Result<Vec<(u64, WatchEventRecord)>>

Available on crate feature sync only.

Retrieves ALL watch events (all entity types) with their sequence numbers.

Unlike poll_watch_events(), which returns records without their sequence numbers, this method returns (sequence, record) pairs, which the sync pusher needs to construct SyncChange objects.

Source

fn instance_id(&self) -> InstanceId

Available on crate feature sync only.

Returns the persistent instance ID for this database.

Generated on first open and stable across restarts. Used by the sync protocol to identify this PulseDB instance.

Source

fn save_sync_cursor(&self, cursor: &SyncCursor) -> Result<()>

Available on crate feature sync only.

Saves a sync cursor for a peer instance.

Upserts the cursor in the SYNC_CURSORS_TABLE.

Source

fn load_sync_cursor( &self, instance_id: &InstanceId, ) -> Result<Option<SyncCursor>>

Available on crate feature sync only.

Loads the sync cursor for a specific peer instance.

Returns None if no cursor has been saved for this peer.

Source

fn list_sync_cursors(&self) -> Result<Vec<SyncCursor>>

Available on crate feature sync only.

Lists all saved sync cursors.

Returns cursors for all known peer instances.

Source

fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64>

Available on crate feature sync only.

Compacts the WAL by deleting events with sequence <= up_to_seq.

Returns the number of events deleted. This is a write operation that permanently removes old WAL entries to reclaim disk space.

§Safety

Only compact up to the minimum cursor across all peers — otherwise peers that haven’t synced yet will miss events.
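Choosing a safe compaction point amounts to taking the minimum over all peer cursors. The sketch below assumes a cursor carries a `last_seq` field and models the WAL as a Vec of sequence numbers; both are illustrative stand-ins, not the real SyncCursor or storage layout.

```rust
// Hypothetical cursor shape; the field names are assumptions.
struct PeerCursor {
    peer: &'static str,
    last_seq: u64,
}

// The slowest peer decides how far compaction may go.
// Returns None when there are no peers to consult.
fn safe_compaction_point(cursors: &[PeerCursor]) -> Option<u64> {
    cursors.iter().map(|c| c.last_seq).min()
}

// Removes every event with seq <= up_to_seq and returns how many were deleted.
fn compact(wal: &mut Vec<u64>, up_to_seq: u64) -> u64 {
    let before = wal.len();
    wal.retain(|seq| *seq > up_to_seq);
    (before - wal.len()) as u64
}

fn main() {
    let cursors = [
        PeerCursor { peer: "a", last_seq: 7 },
        PeerCursor { peer: "b", last_seq: 3 },
    ];
    let mut wal: Vec<u64> = (1..=10).collect();
    if let Some(point) = safe_compaction_point(&cursors) {
        let slowest = cursors.iter().find(|c| c.last_seq == point).unwrap();
        let deleted = compact(&mut wal, point);
        println!("slowest peer {} at {}, deleted {}", slowest.peer, point, deleted);
    }
}
```

What to do when no cursors exist is a policy decision left to the caller; this sketch simply skips compaction in that case.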

Implementors§