Skip to main content

pulsedb/substrate/
mod.rs

1//! Async storage trait for integrating PulseDB with agent frameworks.
2//!
//! It defines [`SubstrateProvider`], the async interface through which agent
//! frameworks and orchestration layers talk to PulseDB. Consumers hold a
//! `Box<dyn SubstrateProvider>` and interact with the database without
//! knowing the concrete storage implementation.
7//!
8//! # Architecture
9//!
10//! ```text
//! ┌──────────────────────────────┐       ┌──────────────────────────────┐
//! │       Agent Framework        │       │           PulseDB            │
//! │                              │       │                              │
//! │  Orchestrator ───────────────┼──────►│  PulseDBSubstrate            │
//! │  Box<dyn SubstrateProvider>  │       │  (Arc<PulseDB>)              │
//! │                              │       │                              │
//! │  Agents interact             │       │  spawn_blocking ──►          │
//! │  through the trait           │       │  sync storage ops            │
//! │                              │       │                              │
//! └──────────────────────────────┘       └──────────────────────────────┘
21//! ```
22//!
23//! # Example
24//!
25//! ```rust,no_run
26//! # fn main() -> pulsedb::Result<()> {
27//! # let dir = tempfile::tempdir().unwrap();
28//! use std::sync::Arc;
29//! use pulsedb::{PulseDB, Config, PulseDBSubstrate, SubstrateProvider};
30//!
31//! // Create PulseDB and wrap in substrate
32//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
33//! let substrate = PulseDBSubstrate::new(db);
34//!
35//! // Use as trait object
36//! let provider: Box<dyn SubstrateProvider> = Box::new(substrate);
37//!
38//! // All operations are async (shown here for illustration)
39//! // let exp_id = provider.store_experience(new_exp).await?;
40//! // let results = provider.search_similar(collective, &embedding, 10).await?;
41//! # Ok(())
42//! # }
43//! ```
44
45mod r#impl;
46
47pub use r#impl::PulseDBSubstrate;
48
49use std::pin::Pin;
50
51use async_trait::async_trait;
52use futures_core::Stream;
53
54use crate::activity::Activity;
55use crate::collective::Collective;
56use crate::error::PulseDBError;
57use crate::experience::{Experience, NewExperience};
58use crate::insight::{DerivedInsight, NewDerivedInsight};
59use crate::relation::{ExperienceRelation, NewExperienceRelation};
60use crate::search::{ContextCandidates, ContextRequest};
61use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId};
62use crate::watch::WatchEvent;
63
64/// Async storage interface for agent framework integration.
65///
66/// This trait abstracts PulseDB's storage capabilities behind an async
67/// boundary, enabling agent frameworks to interact with the database
68/// without blocking the async runtime.
69///
70/// # Object Safety
71///
72/// `SubstrateProvider` is object-safe via `#[async_trait]`, allowing it to
73/// be used as `Box<dyn SubstrateProvider>` in any async context.
74///
75/// # Implementors
76///
77/// - [`PulseDBSubstrate`] — production implementation wrapping `Arc<PulseDB>`
78#[async_trait]
79pub trait SubstrateProvider: Send + Sync {
80    /// Stores a new experience and returns its assigned ID.
81    ///
82    /// Generates an embedding (if configured), writes to storage, and
83    /// indexes in the collective's HNSW graph.
84    async fn store_experience(&self, exp: NewExperience) -> Result<ExperienceId, PulseDBError>;
85
86    /// Retrieves an experience by ID, or `None` if it doesn't exist.
87    async fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>, PulseDBError>;
88
89    /// Searches for experiences similar to the given embedding.
90    ///
91    /// Returns up to `k` results as `(Experience, similarity_score)` tuples,
92    /// sorted by similarity descending (1.0 = identical).
93    async fn search_similar(
94        &self,
95        collective: CollectiveId,
96        embedding: &[f32],
97        k: usize,
98    ) -> Result<Vec<(Experience, f32)>, PulseDBError>;
99
100    /// Retrieves the most recent experiences from a collective.
101    ///
102    /// Returns up to `limit` experiences sorted by timestamp descending.
103    async fn get_recent(
104        &self,
105        collective: CollectiveId,
106        limit: usize,
107    ) -> Result<Vec<Experience>, PulseDBError>;
108
109    /// Stores a relation between two experiences.
110    async fn store_relation(&self, rel: NewExperienceRelation) -> Result<RelationId, PulseDBError>;
111
112    /// Retrieves all experiences related to the given experience (both directions).
113    ///
114    /// Returns `(related_experience, relation)` tuples for both outgoing
115    /// and incoming relations.
116    async fn get_related(
117        &self,
118        exp_id: ExperienceId,
119    ) -> Result<Vec<(Experience, ExperienceRelation)>, PulseDBError>;
120
121    /// Stores a derived insight synthesized from source experiences.
122    async fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId, PulseDBError>;
123
124    /// Searches for insights similar to the given embedding.
125    ///
126    /// Returns up to `k` results as `(DerivedInsight, similarity_score)` tuples.
127    async fn get_insights(
128        &self,
129        collective: CollectiveId,
130        embedding: &[f32],
131        k: usize,
132    ) -> Result<Vec<(DerivedInsight, f32)>, PulseDBError>;
133
134    /// Retrieves active (non-stale) agent activities in a collective.
135    async fn get_activities(&self, collective: CollectiveId)
136        -> Result<Vec<Activity>, PulseDBError>;
137
138    /// Assembles context candidates from all retrieval primitives.
139    ///
140    /// Orchestrates similarity search, recent experiences, insights,
141    /// relations, and active agents into a single response.
142    async fn get_context_candidates(
143        &self,
144        request: ContextRequest,
145    ) -> Result<ContextCandidates, PulseDBError>;
146
147    /// Subscribes to real-time experience change events in a collective.
148    ///
149    /// Returns a `Stream` that yields [`WatchEvent`] values whenever
150    /// experiences are created, updated, archived, or deleted.
151    async fn watch(
152        &self,
153        collective: CollectiveId,
154    ) -> Result<Pin<Box<dyn Stream<Item = WatchEvent> + Send>>, PulseDBError>;
155
156    /// Creates a new collective (namespace).
157    ///
158    /// Returns the new collective's ID. Fails if a collective with the
159    /// same name already exists.
160    async fn create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError>;
161
162    /// Gets an existing collective by name, or creates it if it doesn't exist.
163    ///
164    /// This is the recommended method for SDK consumers — idempotent and safe
165    /// to call repeatedly with the same name.
166    async fn get_or_create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError>;
167
168    /// Lists all collectives in the database.
169    async fn list_collectives(&self) -> Result<Vec<Collective>, PulseDBError>;
170}