PulseDB

Struct PulseDB 

Source
pub struct PulseDB { /* private fields */ }

The main PulseDB database handle.

This is the primary interface for all database operations. Create an instance with PulseDB::open() and close it with PulseDB::close().

§Ownership

PulseDB owns its storage and embedding service. When you call close(), the database is consumed and cannot be used afterward. This ensures resources are properly released.
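The consuming-close pattern described above can be illustrated without PulseDB itself. The following is a minimal sketch using a hypothetical `Handle` type (not part of the pulsedb crate): because `close` takes `self` by value, the compiler rejects any use of the handle after it has been closed.

```rust
/// Hypothetical stand-in for a database handle; not the real PulseDB type.
pub struct Handle {
    pending_writes: Vec<String>,
}

impl Handle {
    pub fn open() -> Self {
        Handle { pending_writes: Vec::new() }
    }

    /// Buffers a record in memory.
    pub fn write(&mut self, record: &str) {
        self.pending_writes.push(record.to_string());
    }

    /// Takes `self` by value: the handle is moved in and dropped when this
    /// function returns, so callers cannot touch it afterwards.
    /// Returns the number of buffered records that were "flushed".
    pub fn close(self) -> Result<usize, String> {
        Ok(self.pending_writes.len())
    }
}
```

Calling `h.write(...)` after `h.close()` would fail to compile with a "use of moved value" error, which is the guarantee the ownership note relies on.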

Implementations§

Source§

impl PulseDB

Source

pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self>

Opens or creates a PulseDB database at the specified path.

If the database doesn’t exist, it will be created with the given configuration. If it exists, the configuration will be validated against the stored settings (e.g., embedding dimension must match).

§Arguments
  • path - Path to the database file (created if it doesn’t exist)
  • config - Configuration options for the database
§Errors

Returns an error if:

  • Configuration is invalid (see Config::validate)
  • Database file is corrupted
  • Database is locked by another process
  • Schema version doesn’t match (needs migration)
  • Embedding dimension doesn’t match existing database
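As an illustration of the dimension check listed above, here is a minimal sketch of validating a requested dimension against a stored one. The `OpenError` type and `check_dimension` function are hypothetical names for this example only; the crate's real error type and validation code may differ.

```rust
/// Hypothetical error type for this sketch; the real crate's error enum may differ.
#[derive(Debug, PartialEq)]
pub enum OpenError {
    DimensionMismatch { stored: usize, requested: usize },
}

/// Returns the dimension the database should use.
/// `stored` is `None` for a brand-new database and `Some(d)` for an existing one.
pub fn check_dimension(stored: Option<usize>, requested: usize) -> Result<usize, OpenError> {
    match stored {
        // New database: the requested dimension becomes the stored one.
        None => Ok(requested),
        // Existing database with a matching dimension.
        Some(d) if d == requested => Ok(d),
        // Existing database with a different dimension: refuse to open.
        Some(d) => Err(OpenError::DimensionMismatch { stored: d, requested }),
    }
}
```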
§Example
use pulsedb::{PulseDB, Config, EmbeddingDimension};

// Open with default configuration
let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;

// Open with custom embedding dimension
let db = PulseDB::open(dir.path().join("custom.db"), Config {
    embedding_dimension: EmbeddingDimension::D768,
    ..Default::default()
})?;
Source

pub fn close(self) -> Result<()>

Closes the database, flushing all pending writes.

This method consumes the PulseDB instance, ensuring it cannot be used after closing. The underlying storage engine flushes all buffered data to disk.

§Errors

Returns an error if the storage backend reports a flush failure. Note: the current redb backend flushes durably on drop, so this always returns Ok(()) in practice.

§Example
use pulsedb::{PulseDB, Config};

let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
// ... use the database ...
db.close()?;  // db is consumed here
// db.something() // Compile error: db was moved
Source

pub fn config(&self) -> &Config

Returns a reference to the database configuration.

This is the configuration that was used to open the database. Note that some settings (like embedding dimension) are locked on database creation and cannot be changed.

Source

pub fn metadata(&self) -> &DatabaseMetadata

Returns the database metadata.

Metadata includes schema version, embedding dimension, and timestamps for when the database was created and last opened.

Source

pub fn embedding_dimension(&self) -> usize

Returns the embedding dimension configured for this database.

All embeddings stored in this database must have exactly this many dimensions.

Source

pub fn is_read_only(&self) -> bool

Returns true if this database is in read-only mode.

Source

pub fn create_collective(&self, name: &str) -> Result<CollectiveId>

Creates a new collective with the given name.

The collective’s embedding dimension is locked to the database’s configured dimension at creation time.

§Arguments
  • name - Human-readable name (1-255 characters, not whitespace-only)
§Errors

Returns a validation error if the name is empty, whitespace-only, or exceeds 255 characters.
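The name rules above can be sketched as a small validator. This is an illustration only: `NameError` and `validate_collective_name` are hypothetical names, and the sketch assumes the 255 limit counts Unicode characters rather than bytes, which the documentation does not specify.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum NameError {
    Empty,
    WhitespaceOnly,
    TooLong(usize),
}

pub fn validate_collective_name(name: &str) -> Result<(), NameError> {
    if name.is_empty() {
        return Err(NameError::Empty);
    }
    if name.trim().is_empty() {
        return Err(NameError::WhitespaceOnly);
    }
    // Assumption: the limit is measured in characters, not bytes.
    let len = name.chars().count();
    if len > 255 {
        return Err(NameError::TooLong(len));
    }
    Ok(())
}
```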

§Example
let id = db.create_collective("my-project")?;
Source

pub fn create_collective_with_owner( &self, name: &str, owner_id: &str, ) -> Result<CollectiveId>

Creates a new collective with an owner for multi-tenancy.

Same as create_collective but assigns an owner ID, enabling filtering with list_collectives_by_owner.

§Arguments
  • name - Human-readable name (1-255 characters)
  • owner_id - Owner identifier (must not be empty)
§Errors

Returns a validation error if the name or owner_id is invalid.

Source

pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>>

Returns a collective by ID, or None if not found.

§Example
if let Some(collective) = db.get_collective(id)? {
    println!("Found: {}", collective.name);
}
Source

pub fn list_collectives(&self) -> Result<Vec<Collective>>

Lists all collectives in the database.

Returns an empty vector if no collectives exist.

Source

pub fn list_collectives_by_owner( &self, owner_id: &str, ) -> Result<Vec<Collective>>

Lists collectives filtered by owner ID.

Returns only collectives whose owner_id matches the given value. Returns an empty vector if no matching collectives exist.

Source

pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats>

Returns statistics for a collective.

§Errors

Returns NotFoundError::Collective if the collective doesn’t exist.

Source

pub fn delete_collective(&self, id: CollectiveId) -> Result<()>

Deletes a collective and all its associated data.

Performs cascade deletion: removes all experiences belonging to the collective before removing the collective record itself.

§Errors

Returns NotFoundError::Collective if the collective doesn’t exist.
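Cascade deletion as described above can be modelled with two in-memory maps. The `Store` type below is a hypothetical stand-in using integer IDs; it is not how PulseDB lays out its tables, only a sketch of the ordering (experiences first, then the collective record).

```rust
use std::collections::HashMap;

/// Hypothetical in-memory store; IDs are plain integers for illustration.
#[derive(Default)]
pub struct Store {
    pub collectives: HashMap<u32, String>,        // collective id -> name
    pub experiences: HashMap<u32, (u32, String)>, // experience id -> (collective id, content)
}

impl Store {
    /// Removes every experience owned by the collective, then the collective
    /// record itself. Returns the number of experiences removed, or an error
    /// if the collective is unknown.
    pub fn delete_collective(&mut self, id: u32) -> Result<usize, String> {
        if !self.collectives.contains_key(&id) {
            return Err(format!("collective {id} not found"));
        }
        let before = self.experiences.len();
        self.experiences.retain(|_, (owner, _)| *owner != id);
        self.collectives.remove(&id);
        Ok(before - self.experiences.len())
    }
}
```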

§Example
db.delete_collective(collective_id)?;
assert!(db.get_collective(collective_id)?.is_none());
Source

pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId>

Records a new experience in the database.

This is the primary method for storing agent-learned knowledge. The method:

  1. Validates the input (content, scores, tags, embedding)
  2. Verifies the collective exists
  3. Resolves the embedding (generates one in Builtin mode; requires a caller-supplied one in External mode)
  4. Stores the experience atomically across four tables
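The embedding-resolution step in the list above might look roughly like the sketch below. All names here (`Provider`, `RecordError`, `resolve_embedding`, `generate`) are hypothetical. The sketch also assumes that a caller-supplied embedding is used as-is in either mode and that whitespace-only content is rejected; neither detail is stated in the documentation.

```rust
/// Hypothetical provider modes, named after the "Builtin" and "External"
/// terms used above. The real configuration type may differ.
pub enum Provider {
    Builtin,
    External,
}

#[derive(Debug, PartialEq)]
pub enum RecordError {
    EmptyContent,
    EmbeddingRequired,
    DimensionMismatch { expected: usize, got: usize },
}

/// Placeholder generator. A real builtin provider would run a model here.
fn generate(content: &str, dim: usize) -> Vec<f32> {
    let seed = content.len() as f32;
    (0..dim).map(|i| seed + i as f32).collect()
}

pub fn resolve_embedding(
    provider: &Provider,
    content: &str,
    supplied: Option<Vec<f32>>,
    dim: usize,
) -> Result<Vec<f32>, RecordError> {
    // Input validation (assumption: blank content is invalid).
    if content.trim().is_empty() {
        return Err(RecordError::EmptyContent);
    }
    // Pick the supplied embedding, generate one, or fail.
    let embedding = match (provider, supplied) {
        (_, Some(v)) => v, // assumption: a supplied embedding is always accepted
        (Provider::Builtin, None) => generate(content, dim),
        (Provider::External, None) => return Err(RecordError::EmbeddingRequired),
    };
    // The embedding must match the database's configured dimension.
    if embedding.len() != dim {
        return Err(RecordError::DimensionMismatch { expected: dim, got: embedding.len() });
    }
    Ok(embedding)
}
```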
§Arguments
§Errors
Source

pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>>

Retrieves an experience by ID, including its embedding.

Returns None if no experience with the given ID exists.

Source

pub fn update_experience( &self, id: ExperienceId, update: ExperienceUpdate, ) -> Result<()>

Updates mutable fields of an experience.

Only fields set to Some(...) in the update are changed. Content and embedding are immutable — create a new experience instead.
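The "only `Some(...)` fields are changed" rule is a common partial-update pattern. The sketch below shows it with hypothetical `Record` and `RecordUpdate` types; the field names are invented for illustration and are not taken from the real `Experience` or `ExperienceUpdate` definitions.

```rust
/// Hypothetical record type; the real `Experience` fields may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub content: String, // immutable after creation, so absent from the update type
    pub importance: f32,
    pub tags: Vec<String>,
}

/// Hypothetical update type: every mutable field is optional.
#[derive(Default)]
pub struct RecordUpdate {
    pub importance: Option<f32>,
    pub tags: Option<Vec<String>>,
}

/// Overwrites only the fields that are `Some`; `None` leaves a field untouched.
pub fn apply_update(record: &mut Record, update: RecordUpdate) {
    if let Some(importance) = update.importance {
        record.importance = importance;
    }
    if let Some(tags) = update.tags {
        record.tags = tags;
    }
}
```

Leaving immutable fields out of the update type makes it impossible to express an illegal change, rather than rejecting it at runtime.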

§Errors
Source

pub fn archive_experience(&self, id: ExperienceId) -> Result<()>

Archives an experience (soft-delete).

Archived experiences remain in storage but are excluded from search results. Use unarchive_experience to restore.

§Errors

Returns NotFoundError::Experience if the experience doesn’t exist.

Source

pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()>

Restores an archived experience.

The experience will once again appear in search results.

§Errors

Returns NotFoundError::Experience if the experience doesn’t exist.

Source

pub fn delete_experience(&self, id: ExperienceId) -> Result<()>

Permanently deletes an experience and its embedding.

This removes the experience from all tables and indices. Unlike archiving, this is irreversible.

§Errors

Returns NotFoundError::Experience if the experience doesn’t exist.

Source

pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32>

Reinforces an experience by incrementing its application count.

Each call atomically increments the applications counter by 1. Returns the new application count.

§Errors

Returns NotFoundError::Experience if the experience doesn’t exist.

Source

pub fn list_experiences( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<Experience>>

Lists experiences in a collective with pagination.

Returns full Experience records (including embeddings) ordered by timestamp. Use offset and limit for pagination.

Designed for visualization tools (PulseVision) that need to enumerate the entire embedding space of a collective.
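Enumerating a whole collective with `limit` and `offset` amounts to walking pages until one comes back short. The sketch below shows that loop over a plain slice; `page` and `collect_all` are hypothetical helpers standing in for repeated `list_experiences` calls.

```rust
/// Returns the page `[offset, offset + limit)` of `items`, or fewer items if
/// the slice runs out.
pub fn page<T: Clone>(items: &[T], limit: usize, offset: usize) -> Vec<T> {
    items.iter().skip(offset).take(limit).cloned().collect()
}

/// Walks every page until an empty or short page is returned.
pub fn collect_all<T: Clone>(items: &[T], page_size: usize) -> Vec<T> {
    let mut out = Vec::new();
    let mut offset = 0;
    loop {
        let batch = page(items, page_size, offset);
        let n = batch.len();
        out.extend(batch);
        // A short (or empty) page means there is nothing left to fetch.
        if n == 0 || n < page_size {
            break;
        }
        offset += n;
    }
    out
}
```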

Source

pub fn list_relations( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<ExperienceRelation>>

Lists relations in a collective with pagination.

Source

pub fn list_insights( &self, collective_id: CollectiveId, limit: usize, offset: usize, ) -> Result<Vec<DerivedInsight>>

Lists insights in a collective with pagination.

Returns full DerivedInsight records including embeddings.

Source

pub fn get_recent_experiences( &self, collective_id: CollectiveId, limit: usize, ) -> Result<Vec<Experience>>

Retrieves the most recent experiences in a collective.

Returns full experiences ordered by timestamp (newest first).

Source

pub fn get_recent_experiences_filtered( &self, collective_id: CollectiveId, limit: usize, filter: SearchFilter, ) -> Result<Vec<Experience>>

Retrieves the most recent experiences in a collective with filtering.

Like get_recent_experiences(), but applies additional filters on domain, experience type, importance, confidence, and timestamp.

Over-fetches from storage (2x limit) to account for entries removed by post-filtering, then truncates to the requested limit.

§Arguments
  • collective_id - The collective to query
  • limit - Maximum number of experiences to return (1-1000)
  • filter - Filter criteria to apply
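The over-fetch-then-truncate strategy described above can be sketched over an already-ordered slice. `recent_filtered` is a hypothetical helper, and the predicate stands in for a `SearchFilter`; the real implementation reads from storage rather than a slice.

```rust
/// Takes up to `2 * limit` candidates (assumed to be ordered newest first),
/// drops those that fail the predicate, then keeps at most `limit`.
pub fn recent_filtered<T, F>(ordered: &[T], limit: usize, keep: F) -> Vec<T>
where
    T: Clone,
    F: Fn(&T) -> bool,
{
    let fetch = limit.saturating_mul(2);
    ordered
        .iter()
        .take(fetch)            // over-fetch window
        .filter(|x| keep(*x))   // post-filter
        .take(limit)            // truncate to the requested limit
        .cloned()
        .collect()
}
```

One consequence visible in this sketch: if matching items lie outside the 2x window, fewer than `limit` results come back even though more matches exist.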
§Errors
Source

pub fn search_similar( &self, collective_id: CollectiveId, query: &[f32], k: usize, ) -> Result<Vec<SearchResult>>

Searches for experiences semantically similar to the query embedding.

Uses the HNSW vector index for approximate nearest neighbor search, then fetches full experience records from storage. Archived experiences are excluded by default.

Results are sorted by similarity descending (most similar first). Similarity is computed as 1.0 - cosine_distance.

§Arguments
  • collective_id - The collective to search within
  • query - Query embedding vector (must match collective’s dimension)
  • k - Maximum number of results to return (1-1000)
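To make the similarity score concrete: since cosine distance is defined as 1 minus cosine similarity, `1.0 - cosine_distance` is the cosine similarity itself. The sketch below computes it and ranks candidates by brute force. It is not an HNSW index, and `cosine_similarity` and `rank` are hypothetical helpers written for this example.

```rust
/// Cosine similarity of two equal-length vectors. Returns `None` if the
/// lengths differ or either vector has zero norm.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> Option<f32> {
    if a.len() != b.len() {
        return None;
    }
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        return None;
    }
    Some(dot / (norm_a * norm_b))
}

/// Exhaustive (non-approximate) top-k ranking, most similar first.
/// Candidates whose dimension does not match the query are skipped.
pub fn rank(query: &[f32], candidates: &[(u32, Vec<f32>)], k: usize) -> Vec<(u32, f32)> {
    let mut scored: Vec<(u32, f32)> = candidates
        .iter()
        .filter_map(|(id, v)| cosine_similarity(query, v).map(|s| (*id, s)))
        .collect();
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}
```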
§Errors
§Example
let query = vec![0.1f32; 384]; // Your query embedding
let results = db.search_similar(collective_id, &query, 10)?;
for result in &results {
    println!(
        "[{:.3}] {}",
        result.similarity, result.experience.content
    );
}
Source

pub fn search_similar_filtered( &self, collective_id: CollectiveId, query: &[f32], k: usize, filter: SearchFilter, ) -> Result<Vec<SearchResult>>

Searches for semantically similar experiences with additional filtering.

Like search_similar(), but applies additional filters on domain, experience type, importance, confidence, and timestamp.

Over-fetches from the HNSW index (2x k) to account for entries removed by post-filtering, then truncates to the requested k.

§Arguments
  • collective_id - The collective to search within
  • query - Query embedding vector (must match collective’s dimension)
  • k - Maximum number of results to return (1-1000)
  • filter - Filter criteria to apply after vector search
§Errors
§Example
use pulsedb::SearchFilter;

let filter = SearchFilter {
    domains: Some(vec!["rust".to_string()]),
    min_importance: Some(0.5),
    ..SearchFilter::default()
};
let results = db.search_similar_filtered(
    collective_id,
    &query_embedding,
    10,
    filter,
)?;
Source

pub fn store_relation( &self, relation: NewExperienceRelation, ) -> Result<RelationId>

Stores a new relation between two experiences.

Relations are typed, directed edges connecting a source experience to a target experience. Both experiences must exist and belong to the same collective. Duplicate relations (same source, target, and type) are rejected.

§Arguments
  • relation - The relation to create (source, target, type, strength)
§Errors

Returns an error if:

  • Source or target experience doesn’t exist (NotFoundError::Experience)
  • Experiences belong to different collectives (ValidationError::InvalidField)
  • A relation with the same (source, target, type) already exists
  • Self-relation attempted (source == target)
  • Strength is out of range [0.0, 1.0]
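The rejection rules above can be sketched as a sequence of checks against an in-memory graph. `RelationGraph` and `RelationError` are hypothetical types, relation types are plain strings here, and the order in which the checks run is an assumption; the real method may validate in a different order.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq)]
pub enum RelationError {
    NotFound(u32),
    DifferentCollectives,
    Duplicate,
    SelfRelation,
    StrengthOutOfRange,
}

/// Hypothetical in-memory model of experiences and their relations.
#[derive(Default)]
pub struct RelationGraph {
    pub experiences: HashMap<u32, u32>,     // experience id -> collective id
    pub edges: HashSet<(u32, u32, String)>, // (source, target, relation type)
}

impl RelationGraph {
    pub fn store(&mut self, source: u32, target: u32, kind: &str, strength: f32) -> Result<(), RelationError> {
        if source == target {
            return Err(RelationError::SelfRelation);
        }
        // NaN also fails this range check.
        if !(0.0..=1.0).contains(&strength) {
            return Err(RelationError::StrengthOutOfRange);
        }
        let src_collective = *self.experiences.get(&source).ok_or(RelationError::NotFound(source))?;
        let dst_collective = *self.experiences.get(&target).ok_or(RelationError::NotFound(target))?;
        if src_collective != dst_collective {
            return Err(RelationError::DifferentCollectives);
        }
        // `insert` returns false when the (source, target, type) triple already exists.
        if !self.edges.insert((source, target, kind.to_string())) {
            return Err(RelationError::Duplicate);
        }
        Ok(())
    }
}
```

Because edges are directed, storing the reverse edge (target to source) with the same type is not a duplicate in this sketch.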

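pub fn get_related_experiences( &self, experience_id: ExperienceId, direction: RelationDirection, ) -> Result<Vec<(Experience, ExperienceRelation)>>

Retrieves experiences related to the given experience.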

Returns pairs of (Experience, ExperienceRelation) based on the requested direction:

  • Outgoing: experiences that this experience points TO (as source)
  • Incoming: experiences that point TO this experience (as target)
  • Both: union of outgoing and incoming

To filter by relation type, use get_related_experiences_filtered.

Silently skips relations where the related experience no longer exists (orphan tolerance).
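Direction handling and orphan tolerance can be sketched over a list of directed edges. The `Direction` enum and `related` function below are hypothetical, simplified to integer IDs and string contents; they only illustrate which end of an edge is returned for each direction and how missing experiences are skipped.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy)]
pub enum Direction {
    Outgoing,
    Incoming,
    Both,
}

/// `edges` are (source, target) pairs; `experiences` maps id -> content.
/// Returns (related id, content) pairs for the requested direction.
pub fn related(
    id: u32,
    direction: Direction,
    edges: &[(u32, u32)],
    experiences: &HashMap<u32, String>,
) -> Vec<(u32, String)> {
    let mut out = Vec::new();
    for &(source, target) in edges {
        // Pick the "other end" of the edge, or skip edges that do not apply.
        let other = match direction {
            Direction::Outgoing if source == id => target,
            Direction::Incoming if target == id => source,
            Direction::Both if source == id => target,
            Direction::Both if target == id => source,
            _ => continue,
        };
        // Orphan tolerance: silently skip edges whose other end no longer exists.
        if let Some(content) = experiences.get(&other) {
            out.push((other, content.clone()));
        }
    }
    out
}
```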

§Errors

Returns a storage error if the read transaction fails.

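pub fn get_related_experiences_filtered( &self, experience_id: ExperienceId, direction: RelationDirection, relation_type: Option<RelationType>, ) -> Result<Vec<(Experience, ExperienceRelation)>>

Retrieves experiences related to the given experience, with optional type filtering.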

Like get_related_experiences(), but accepts an optional RelationType filter. When Some(rt), only relations matching that type are returned.

§Arguments
  • experience_id - The experience to query relations for
  • direction - Which direction(s) to traverse
  • relation_type - If Some, only return relations of this type
§Example
use pulsedb::{RelationType, RelationDirection};

// Only "Supports" relations outgoing from exp_a
let supports = db.get_related_experiences_filtered(
    exp_a,
    RelationDirection::Outgoing,
    Some(RelationType::Supports),
)?;
Source

pub fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>>

Retrieves a relation by ID.

Returns None if no relation with the given ID exists.

Source

pub fn delete_relation(&self, id: RelationId) -> Result<()>

Deletes a relation by ID.

§Errors

Returns NotFoundError::Relation if no relation with the given ID exists.

Source

pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId>

Stores a new derived insight.

Creates a synthesized knowledge record from multiple source experiences. The method:

  1. Validates the input (content, confidence, sources)
  2. Verifies the collective exists
  3. Verifies all source experiences exist and belong to the same collective
  4. Resolves the embedding (generates one in Builtin mode; requires a caller-supplied one in External mode)
  5. Stores the insight with inline embedding
  6. Inserts into the insight HNSW index
§Arguments
§Errors
Source

pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>>

Retrieves a derived insight by ID.

Returns None if no insight with the given ID exists.

Source

pub fn get_insights( &self, collective_id: CollectiveId, query: &[f32], k: usize, ) -> Result<Vec<(DerivedInsight, f32)>>

Searches for insights semantically similar to the query embedding.

Uses the insight-specific HNSW index for approximate nearest neighbor search, then fetches full insight records from storage.

§Arguments
  • collective_id - The collective to search within
  • query - Query embedding vector (must match collective’s dimension)
  • k - Maximum number of results to return
§Errors
Source

pub fn delete_insight(&self, id: InsightId) -> Result<()>

Deletes a derived insight by ID.

Removes the insight from storage and soft-deletes it from the HNSW index.

§Errors

Returns NotFoundError::Insight if no insight with the given ID exists.

Source

pub fn register_activity(&self, activity: NewActivity) -> Result<()>

Registers an agent’s presence in a collective.

Creates a new activity record or replaces an existing one for the same (collective_id, agent_id) pair (upsert semantics). Both started_at and last_heartbeat are set to Timestamp::now().

§Arguments
  • activity - The activity registration (see NewActivity)
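Upsert semantics keyed on the (collective, agent) pair can be sketched with a map. The `Activities` and `ActivityRecord` types below are hypothetical, timestamps are plain integers supplied by the caller, and the heartbeat method is included to contrast "replace everything" with "refresh one field".

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct ActivityRecord {
    pub current_task: Option<String>,
    pub started_at: u64,
    pub last_heartbeat: u64,
}

/// Hypothetical in-memory activity table keyed by (collective id, agent id).
#[derive(Default)]
pub struct Activities {
    by_key: HashMap<(u32, String), ActivityRecord>,
}

impl Activities {
    /// Upsert: inserts a new record or replaces the existing one, resetting
    /// both timestamps to `now`.
    pub fn register(&mut self, collective: u32, agent: &str, task: Option<String>, now: u64) {
        let record = ActivityRecord { current_task: task, started_at: now, last_heartbeat: now };
        self.by_key.insert((collective, agent.to_string()), record);
    }

    /// Refreshes only `last_heartbeat`; fails if nothing is registered.
    pub fn heartbeat(&mut self, collective: u32, agent: &str, now: u64) -> Result<(), String> {
        match self.by_key.get_mut(&(collective, agent.to_string())) {
            Some(record) => {
                record.last_heartbeat = now;
                Ok(())
            }
            None => Err(format!("no activity registered for agent {agent}")),
        }
    }

    pub fn get(&self, collective: u32, agent: &str) -> Option<&ActivityRecord> {
        self.by_key.get(&(collective, agent.to_string()))
    }

    pub fn len(&self) -> usize {
        self.by_key.len()
    }
}
```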
§Errors
§Example
use pulsedb::NewActivity;

db.register_activity(NewActivity {
    agent_id: "claude-opus".to_string(),
    collective_id,
    current_task: Some("Reviewing pull request".to_string()),
    context_summary: None,
})?;
Source

pub fn update_heartbeat( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<()>

Updates an agent’s heartbeat timestamp.

Refreshes the last_heartbeat to Timestamp::now() without changing any other fields. The agent must have an existing activity registered.

§Errors
Source

pub fn end_activity( &self, agent_id: &str, collective_id: CollectiveId, ) -> Result<()>

Ends an agent’s activity in a collective.

Removes the activity record. After calling this, the agent will no longer appear in get_active_agents() results.

§Errors
Source

pub fn get_active_agents( &self, collective_id: CollectiveId, ) -> Result<Vec<Activity>>

Returns all active (non-stale) agents in a collective.

Fetches all activities, filters out those whose last_heartbeat is older than config.activity.stale_threshold, and returns the rest sorted by last_heartbeat descending (most recently active first).
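The stale-filter-then-sort behaviour can be sketched as follows. `Agent` and `active_agents` are hypothetical, timestamps are milliseconds supplied by the caller, and the sketch assumes an agent whose heartbeat age equals the threshold exactly still counts as active; the documentation does not specify the boundary case.

```rust
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
pub struct Agent {
    pub id: String,
    pub last_heartbeat_ms: u64,
}

/// Keeps agents whose heartbeat age is within the threshold, sorted by most
/// recent heartbeat first.
pub fn active_agents(all: &[Agent], now_ms: u64, stale_threshold: Duration) -> Vec<Agent> {
    let threshold_ms = stale_threshold.as_millis() as u64;
    let mut active: Vec<Agent> = all
        .iter()
        // saturating_sub guards against heartbeats slightly ahead of `now_ms`.
        .filter(|a| now_ms.saturating_sub(a.last_heartbeat_ms) <= threshold_ms)
        .cloned()
        .collect();
    active.sort_by(|a, b| b.last_heartbeat_ms.cmp(&a.last_heartbeat_ms));
    active
}
```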

§Errors
Source

pub fn get_context_candidates( &self, request: ContextRequest, ) -> Result<ContextCandidates>

Retrieves unified context candidates from all retrieval primitives.

This is the primary API for context assembly. It orchestrates:

  1. Similarity search (search_similar_filtered)
  2. Recent experiences (get_recent_experiences_filtered)
  3. Insight search (get_insights) — if requested
  4. Relation collection (get_related_experiences) — if requested
  5. Active agents (get_active_agents) — if requested
§Arguments
  • request - Configuration for which primitives to query and limits
§Errors
§Performance

Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates; all other sub-calls are < 10ms each.

§Example
use pulsedb::{ContextRequest, SearchFilter};

let candidates = db.get_context_candidates(ContextRequest {
    collective_id,
    query_embedding: query_vec,
    max_similar: 10,
    max_recent: 5,
    include_insights: true,
    include_relations: true,
    include_active_agents: true,
    filter: SearchFilter {
        domains: Some(vec!["rust".to_string()]),
        ..SearchFilter::default()
    },
    ..ContextRequest::default()
})?;
Source

pub fn watch_experiences( &self, collective_id: CollectiveId, ) -> Result<WatchStream>

Subscribes to all experience changes in a collective.

Returns a WatchStream that yields WatchEvent values for every create, update, archive, and delete operation. The stream ends when dropped or when the PulseDB instance is closed.

Multiple subscribers per collective are supported. Each gets an independent copy of every event.
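The "independent copy per subscriber" behaviour is a fan-out pattern. The sketch below models it with synchronous `std::sync::mpsc` channels rather than an async stream, so it is only an analogy for `WatchStream`; `Broadcaster` and `Event` are hypothetical types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub kind: &'static str,
    pub experience_id: u32,
}

/// Hypothetical fan-out hub: one sender per subscriber.
#[derive(Default)]
pub struct Broadcaster {
    subscribers: Vec<Sender<Event>>,
}

impl Broadcaster {
    /// Registers a new subscriber and returns its receiving end.
    pub fn subscribe(&mut self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of the event to every live subscriber and forgets
    /// subscribers whose receiver has been dropped.
    pub fn publish(&mut self, event: Event) {
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
    }

    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```

Dropping a receiver is what ends a subscription here, mirroring the statement that the stream ends when dropped.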

§Example
use futures::StreamExt;

let mut stream = db.watch_experiences(collective_id)?;
while let Some(event) = stream.next().await {
    println!("{:?}: {}", event.event_type, event.experience_id);
}
Source

pub fn watch_experiences_filtered( &self, collective_id: CollectiveId, filter: WatchFilter, ) -> Result<WatchStream>

Subscribes to filtered experience changes in a collective.

Like watch_experiences, but only delivers events that match the filter criteria. Filters are applied on the sender side before channel delivery.

§Example
use pulsedb::WatchFilter;

let filter = WatchFilter {
    domains: Some(vec!["security".to_string()]),
    min_importance: Some(0.7),
    ..Default::default()
};
let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
Source

pub fn get_current_sequence(&self) -> Result<u64>

Returns the current WAL sequence number.

Use this to establish a baseline before starting to poll for changes. Returns 0 if no experience writes have occurred yet.

§Example
let seq = db.get_current_sequence()?;
// ... later ...
let (events, new_seq) = db.poll_changes(seq)?;
Source

pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)>

Polls for experience changes since the given sequence number.

Returns a tuple of (events, new_sequence):

  • events: New WatchEvents in sequence order
  • new_sequence: Pass this value back on the next call

Returns an empty vec and the same sequence if no changes exist.

§Arguments
  • since_seq - The last sequence number you received (0 for first call)
§Performance

Target: < 10ms per call. Internally performs a range scan on the watch_events table, O(k) where k is the number of new events.
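The sequence-cursor contract and the range scan mentioned above can be sketched with an ordered map. `EventLog` is a hypothetical in-memory stand-in for the watch_events table, with events reduced to strings and sequence numbers assumed to start at 1.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical in-memory event log ordered by sequence number.
#[derive(Default)]
pub struct EventLog {
    events: BTreeMap<u64, String>,
    last_seq: u64, // 0 means no event has been written yet
}

impl EventLog {
    /// Appends an event and returns its sequence number.
    pub fn append(&mut self, event: &str) -> u64 {
        self.last_seq += 1;
        self.events.insert(self.last_seq, event.to_string());
        self.last_seq
    }

    pub fn current_sequence(&self) -> u64 {
        self.last_seq
    }

    /// Range scan over sequences strictly greater than `since_seq`.
    /// The work done is proportional to the number of new events.
    pub fn poll(&self, since_seq: u64) -> (Vec<String>, u64) {
        let mut new_seq = since_seq;
        let mut out = Vec::new();
        for (seq, event) in self.events.range((Excluded(since_seq), Unbounded)) {
            out.push(event.clone());
            new_seq = *seq;
        }
        // With no new events, `new_seq` is still `since_seq`.
        (out, new_seq)
    }
}
```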

§Example
use std::time::Duration;

let mut seq = 0u64;
loop {
    let (events, new_seq) = db.poll_changes(seq)?;
    seq = new_seq;
    for event in events {
        println!("{:?}: {}", event.event_type, event.experience_id);
    }
    std::thread::sleep(Duration::from_millis(100));
}
Source

pub fn poll_changes_batch( &self, since_seq: u64, limit: usize, ) -> Result<(Vec<WatchEvent>, u64)>

Polls for changes with a custom batch size limit.

Same as poll_changes but returns at most limit events per call. Use this for backpressure control.

Source

pub fn compact_wal(&self) -> Result<u64>

Available on crate feature sync only.

Compacts the WAL by removing events that all peers have already synced.

Finds the minimum cursor across all known peers and deletes WAL events up to that sequence. If no peers exist, no compaction occurs (events may be needed when a peer connects later).

Call this periodically (e.g., daily) to reclaim disk space. Returns the number of WAL events deleted.
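The minimum-cursor rule can be sketched with an ordered map for the WAL and a map of peer cursors. `compact` is a hypothetical function, and it assumes "up to that sequence" is inclusive, which the documentation does not state explicitly.

```rust
use std::collections::{BTreeMap, HashMap};

/// Deletes WAL events whose sequence is at or below the smallest peer cursor.
/// Returns the number of events deleted; does nothing when no peers are known.
pub fn compact(wal: &mut BTreeMap<u64, String>, peer_cursors: &HashMap<String, u64>) -> u64 {
    // The slowest peer decides how much history can be dropped safely.
    let min_cursor = match peer_cursors.values().copied().min() {
        Some(cursor) => cursor,
        None => return 0, // no peers: keep everything for future peers
    };
    let before = wal.len();
    // Assumption: the cursor itself is inclusive, so keep only later events.
    wal.retain(|seq, _| *seq > min_cursor);
    (before - wal.len()) as u64
}
```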

§Example
let deleted = db.compact_wal()?;
println!("Compacted {} WAL events", deleted);
Source

pub fn apply_synced_experience(&self, experience: Experience) -> Result<()>

Available on crate feature sync only.

Applies a synced experience from a remote peer.

Writes the full experience to storage and inserts it into the HNSW index. The caller must hold a SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_experience_update( &self, id: ExperienceId, update: ExperienceUpdate, ) -> Result<()>

Available on crate feature sync only.

Applies a synced experience update from a remote peer.

Caller must hold SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_experience_delete(&self, id: ExperienceId) -> Result<()>

Available on crate feature sync only.

Applies a synced experience deletion from a remote peer.

Removes the experience from storage and soft-deletes it from the HNSW index. The caller must hold a SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_relation(&self, relation: ExperienceRelation) -> Result<()>

Available on crate feature sync only.

Applies a synced relation from a remote peer.

Caller must hold SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_relation_delete(&self, id: RelationId) -> Result<()>

Available on crate feature sync only.

Applies a synced relation deletion from a remote peer.

Caller must hold SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_insight(&self, insight: DerivedInsight) -> Result<()>

Available on crate feature sync only.

Applies a synced insight from a remote peer.

Writes the insight to storage and inserts it into the insight HNSW index. The caller must hold a SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_insight_delete(&self, id: InsightId) -> Result<()>

Available on crate feature sync only.

Applies a synced insight deletion from a remote peer.

Removes the insight from storage and soft-deletes it from the insight HNSW index. The caller must hold a SyncApplyGuard to suppress WAL recording.

Source

pub fn apply_synced_collective(&self, collective: Collective) -> Result<()>

Available on crate feature sync only.

Applies a synced collective from a remote peer.

Writes the collective to storage and creates its HNSW indexes. The caller must hold a SyncApplyGuard to suppress WAL recording.

Trait Implementations§

Source§

impl Debug for PulseDB

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.