Skip to main content

pulsedb/substrate/
impl.rs

1//! Concrete SubstrateProvider implementation backed by PulseDB.
2
3use std::pin::Pin;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures_core::Stream;
8use tokio::task::spawn_blocking;
9
10use crate::activity::Activity;
11use crate::collective::Collective;
12use crate::db::PulseDB;
13use crate::error::PulseDBError;
14use crate::experience::{Experience, NewExperience};
15use crate::insight::{DerivedInsight, NewDerivedInsight};
16use crate::relation::{ExperienceRelation, NewExperienceRelation, RelationDirection};
17use crate::search::{ContextCandidates, ContextRequest};
18use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId};
19use crate::watch::WatchEvent;
20
21use super::SubstrateProvider;
22
23/// Async adapter wrapping [`PulseDB`] for use as a [`SubstrateProvider`].
24///
25/// Each async method delegates to PulseDB's synchronous API via
26/// [`tokio::task::spawn_blocking`], preventing database I/O from blocking
27/// the async runtime's worker threads.
28///
29/// # Construction
30///
31/// ```rust
32/// # fn main() -> pulsedb::Result<()> {
33/// # let dir = tempfile::tempdir().unwrap();
34/// use std::sync::Arc;
35/// use pulsedb::{PulseDB, Config, PulseDBSubstrate};
36///
37/// let db = Arc::new(PulseDB::open(dir.path().join("a.db"), Config::default())?);
38/// let substrate = PulseDBSubstrate::new(db);
39///
40/// // Or from an owned PulseDB:
41/// let db = PulseDB::open(dir.path().join("b.db"), Config::default())?;
42/// let substrate = PulseDBSubstrate::from_db(db);
43/// # Ok(())
44/// # }
45/// ```
46///
47/// # Cloning
48///
49/// `PulseDBSubstrate` implements `Clone` — cloning is cheap (Arc reference count).
50/// Multiple clones share the same underlying database.
51#[derive(Clone)]
52pub struct PulseDBSubstrate {
53    db: Arc<PulseDB>,
54}
55
56impl PulseDBSubstrate {
57    /// Creates a new substrate provider from a shared `PulseDB` reference.
58    pub fn new(db: Arc<PulseDB>) -> Self {
59        Self { db }
60    }
61
62    /// Creates a new substrate provider, wrapping the given `PulseDB` in an `Arc`.
63    pub fn from_db(db: PulseDB) -> Self {
64        Self { db: Arc::new(db) }
65    }
66}
67
68/// Runs a blocking closure on tokio's blocking thread pool.
69///
70/// Maps `JoinError` (task panic or cancellation) to `PulseDBError::Internal`.
71async fn blocking<F, T>(f: F) -> Result<T, PulseDBError>
72where
73    F: FnOnce() -> Result<T, PulseDBError> + Send + 'static,
74    T: Send + 'static,
75{
76    spawn_blocking(f)
77        .await
78        .map_err(|e| PulseDBError::internal(format!("blocking task failed: {e}")))?
79}
80
81#[async_trait]
82impl SubstrateProvider for PulseDBSubstrate {
83    async fn store_experience(&self, exp: NewExperience) -> Result<ExperienceId, PulseDBError> {
84        let db = Arc::clone(&self.db);
85        blocking(move || db.record_experience(exp)).await
86    }
87
88    async fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>, PulseDBError> {
89        let db = Arc::clone(&self.db);
90        blocking(move || db.get_experience(id)).await
91    }
92
93    async fn search_similar(
94        &self,
95        collective: CollectiveId,
96        embedding: &[f32],
97        k: usize,
98    ) -> Result<Vec<(Experience, f32)>, PulseDBError> {
99        let db = Arc::clone(&self.db);
100        // Must clone the slice — spawn_blocking requires 'static
101        let embedding = embedding.to_vec();
102        blocking(move || {
103            db.search_similar(collective, &embedding, k).map(|results| {
104                results
105                    .into_iter()
106                    .map(|r| (r.experience, r.similarity))
107                    .collect()
108            })
109        })
110        .await
111    }
112
113    async fn get_recent(
114        &self,
115        collective: CollectiveId,
116        limit: usize,
117    ) -> Result<Vec<Experience>, PulseDBError> {
118        let db = Arc::clone(&self.db);
119        blocking(move || db.get_recent_experiences(collective, limit)).await
120    }
121
122    async fn store_relation(&self, rel: NewExperienceRelation) -> Result<RelationId, PulseDBError> {
123        let db = Arc::clone(&self.db);
124        blocking(move || db.store_relation(rel)).await
125    }
126
127    async fn get_related(
128        &self,
129        exp_id: ExperienceId,
130    ) -> Result<Vec<(Experience, ExperienceRelation)>, PulseDBError> {
131        let db = Arc::clone(&self.db);
132        blocking(move || db.get_related_experiences(exp_id, RelationDirection::Both)).await
133    }
134
135    async fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId, PulseDBError> {
136        let db = Arc::clone(&self.db);
137        blocking(move || db.store_insight(insight)).await
138    }
139
140    async fn get_insights(
141        &self,
142        collective: CollectiveId,
143        embedding: &[f32],
144        k: usize,
145    ) -> Result<Vec<(DerivedInsight, f32)>, PulseDBError> {
146        let db = Arc::clone(&self.db);
147        let embedding = embedding.to_vec();
148        blocking(move || db.get_insights(collective, &embedding, k)).await
149    }
150
151    async fn get_activities(
152        &self,
153        collective: CollectiveId,
154    ) -> Result<Vec<Activity>, PulseDBError> {
155        let db = Arc::clone(&self.db);
156        blocking(move || db.get_active_agents(collective)).await
157    }
158
159    async fn get_context_candidates(
160        &self,
161        request: ContextRequest,
162    ) -> Result<ContextCandidates, PulseDBError> {
163        let db = Arc::clone(&self.db);
164        blocking(move || db.get_context_candidates(request)).await
165    }
166
167    async fn watch(
168        &self,
169        collective: CollectiveId,
170    ) -> Result<Pin<Box<dyn Stream<Item = WatchEvent> + Send>>, PulseDBError> {
171        // watch_experiences is non-blocking (just channel setup), no spawn_blocking needed
172        let stream = self.db.watch_experiences(collective)?;
173        Ok(Box::pin(stream))
174    }
175
176    async fn create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError> {
177        let db = Arc::clone(&self.db);
178        let name = name.to_string();
179        blocking(move || db.create_collective(&name)).await
180    }
181
182    async fn get_or_create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError> {
183        let db = Arc::clone(&self.db);
184        let name = name.to_string();
185        blocking(move || {
186            // Try to find existing by name
187            let collectives = db.list_collectives()?;
188            if let Some(existing) = collectives.iter().find(|c| c.name == name) {
189                return Ok(existing.id);
190            }
191            // Not found — create new
192            db.create_collective(&name)
193        })
194        .await
195    }
196
197    async fn list_collectives(&self) -> Result<Vec<Collective>, PulseDBError> {
198        let db = Arc::clone(&self.db);
199        blocking(move || db.list_collectives()).await
200    }
201
202    async fn list_experiences(
203        &self,
204        collective: CollectiveId,
205        limit: usize,
206        offset: usize,
207    ) -> Result<Vec<Experience>, PulseDBError> {
208        let db = Arc::clone(&self.db);
209        blocking(move || db.list_experiences(collective, limit, offset)).await
210    }
211
212    async fn list_relations(
213        &self,
214        collective: CollectiveId,
215        limit: usize,
216        offset: usize,
217    ) -> Result<Vec<ExperienceRelation>, PulseDBError> {
218        let db = Arc::clone(&self.db);
219        blocking(move || db.list_relations(collective, limit, offset)).await
220    }
221
222    async fn list_insights(
223        &self,
224        collective: CollectiveId,
225        limit: usize,
226        offset: usize,
227    ) -> Result<Vec<DerivedInsight>, PulseDBError> {
228        let db = Arc::clone(&self.db);
229        blocking(move || db.list_insights(collective, limit, offset)).await
230    }
231}