pulsedb/substrate/
impl.rs1use std::pin::Pin;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures_core::Stream;
8use tokio::task::spawn_blocking;
9
10use crate::activity::Activity;
11use crate::collective::Collective;
12use crate::db::PulseDB;
13use crate::error::PulseDBError;
14use crate::experience::{Experience, NewExperience};
15use crate::insight::{DerivedInsight, NewDerivedInsight};
16use crate::relation::{ExperienceRelation, NewExperienceRelation, RelationDirection};
17use crate::search::{ContextCandidates, ContextRequest};
18use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId};
19use crate::watch::WatchEvent;
20
21use super::SubstrateProvider;
22
23#[derive(Clone)]
52pub struct PulseDBSubstrate {
53 db: Arc<PulseDB>,
54}
55
56impl PulseDBSubstrate {
57 pub fn new(db: Arc<PulseDB>) -> Self {
59 Self { db }
60 }
61
62 pub fn from_db(db: PulseDB) -> Self {
64 Self { db: Arc::new(db) }
65 }
66}
67
68async fn blocking<F, T>(f: F) -> Result<T, PulseDBError>
72where
73 F: FnOnce() -> Result<T, PulseDBError> + Send + 'static,
74 T: Send + 'static,
75{
76 spawn_blocking(f)
77 .await
78 .map_err(|e| PulseDBError::internal(format!("blocking task failed: {e}")))?
79}
80
81#[async_trait]
82impl SubstrateProvider for PulseDBSubstrate {
83 async fn store_experience(&self, exp: NewExperience) -> Result<ExperienceId, PulseDBError> {
84 let db = Arc::clone(&self.db);
85 blocking(move || db.record_experience(exp)).await
86 }
87
88 async fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>, PulseDBError> {
89 let db = Arc::clone(&self.db);
90 blocking(move || db.get_experience(id)).await
91 }
92
93 async fn search_similar(
94 &self,
95 collective: CollectiveId,
96 embedding: &[f32],
97 k: usize,
98 ) -> Result<Vec<(Experience, f32)>, PulseDBError> {
99 let db = Arc::clone(&self.db);
100 let embedding = embedding.to_vec();
102 blocking(move || {
103 db.search_similar(collective, &embedding, k).map(|results| {
104 results
105 .into_iter()
106 .map(|r| (r.experience, r.similarity))
107 .collect()
108 })
109 })
110 .await
111 }
112
113 async fn get_recent(
114 &self,
115 collective: CollectiveId,
116 limit: usize,
117 ) -> Result<Vec<Experience>, PulseDBError> {
118 let db = Arc::clone(&self.db);
119 blocking(move || db.get_recent_experiences(collective, limit)).await
120 }
121
122 async fn store_relation(&self, rel: NewExperienceRelation) -> Result<RelationId, PulseDBError> {
123 let db = Arc::clone(&self.db);
124 blocking(move || db.store_relation(rel)).await
125 }
126
127 async fn get_related(
128 &self,
129 exp_id: ExperienceId,
130 ) -> Result<Vec<(Experience, ExperienceRelation)>, PulseDBError> {
131 let db = Arc::clone(&self.db);
132 blocking(move || db.get_related_experiences(exp_id, RelationDirection::Both)).await
133 }
134
135 async fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId, PulseDBError> {
136 let db = Arc::clone(&self.db);
137 blocking(move || db.store_insight(insight)).await
138 }
139
140 async fn get_insights(
141 &self,
142 collective: CollectiveId,
143 embedding: &[f32],
144 k: usize,
145 ) -> Result<Vec<(DerivedInsight, f32)>, PulseDBError> {
146 let db = Arc::clone(&self.db);
147 let embedding = embedding.to_vec();
148 blocking(move || db.get_insights(collective, &embedding, k)).await
149 }
150
151 async fn get_activities(
152 &self,
153 collective: CollectiveId,
154 ) -> Result<Vec<Activity>, PulseDBError> {
155 let db = Arc::clone(&self.db);
156 blocking(move || db.get_active_agents(collective)).await
157 }
158
159 async fn get_context_candidates(
160 &self,
161 request: ContextRequest,
162 ) -> Result<ContextCandidates, PulseDBError> {
163 let db = Arc::clone(&self.db);
164 blocking(move || db.get_context_candidates(request)).await
165 }
166
167 async fn watch(
168 &self,
169 collective: CollectiveId,
170 ) -> Result<Pin<Box<dyn Stream<Item = WatchEvent> + Send>>, PulseDBError> {
171 let stream = self.db.watch_experiences(collective)?;
173 Ok(Box::pin(stream))
174 }
175
176 async fn create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError> {
177 let db = Arc::clone(&self.db);
178 let name = name.to_string();
179 blocking(move || db.create_collective(&name)).await
180 }
181
182 async fn get_or_create_collective(&self, name: &str) -> Result<CollectiveId, PulseDBError> {
183 let db = Arc::clone(&self.db);
184 let name = name.to_string();
185 blocking(move || {
186 let collectives = db.list_collectives()?;
188 if let Some(existing) = collectives.iter().find(|c| c.name == name) {
189 return Ok(existing.id);
190 }
191 db.create_collective(&name)
193 })
194 .await
195 }
196
197 async fn list_collectives(&self) -> Result<Vec<Collective>, PulseDBError> {
198 let db = Arc::clone(&self.db);
199 blocking(move || db.list_collectives()).await
200 }
201
202 async fn list_experiences(
203 &self,
204 collective: CollectiveId,
205 limit: usize,
206 offset: usize,
207 ) -> Result<Vec<Experience>, PulseDBError> {
208 let db = Arc::clone(&self.db);
209 blocking(move || db.list_experiences(collective, limit, offset)).await
210 }
211
212 async fn list_relations(
213 &self,
214 collective: CollectiveId,
215 limit: usize,
216 offset: usize,
217 ) -> Result<Vec<ExperienceRelation>, PulseDBError> {
218 let db = Arc::clone(&self.db);
219 blocking(move || db.list_relations(collective, limit, offset)).await
220 }
221
222 async fn list_insights(
223 &self,
224 collective: CollectiveId,
225 limit: usize,
226 offset: usize,
227 ) -> Result<Vec<DerivedInsight>, PulseDBError> {
228 let db = Arc::clone(&self.db);
229 blocking(move || db.list_insights(collective, limit, offset)).await
230 }
231}