pulsedb/substrate/mod.rs
1//! Async storage trait for integrating PulseDB with agent frameworks.
2//!
3//! This module defines the async interface for integrating PulseDB with
4//! agent frameworks and orchestration layers. Consumers hold a
5//! `Box<dyn SubstrateProvider>` to interact with the database without
6//! knowing the concrete storage implementation.
7//!
8//! # Architecture
9//!
10//! ```text
//! ┌────────────────────────────┐       ┌────────────────────────────┐
//! │      Agent Framework       │       │          PulseDB           │
//! │                            │       │                            │
//! │ Orchestrator ──────────────┼──────►│ PulseDBSubstrate           │
//! │ Box<dyn SubstrateProvider> │       │ (Arc<PulseDB>)             │
//! │                            │       │                            │
//! │ Agents interact            │       │ spawn_blocking ──►         │
//! │ through the trait          │       │   sync storage ops         │
//! │                            │       │                            │
//! └────────────────────────────┘       └────────────────────────────┘
21//! ```
22//!
23//! # Example
24//!
25//! ```rust,no_run
26//! # fn main() -> pulsedb::Result<()> {
27//! # let dir = tempfile::tempdir().unwrap();
28//! use std::sync::Arc;
29//! use pulsedb::{PulseDB, Config, PulseDBSubstrate, SubstrateProvider};
30//!
31//! // Create PulseDB and wrap in substrate
32//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
33//! let substrate = PulseDBSubstrate::new(db);
34//!
35//! // Use as trait object
36//! let provider: Box<dyn SubstrateProvider> = Box::new(substrate);
37//!
38//! // All operations are async (shown here for illustration)
39//! // let exp_id = provider.store_experience(new_exp).await?;
40//! // let results = provider.search_similar(collective, &embedding, 10).await?;
41//! # Ok(())
42//! # }
43//! ```
44
45mod r#impl;
46
47pub use r#impl::PulseDBSubstrate;
48
49use std::pin::Pin;
50
51use async_trait::async_trait;
52use futures_core::Stream;
53
54use crate::activity::Activity;
55use crate::collective::Collective;
56use crate::error::PulseDBError;
57use crate::experience::{Experience, NewExperience};
58use crate::insight::{DerivedInsight, NewDerivedInsight};
59use crate::relation::{ExperienceRelation, NewExperienceRelation};
60use crate::search::{ContextCandidates, ContextRequest};
61use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId};
62use crate::watch::WatchEvent;
63
64/// Async storage interface for agent framework integration.
65///
66/// This trait abstracts PulseDB's storage capabilities behind an async
67/// boundary, enabling agent frameworks to interact with the database
68/// without blocking the async runtime.
69///
70/// # Object Safety
71///
72/// `SubstrateProvider` is object-safe via `#[async_trait]`, allowing it to
73/// be used as `Box<dyn SubstrateProvider>` in any async context.
74///
75/// # Implementors
76///
77/// - [`PulseDBSubstrate`] — production implementation wrapping `Arc<PulseDB>`
78#[async_trait]
79pub trait SubstrateProvider: Send + Sync {
80 /// Stores a new experience and returns its assigned ID.
81 ///
82 /// Generates an embedding (if configured), writes to storage, and
83 /// indexes in the collective's HNSW graph.
84 async fn store_experience(&self, exp: NewExperience) -> Result<ExperienceId, PulseDBError>;
85
86 /// Retrieves an experience by ID, or `None` if it doesn't exist.
87 async fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>, PulseDBError>;
88
89 /// Searches for experiences similar to the given embedding.
90 ///
91 /// Returns up to `k` results as `(Experience, similarity_score)` tuples,
92 /// sorted by similarity descending (1.0 = identical).
93 async fn search_similar(
94 &self,
95 collective: CollectiveId,
96 embedding: &[f32],
97 k: usize,
98 ) -> Result<Vec<(Experience, f32)>, PulseDBError>;
99
100 /// Retrieves the most recent experiences from a collective.
101 ///
102 /// Returns up to `limit` experiences sorted by timestamp descending.
103 async fn get_recent(
104 &self,
105 collective: CollectiveId,
106 limit: usize,
107 ) -> Result<Vec<Experience>, PulseDBError>;
108
109 /// Stores a relation between two experiences.
110 async fn store_relation(&self, rel: NewExperienceRelation) -> Result<RelationId, PulseDBError>;
111
112 /// Retrieves all experiences related to the given experience (both directions).
113 ///
114 /// Returns `(related_experience, relation)` tuples for both outgoing
115 /// and incoming relations.
116 async fn get_related(
117 &self,
118 exp_id: ExperienceId,
119 ) -> Result<Vec<(Experience, ExperienceRelation)>, PulseDBError>;
120
121 /// Stores a derived insight synthesized from source experiences.
122 async fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId, PulseDBError>;
123
124 /// Searches for insights similar to the given embedding.
125 ///
126 /// Returns up to `k` results as `(DerivedInsight, similarity_score)` tuples.
127 async fn get_insights(
128 &self,
129 collective: CollectiveId,
130 embedding: &[f32],
131 k: usize,
132 ) -> Result<Vec<(DerivedInsight, f32)>, PulseDBError>;
133
134 /// Retrieves active (non-stale) agent activities in a collective.
135 async fn get_activities(&self, collective: CollectiveId)
136 -> Result<Vec<Activity>, PulseDBError>;
137
138 /// Assembles context candidates from all retrieval primitives.
139 ///
140 /// Orchestrates similarity search, recent experiences, insights,
141 /// relations, and active agents into a single response.
142 async fn get_context_candidates(
143 &self,
144 request: ContextRequest,
145 ) -> Result<ContextCandidates, PulseDBError>;
146
147 /// Subscribes to real-time experience change events in a collective.
148 ///
149 /// Returns a `Stream` that yields [`WatchEvent`] values whenever
150 /// experiences are created, updated, archived, or deleted.
151 async fn watch(
152 &self,
153 collective: CollectiveId,
154 ) -> Result<Pin<Box<dyn Stream<Item = WatchEvent> + Send>>, PulseDBError>;
155
156 /// Creates a new collective (namespace).
157 ///
158 /// Returns the new collective's ID. Fails if a collective with the
159 /// same name already exists.
160 async fn create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError>;
161
162 /// Gets an existing collective by name, or creates it if it doesn't exist.
163 ///
164 /// This is the recommended method for SDK consumers — idempotent and safe
165 /// to call repeatedly with the same name.
166 async fn get_or_create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError>;
167
168 /// Lists all collectives in the database.
169 async fn list_collectives(&self) -> Result<Vec<Collective>, PulseDBError>;
170}