// pulsedb/db.rs
//! PulseDB main struct and lifecycle operations.
//!
//! The [`PulseDB`] struct is the primary interface for interacting with
//! the database. It provides methods for:
//!
//! - Opening and closing the database
//! - Managing collectives (isolation units)
//! - Recording and querying experiences
//! - Semantic search and context retrieval
//!
//! # Quick Start
//!
//! ```rust
//! # fn main() -> pulsedb::Result<()> {
//! # let dir = tempfile::tempdir().unwrap();
//! use pulsedb::{PulseDB, Config, NewExperience};
//!
//! // Open or create a database
//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
//!
//! // Create a collective for your project
//! let collective = db.create_collective("my-project")?;
//!
//! // Record an experience
//! db.record_experience(NewExperience {
//!     collective_id: collective,
//!     content: "Always validate user input".to_string(),
//!     embedding: Some(vec![0.1f32; 384]),
//!     ..Default::default()
//! })?;
//!
//! // Close when done
//! db.close()?;
//! # Ok(())
//! # }
//! ```
//!
//! # Thread Safety
//!
//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
//! The underlying storage uses MVCC for concurrent reads with exclusive
//! write locking.
//!
//! ```rust
//! # fn main() -> pulsedb::Result<()> {
//! # let dir = tempfile::tempdir().unwrap();
//! use std::sync::Arc;
//! use pulsedb::{PulseDB, Config};
//!
//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
//!
//! // Clone Arc for use in another thread
//! let db_clone = Arc::clone(&db);
//! std::thread::spawn(move || {
//!     // Safe to use db_clone here
//! });
//! # Ok(())
//! # }
//! ```

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use tracing::{info, instrument, warn};

use crate::activity::{validate_new_activity, Activity, NewActivity};
use crate::collective::types::CollectiveStats;
use crate::collective::{validate_collective_name, Collective};
use crate::config::{Config, EmbeddingProvider};
use crate::embedding::{create_embedding_service, EmbeddingService};
use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
use crate::experience::{
    validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
    NewExperience,
};
use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
use crate::vector::HnswIndex;
use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};

/// The main PulseDB database handle.
///
/// This is the primary interface for all database operations. Create an
/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
///
/// # Ownership
///
/// `PulseDB` owns its storage and embedding service. When you call `close()`,
/// the database is consumed and cannot be used afterward. This ensures
/// resources are properly released.
pub struct PulseDB {
    /// Storage engine (redb or mock for testing). Source of truth for all
    /// persisted data; HNSW indexes below are derived from it.
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX), selected by
    /// `Config::embedding_provider`.
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}

124impl std::fmt::Debug for PulseDB {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
127 let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
128 f.debug_struct("PulseDB")
129 .field("config", &self.config)
130 .field("embedding_dimension", &self.embedding_dimension())
131 .field("vector_indexes", &vector_count)
132 .field("insight_vector_indexes", &insight_vector_count)
133 .finish_non_exhaustive()
134 }
135}
136
impl PulseDB {
    /// Opens or creates a PulseDB database at the specified path.
    ///
    /// If the database doesn't exist, it will be created with the given
    /// configuration. If it exists, the configuration will be validated
    /// against the stored settings (e.g., embedding dimension must match).
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (created if it doesn't exist)
    /// * `config` - Configuration options for the database
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Configuration is invalid (see [`Config::validate`])
    /// - Database file is corrupted
    /// - Database is locked by another process
    /// - Schema version doesn't match (needs migration)
    /// - Embedding dimension doesn't match existing database
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
    ///
    /// // Open with default configuration
    /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
    /// # drop(db);
    ///
    /// // Open with custom embedding dimension
    /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
    ///     embedding_dimension: EmbeddingDimension::D768,
    ///     ..Default::default()
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
        // Reject invalid configuration before touching the filesystem.
        config.validate().map_err(PulseDBError::from)?;

        info!("Opening PulseDB");

        // Storage first (source of truth), then the embedding service.
        let storage = open_storage(&path, &config)?;
        let embedding = create_embedding_service(&config)?;

        // Rebuild the derived HNSW structures for every known collective.
        let exp_indexes = Self::load_all_indexes(&*storage, &config)?;
        let insight_indexes = Self::load_all_insight_indexes(&*storage, &config)?;

        info!(
            dimension = config.embedding_dimension.size(),
            sync_mode = ?config.sync_mode,
            collectives = exp_indexes.len(),
            "PulseDB opened successfully"
        );

        let watch = Arc::new(WatchService::new(
            config.watch.buffer_size,
            config.watch.in_process,
        ));

        Ok(Self {
            storage,
            embedding,
            config,
            vectors: RwLock::new(exp_indexes),
            insight_vectors: RwLock::new(insight_indexes),
            watch,
        })
    }

216 /// Closes the database, flushing all pending writes.
217 ///
218 /// This method consumes the `PulseDB` instance, ensuring it cannot
219 /// be used after closing. The underlying storage engine flushes all
220 /// buffered data to disk.
221 ///
222 /// # Errors
223 ///
224 /// Returns an error if the storage backend reports a flush failure.
225 /// Note: the current redb backend flushes durably on drop, so this
226 /// always returns `Ok(())` in practice.
227 ///
228 /// # Example
229 ///
230 /// ```rust
231 /// # fn main() -> pulsedb::Result<()> {
232 /// # let dir = tempfile::tempdir().unwrap();
233 /// use pulsedb::{PulseDB, Config};
234 ///
235 /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
236 /// // ... use the database ...
237 /// db.close()?; // db is consumed here
238 /// // db.something() // Compile error: db was moved
239 /// # Ok(())
240 /// # }
241 /// ```
242 #[instrument(skip(self))]
243 pub fn close(self) -> Result<()> {
244 info!("Closing PulseDB");
245
246 // Persist HNSW indexes BEFORE closing storage.
247 // If HNSW save fails, storage is still open for potential recovery.
248 // On next open(), stale/missing HNSW files trigger a rebuild from redb.
249 if let Some(hnsw_dir) = self.hnsw_dir() {
250 // Experience HNSW indexes
251 let vectors = self
252 .vectors
253 .read()
254 .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
255 for (collective_id, index) in vectors.iter() {
256 if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
257 warn!(
258 collective = %collective_id,
259 error = %e,
260 "Failed to save HNSW index (will rebuild on next open)"
261 );
262 }
263 }
264 drop(vectors);
265
266 // Insight HNSW indexes (separate files with _insights suffix)
267 let insight_vectors = self
268 .insight_vectors
269 .read()
270 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
271 for (collective_id, index) in insight_vectors.iter() {
272 let name = format!("{}_insights", collective_id);
273 if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
274 warn!(
275 collective = %collective_id,
276 error = %e,
277 "Failed to save insight HNSW index (will rebuild on next open)"
278 );
279 }
280 }
281 }
282
283 // Close storage (flushes pending writes)
284 self.storage.close()?;
285
286 info!("PulseDB closed successfully");
287 Ok(())
288 }
289
    /// Returns a reference to the database configuration.
    ///
    /// This is the configuration that was used to open the database.
    /// Note that some settings (like embedding dimension) are locked
    /// on database creation and cannot be changed.
    #[inline]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Returns the database metadata.
    ///
    /// Metadata includes schema version, embedding dimension, and timestamps
    /// for when the database was created and last opened. Delegates to the
    /// storage engine, which owns the persisted metadata record.
    #[inline]
    pub fn metadata(&self) -> &DatabaseMetadata {
        self.storage.metadata()
    }

    /// Returns the embedding dimension configured for this database.
    ///
    /// All embeddings stored in this database must have exactly this
    /// many dimensions.
    #[inline]
    pub fn embedding_dimension(&self) -> usize {
        self.config.embedding_dimension.size()
    }

    // =========================================================================
    // Internal Accessors (for use by feature modules)
    // =========================================================================

    /// Returns a reference to the storage engine.
    ///
    /// This is for internal use by other PulseDB modules.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn storage(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }

    /// Returns a reference to the embedding service.
    ///
    /// This is for internal use by other PulseDB modules.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
        self.embedding.as_ref()
    }

340 // =========================================================================
341 // HNSW Index Lifecycle
342 // =========================================================================
343
344 /// Returns the directory for HNSW index files.
345 ///
346 /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
347 /// the storage has no file path (e.g., in-memory tests).
348 fn hnsw_dir(&self) -> Option<PathBuf> {
349 self.storage.path().map(|p| {
350 let mut hnsw_path = p.as_os_str().to_owned();
351 hnsw_path.push(".hnsw");
352 PathBuf::from(hnsw_path)
353 })
354 }
355
356 /// Loads or rebuilds HNSW indexes for all existing collectives.
357 ///
358 /// For each collective in storage:
359 /// 1. Try loading metadata from `.hnsw.meta` file
360 /// 2. Rebuild the graph from redb embeddings (always, since we can't
361 /// load the graph due to hnsw_rs lifetime constraints)
362 /// 3. Restore deleted set from metadata if available
363 fn load_all_indexes(
364 storage: &dyn StorageEngine,
365 config: &Config,
366 ) -> Result<HashMap<CollectiveId, HnswIndex>> {
367 let collectives = storage.list_collectives()?;
368 let mut vectors = HashMap::with_capacity(collectives.len());
369
370 let hnsw_dir = storage.path().map(|p| {
371 let mut hnsw_path = p.as_os_str().to_owned();
372 hnsw_path.push(".hnsw");
373 PathBuf::from(hnsw_path)
374 });
375
376 for collective in &collectives {
377 let dimension = collective.embedding_dimension as usize;
378
379 // List all experience IDs in this collective
380 let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;
381
382 // Load embeddings from redb (source of truth)
383 let mut embeddings = Vec::with_capacity(exp_ids.len());
384 for exp_id in &exp_ids {
385 if let Some(embedding) = storage.get_embedding(*exp_id)? {
386 embeddings.push((*exp_id, embedding));
387 }
388 }
389
390 // Try loading metadata (for deleted set and ID mappings)
391 let metadata = hnsw_dir
392 .as_ref()
393 .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
394 .flatten();
395
396 // Rebuild the HNSW graph from embeddings
397 let index = if embeddings.is_empty() {
398 HnswIndex::new(dimension, &config.hnsw)
399 } else {
400 let start = std::time::Instant::now();
401 let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
402 info!(
403 collective = %collective.id,
404 vectors = idx.active_count(),
405 elapsed_ms = start.elapsed().as_millis() as u64,
406 "Rebuilt HNSW index from redb embeddings"
407 );
408 idx
409 };
410
411 // Restore deleted set from metadata if available
412 if let Some(meta) = metadata {
413 index.restore_deleted_set(&meta.deleted)?;
414 }
415
416 vectors.insert(collective.id, index);
417 }
418
419 Ok(vectors)
420 }
421
422 /// Loads or rebuilds insight HNSW indexes for all existing collectives.
423 ///
424 /// For each collective, loads all insights from storage and rebuilds
425 /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
426 /// byte conversion for the HNSW API.
427 fn load_all_insight_indexes(
428 storage: &dyn StorageEngine,
429 config: &Config,
430 ) -> Result<HashMap<CollectiveId, HnswIndex>> {
431 let collectives = storage.list_collectives()?;
432 let mut insight_vectors = HashMap::with_capacity(collectives.len());
433
434 let hnsw_dir = storage.path().map(|p| {
435 let mut hnsw_path = p.as_os_str().to_owned();
436 hnsw_path.push(".hnsw");
437 PathBuf::from(hnsw_path)
438 });
439
440 for collective in &collectives {
441 let dimension = collective.embedding_dimension as usize;
442
443 // List all insight IDs in this collective
444 let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;
445
446 // Load insights and extract embeddings (converting InsightId → ExperienceId)
447 let mut embeddings = Vec::with_capacity(insight_ids.len());
448 for insight_id in &insight_ids {
449 if let Some(insight) = storage.get_insight(*insight_id)? {
450 let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
451 embeddings.push((exp_id, insight.embedding));
452 }
453 }
454
455 // Try loading metadata (for deleted set)
456 let name = format!("{}_insights", collective.id);
457 let metadata = hnsw_dir
458 .as_ref()
459 .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
460 .flatten();
461
462 // Rebuild HNSW graph from embeddings
463 let index = if embeddings.is_empty() {
464 HnswIndex::new(dimension, &config.hnsw)
465 } else {
466 let start = std::time::Instant::now();
467 let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
468 info!(
469 collective = %collective.id,
470 insights = idx.active_count(),
471 elapsed_ms = start.elapsed().as_millis() as u64,
472 "Rebuilt insight HNSW index from stored insights"
473 );
474 idx
475 };
476
477 // Restore deleted set from metadata if available
478 if let Some(meta) = metadata {
479 index.restore_deleted_set(&meta.deleted)?;
480 }
481
482 insight_vectors.insert(collective.id, index);
483 }
484
485 Ok(insight_vectors)
486 }
487
488 /// Executes a closure with the HNSW index for a collective.
489 ///
490 /// This is the primary accessor for vector search operations (used by
491 /// `search_similar()`). The closure runs while the outer RwLock guard
492 /// is held (read lock), so the HnswIndex reference stays valid.
493 /// Returns `None` if no index exists for the collective.
494 #[doc(hidden)]
495 pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
496 where
497 F: FnOnce(&HnswIndex) -> Result<R>,
498 {
499 let vectors = self
500 .vectors
501 .read()
502 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
503 match vectors.get(&collective_id) {
504 Some(index) => Ok(Some(f(index)?)),
505 None => Ok(None),
506 }
507 }
508
    // =========================================================================
    // Test Helpers
    // =========================================================================

    /// Returns a reference to the storage engine for integration testing.
    ///
    /// This method is intentionally hidden from documentation. It provides
    /// test-only access to the storage layer for verifying ACID guarantees
    /// and crash recovery. Production code should use the public PulseDB API.
    #[doc(hidden)]
    #[inline]
    pub fn storage_for_test(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }

524 // =========================================================================
525 // Collective Management (E1-S02)
526 // =========================================================================
527
528 /// Creates a new collective with the given name.
529 ///
530 /// The collective's embedding dimension is locked to the database's
531 /// configured dimension at creation time.
532 ///
533 /// # Arguments
534 ///
535 /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
536 ///
537 /// # Errors
538 ///
539 /// Returns a validation error if the name is empty, whitespace-only,
540 /// or exceeds 255 characters.
541 ///
542 /// # Example
543 ///
544 /// ```rust
545 /// # fn main() -> pulsedb::Result<()> {
546 /// # let dir = tempfile::tempdir().unwrap();
547 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
548 /// let id = db.create_collective("my-project")?;
549 /// # Ok(())
550 /// # }
551 /// ```
552 #[instrument(skip(self))]
553 pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
554 validate_collective_name(name)?;
555
556 let dimension = self.config.embedding_dimension.size() as u16;
557 let collective = Collective::new(name, dimension);
558 let id = collective.id;
559
560 // Persist to redb first (source of truth)
561 self.storage.save_collective(&collective)?;
562
563 // Create empty HNSW indexes for this collective
564 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
565 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
566 self.vectors
567 .write()
568 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
569 .insert(id, exp_index);
570 self.insight_vectors
571 .write()
572 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
573 .insert(id, insight_index);
574
575 info!(id = %id, name = %name, "Collective created");
576 Ok(id)
577 }
578
579 /// Creates a new collective with an owner for multi-tenancy.
580 ///
581 /// Same as [`create_collective`](Self::create_collective) but assigns
582 /// an owner ID, enabling filtering with
583 /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
584 ///
585 /// # Arguments
586 ///
587 /// * `name` - Human-readable name (1-255 characters)
588 /// * `owner_id` - Owner identifier (must not be empty)
589 ///
590 /// # Errors
591 ///
592 /// Returns a validation error if the name or owner_id is invalid.
593 #[instrument(skip(self))]
594 pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
595 validate_collective_name(name)?;
596
597 if owner_id.is_empty() {
598 return Err(PulseDBError::from(
599 crate::error::ValidationError::required_field("owner_id"),
600 ));
601 }
602
603 let dimension = self.config.embedding_dimension.size() as u16;
604 let collective = Collective::with_owner(name, owner_id, dimension);
605 let id = collective.id;
606
607 // Persist to redb first (source of truth)
608 self.storage.save_collective(&collective)?;
609
610 // Create empty HNSW indexes for this collective
611 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
612 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
613 self.vectors
614 .write()
615 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
616 .insert(id, exp_index);
617 self.insight_vectors
618 .write()
619 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
620 .insert(id, insight_index);
621
622 info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
623 Ok(id)
624 }
625
    /// Returns a collective by ID, or `None` if not found.
    ///
    /// Thin delegate to the storage engine; no caching is involved.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let id = db.create_collective("example")?;
    /// if let Some(collective) = db.get_collective(id)? {
    ///     println!("Found: {}", collective.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
        self.storage.get_collective(id)
    }

    /// Lists all collectives in the database.
    ///
    /// Returns an empty vector if no collectives exist. Thin delegate to
    /// the storage engine.
    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
        self.storage.list_collectives()
    }

653 /// Lists collectives filtered by owner ID.
654 ///
655 /// Returns only collectives whose `owner_id` matches the given value.
656 /// Returns an empty vector if no matching collectives exist.
657 pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
658 let all = self.storage.list_collectives()?;
659 Ok(all
660 .into_iter()
661 .filter(|c| c.owner_id.as_deref() == Some(owner_id))
662 .collect())
663 }
664
665 /// Returns statistics for a collective.
666 ///
667 /// # Errors
668 ///
669 /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
670 #[instrument(skip(self))]
671 pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
672 // Verify collective exists
673 self.storage
674 .get_collective(id)?
675 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
676
677 let experience_count = self.storage.count_experiences_in_collective(id)?;
678
679 Ok(CollectiveStats {
680 experience_count,
681 storage_bytes: 0,
682 oldest_experience: None,
683 newest_experience: None,
684 })
685 }
686
687 /// Deletes a collective and all its associated data.
688 ///
689 /// Performs cascade deletion: removes all experiences belonging to the
690 /// collective before removing the collective record itself.
691 ///
692 /// # Errors
693 ///
694 /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
695 ///
696 /// # Example
697 ///
698 /// ```rust
699 /// # fn main() -> pulsedb::Result<()> {
700 /// # let dir = tempfile::tempdir().unwrap();
701 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
702 /// # let collective_id = db.create_collective("to-delete")?;
703 /// db.delete_collective(collective_id)?;
704 /// assert!(db.get_collective(collective_id)?.is_none());
705 /// # Ok(())
706 /// # }
707 /// ```
708 #[instrument(skip(self))]
709 pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
710 // Verify collective exists
711 self.storage
712 .get_collective(id)?
713 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
714
715 // Cascade: delete all experiences for this collective
716 let deleted_count = self.storage.delete_experiences_by_collective(id)?;
717 if deleted_count > 0 {
718 info!(count = deleted_count, "Cascade-deleted experiences");
719 }
720
721 // Cascade: delete all insights for this collective
722 let deleted_insights = self.storage.delete_insights_by_collective(id)?;
723 if deleted_insights > 0 {
724 info!(count = deleted_insights, "Cascade-deleted insights");
725 }
726
727 // Cascade: delete all activities for this collective
728 let deleted_activities = self.storage.delete_activities_by_collective(id)?;
729 if deleted_activities > 0 {
730 info!(count = deleted_activities, "Cascade-deleted activities");
731 }
732
733 // Delete the collective record from storage
734 self.storage.delete_collective(id)?;
735
736 // Remove HNSW indexes from memory
737 self.vectors
738 .write()
739 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
740 .remove(&id);
741 self.insight_vectors
742 .write()
743 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
744 .remove(&id);
745
746 // Remove HNSW files from disk (non-fatal if fails)
747 if let Some(hnsw_dir) = self.hnsw_dir() {
748 if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
749 warn!(
750 collective = %id,
751 error = %e,
752 "Failed to remove experience HNSW files (non-fatal)"
753 );
754 }
755 let insight_name = format!("{}_insights", id);
756 if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
757 warn!(
758 collective = %id,
759 error = %e,
760 "Failed to remove insight HNSW files (non-fatal)"
761 );
762 }
763 }
764
765 info!(id = %id, "Collective deleted");
766 Ok(())
767 }
768
769 // =========================================================================
770 // Experience CRUD (E1-S03)
771 // =========================================================================
772
773 /// Records a new experience in the database.
774 ///
775 /// This is the primary method for storing agent-learned knowledge. The method:
776 /// 1. Validates the input (content, scores, tags, embedding)
777 /// 2. Verifies the collective exists
778 /// 3. Resolves the embedding (generates if Builtin, requires if External)
779 /// 4. Stores the experience atomically across 4 tables
780 ///
781 /// # Arguments
782 ///
783 /// * `exp` - The experience to record (see [`NewExperience`])
784 ///
785 /// # Errors
786 ///
787 /// - [`ValidationError`](crate::ValidationError) if input is invalid
788 /// - [`NotFoundError::Collective`] if the collective doesn't exist
789 /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
790 #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
791 pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
792 let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
793
794 // Verify collective exists and get its dimension
795 let collective = self
796 .storage
797 .get_collective(exp.collective_id)?
798 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;
799
800 // Validate input
801 validate_new_experience(&exp, collective.embedding_dimension, is_external)?;
802
803 // Resolve embedding
804 let embedding = match exp.embedding {
805 Some(emb) => emb,
806 None => {
807 // Builtin mode: generate embedding from content
808 self.embedding.embed(&exp.content)?
809 }
810 };
811
812 // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
813 let embedding_for_hnsw = embedding.clone();
814 let collective_id = exp.collective_id;
815
816 // Construct the full experience record
817 let experience = Experience {
818 id: ExperienceId::new(),
819 collective_id,
820 content: exp.content,
821 embedding,
822 experience_type: exp.experience_type,
823 importance: exp.importance,
824 confidence: exp.confidence,
825 applications: 0,
826 domain: exp.domain,
827 related_files: exp.related_files,
828 source_agent: exp.source_agent,
829 source_task: exp.source_task,
830 timestamp: Timestamp::now(),
831 archived: false,
832 };
833
834 let id = experience.id;
835
836 // Write to redb FIRST (source of truth). If crash happens after
837 // this but before HNSW insert, rebuild on next open will include it.
838 self.storage.save_experience(&experience)?;
839
840 // Insert into HNSW index (derived structure)
841 let vectors = self
842 .vectors
843 .read()
844 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
845 if let Some(index) = vectors.get(&collective_id) {
846 index.insert_experience(id, &embedding_for_hnsw)?;
847 }
848
849 // Emit watch event after both storage and HNSW succeed
850 self.watch.emit(
851 WatchEvent {
852 experience_id: id,
853 collective_id,
854 event_type: WatchEventType::Created,
855 timestamp: experience.timestamp,
856 },
857 &experience,
858 )?;
859
860 info!(id = %id, "Experience recorded");
861 Ok(id)
862 }
863
    /// Retrieves an experience by ID, including its embedding.
    ///
    /// Returns `None` if no experience with the given ID exists.
    /// Thin delegate to the storage engine.
    #[instrument(skip(self))]
    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
        self.storage.get_experience(id)
    }

872 /// Updates mutable fields of an experience.
873 ///
874 /// Only fields set to `Some(...)` in the update are changed.
875 /// Content and embedding are immutable — create a new experience instead.
876 ///
877 /// # Errors
878 ///
879 /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
880 /// - [`NotFoundError::Experience`] if the experience doesn't exist
881 #[instrument(skip(self, update))]
882 pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
883 validate_experience_update(&update)?;
884
885 let updated = self.storage.update_experience(id, &update)?;
886 if !updated {
887 return Err(PulseDBError::from(NotFoundError::experience(id)));
888 }
889
890 // Emit watch event (fetch experience for collective_id + filter matching)
891 if self.watch.has_subscribers() {
892 if let Ok(Some(exp)) = self.storage.get_experience(id) {
893 let event_type = if update.archived == Some(true) {
894 WatchEventType::Archived
895 } else {
896 WatchEventType::Updated
897 };
898 self.watch.emit(
899 WatchEvent {
900 experience_id: id,
901 collective_id: exp.collective_id,
902 event_type,
903 timestamp: Timestamp::now(),
904 },
905 &exp,
906 )?;
907 }
908 }
909
910 info!(id = %id, "Experience updated");
911 Ok(())
912 }
913
914 /// Archives an experience (soft-delete).
915 ///
916 /// Archived experiences remain in storage but are excluded from search
917 /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
918 ///
919 /// # Errors
920 ///
921 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
922 #[instrument(skip(self))]
923 pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
924 self.update_experience(
925 id,
926 ExperienceUpdate {
927 archived: Some(true),
928 ..Default::default()
929 },
930 )
931 }
932
933 /// Restores an archived experience.
934 ///
935 /// The experience will once again appear in search results.
936 ///
937 /// # Errors
938 ///
939 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
940 #[instrument(skip(self))]
941 pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
942 self.update_experience(
943 id,
944 ExperienceUpdate {
945 archived: Some(false),
946 ..Default::default()
947 },
948 )
949 }
950
951 /// Permanently deletes an experience and its embedding.
952 ///
953 /// This removes the experience from all tables and indices.
954 /// Unlike archiving, this is irreversible.
955 ///
956 /// # Errors
957 ///
958 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
959 #[instrument(skip(self))]
960 pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
961 // Read experience first to get collective_id for HNSW lookup.
962 // This adds one extra read, but delete is not a hot path.
963 let experience = self
964 .storage
965 .get_experience(id)?
966 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
967
968 // Cascade-delete any relations involving this experience.
969 // Done before experience deletion so we can still look up relation data.
970 let rel_count = self.storage.delete_relations_for_experience(id)?;
971 if rel_count > 0 {
972 info!(
973 count = rel_count,
974 "Cascade-deleted relations for experience"
975 );
976 }
977
978 // Delete from redb FIRST (source of truth). If crash happens after
979 // this but before HNSW soft-delete, on reopen the experience won't be
980 // loaded from redb, so it's automatically excluded from the rebuilt index.
981 self.storage.delete_experience(id)?;
982
983 // Soft-delete from HNSW index (mark as deleted, not removed from graph).
984 // This takes effect immediately for the current session's searches.
985 let vectors = self
986 .vectors
987 .read()
988 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
989 if let Some(index) = vectors.get(&experience.collective_id) {
990 index.delete_experience(id)?;
991 }
992
993 // Emit watch event after storage + HNSW deletion
994 self.watch.emit(
995 WatchEvent {
996 experience_id: id,
997 collective_id: experience.collective_id,
998 event_type: WatchEventType::Deleted,
999 timestamp: Timestamp::now(),
1000 },
1001 &experience,
1002 )?;
1003
1004 info!(id = %id, "Experience deleted");
1005 Ok(())
1006 }
1007
1008 /// Reinforces an experience by incrementing its application count.
1009 ///
1010 /// Each call atomically increments the `applications` counter by 1.
1011 /// Returns the new application count.
1012 ///
1013 /// # Errors
1014 ///
1015 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1016 #[instrument(skip(self))]
1017 pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1018 let new_count = self
1019 .storage
1020 .reinforce_experience(id)?
1021 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1022
1023 // Emit watch event (fetch experience for collective_id + filter matching)
1024 if self.watch.has_subscribers() {
1025 if let Ok(Some(exp)) = self.storage.get_experience(id) {
1026 self.watch.emit(
1027 WatchEvent {
1028 experience_id: id,
1029 collective_id: exp.collective_id,
1030 event_type: WatchEventType::Updated,
1031 timestamp: Timestamp::now(),
1032 },
1033 &exp,
1034 )?;
1035 }
1036 }
1037
1038 info!(id = %id, applications = new_count, "Experience reinforced");
1039 Ok(new_count)
1040 }
1041
1042 // =========================================================================
1043 // Recent Experiences
1044 // =========================================================================
1045
1046 /// Retrieves the most recent experiences in a collective, newest first.
1047 ///
1048 /// Returns up to `limit` non-archived experiences ordered by timestamp
1049 /// descending (newest first). Uses the by-collective timestamp index
1050 /// for efficient retrieval.
1051 ///
1052 /// # Arguments
1053 ///
1054 /// * `collective_id` - The collective to query
1055 /// * `limit` - Maximum number of experiences to return (1-1000)
1056 ///
1057 /// # Errors
1058 ///
1059 /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1060 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1061 ///
1062 /// # Example
1063 ///
1064 /// ```rust
1065 /// # fn main() -> pulsedb::Result<()> {
1066 /// # let dir = tempfile::tempdir().unwrap();
1067 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1068 /// # let collective_id = db.create_collective("example")?;
1069 /// let recent = db.get_recent_experiences(collective_id, 50)?;
1070 /// for exp in &recent {
1071 /// println!("{}: {}", exp.timestamp, exp.content);
1072 /// }
1073 /// # Ok(())
1074 /// # }
1075 /// ```
1076 #[instrument(skip(self))]
1077 pub fn get_recent_experiences(
1078 &self,
1079 collective_id: CollectiveId,
1080 limit: usize,
1081 ) -> Result<Vec<Experience>> {
1082 self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
1083 }
1084
1085 /// Retrieves the most recent experiences in a collective with filtering.
1086 ///
1087 /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1088 /// applies additional filters on domain, experience type, importance,
1089 /// confidence, and timestamp.
1090 ///
1091 /// Over-fetches from storage (2x `limit`) to account for entries removed
1092 /// by post-filtering, then truncates to the requested `limit`.
1093 ///
1094 /// # Arguments
1095 ///
1096 /// * `collective_id` - The collective to query
1097 /// * `limit` - Maximum number of experiences to return (1-1000)
1098 /// * `filter` - Filter criteria to apply
1099 ///
1100 /// # Errors
1101 ///
1102 /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1103 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1104 #[instrument(skip(self, filter))]
1105 pub fn get_recent_experiences_filtered(
1106 &self,
1107 collective_id: CollectiveId,
1108 limit: usize,
1109 filter: SearchFilter,
1110 ) -> Result<Vec<Experience>> {
1111 // Validate limit
1112 if limit == 0 || limit > 1000 {
1113 return Err(
1114 ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1115 );
1116 }
1117
1118 // Verify collective exists
1119 self.storage
1120 .get_collective(collective_id)?
1121 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1122
1123 // Over-fetch IDs to account for post-filtering losses
1124 let over_fetch = limit.saturating_mul(2).min(2000);
1125 let recent_ids = self
1126 .storage
1127 .get_recent_experience_ids(collective_id, over_fetch)?;
1128
1129 // Load full experiences and apply filter
1130 let mut results = Vec::with_capacity(limit);
1131 for (exp_id, _timestamp) in recent_ids {
1132 if results.len() >= limit {
1133 break;
1134 }
1135
1136 if let Some(experience) = self.storage.get_experience(exp_id)? {
1137 if filter.matches(&experience) {
1138 results.push(experience);
1139 }
1140 }
1141 }
1142
1143 Ok(results)
1144 }
1145
1146 // =========================================================================
1147 // Similarity Search (E2-S02)
1148 // =========================================================================
1149
1150 /// Searches for experiences semantically similar to the query embedding.
1151 ///
1152 /// Uses the HNSW vector index for approximate nearest neighbor search,
1153 /// then fetches full experience records from storage. Archived experiences
1154 /// are excluded by default.
1155 ///
1156 /// Results are sorted by similarity descending (most similar first).
1157 /// Similarity is computed as `1.0 - cosine_distance`.
1158 ///
1159 /// # Arguments
1160 ///
1161 /// * `collective_id` - The collective to search within
1162 /// * `query` - Query embedding vector (must match collective's dimension)
1163 /// * `k` - Maximum number of results to return (1-1000)
1164 ///
1165 /// # Errors
1166 ///
1167 /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1168 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1169 /// the collective's embedding dimension
1170 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1171 ///
1172 /// # Example
1173 ///
1174 /// ```rust
1175 /// # fn main() -> pulsedb::Result<()> {
1176 /// # let dir = tempfile::tempdir().unwrap();
1177 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1178 /// # let collective_id = db.create_collective("example")?;
1179 /// let query = vec![0.1f32; 384]; // Your query embedding
1180 /// let results = db.search_similar(collective_id, &query, 10)?;
1181 /// for result in &results {
1182 /// println!(
1183 /// "[{:.3}] {}",
1184 /// result.similarity, result.experience.content
1185 /// );
1186 /// }
1187 /// # Ok(())
1188 /// # }
1189 /// ```
1190 #[instrument(skip(self, query))]
1191 pub fn search_similar(
1192 &self,
1193 collective_id: CollectiveId,
1194 query: &[f32],
1195 k: usize,
1196 ) -> Result<Vec<SearchResult>> {
1197 self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
1198 }
1199
1200 /// Searches for semantically similar experiences with additional filtering.
1201 ///
1202 /// Like [`search_similar()`](Self::search_similar), but applies additional
1203 /// filters on domain, experience type, importance, confidence, and timestamp.
1204 ///
1205 /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1206 /// by post-filtering, then truncates to the requested `k`.
1207 ///
1208 /// # Arguments
1209 ///
1210 /// * `collective_id` - The collective to search within
1211 /// * `query` - Query embedding vector (must match collective's dimension)
1212 /// * `k` - Maximum number of results to return (1-1000)
1213 /// * `filter` - Filter criteria to apply after vector search
1214 ///
1215 /// # Errors
1216 ///
1217 /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1218 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1219 /// the collective's embedding dimension
1220 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1221 ///
1222 /// # Example
1223 ///
1224 /// ```rust
1225 /// # fn main() -> pulsedb::Result<()> {
1226 /// # let dir = tempfile::tempdir().unwrap();
1227 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1228 /// # let collective_id = db.create_collective("example")?;
1229 /// # let query_embedding = vec![0.1f32; 384];
1230 /// use pulsedb::SearchFilter;
1231 ///
1232 /// let filter = SearchFilter {
1233 /// domains: Some(vec!["rust".to_string()]),
1234 /// min_importance: Some(0.5),
1235 /// ..SearchFilter::default()
1236 /// };
1237 /// let results = db.search_similar_filtered(
1238 /// collective_id,
1239 /// &query_embedding,
1240 /// 10,
1241 /// filter,
1242 /// )?;
1243 /// # Ok(())
1244 /// # }
1245 /// ```
1246 #[instrument(skip(self, query, filter))]
1247 pub fn search_similar_filtered(
1248 &self,
1249 collective_id: CollectiveId,
1250 query: &[f32],
1251 k: usize,
1252 filter: SearchFilter,
1253 ) -> Result<Vec<SearchResult>> {
1254 // Validate k
1255 if k == 0 || k > 1000 {
1256 return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1257 }
1258
1259 // Verify collective exists and check embedding dimension
1260 let collective = self
1261 .storage
1262 .get_collective(collective_id)?
1263 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1264
1265 let expected_dim = collective.embedding_dimension as usize;
1266 if query.len() != expected_dim {
1267 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1268 }
1269
1270 // Over-fetch from HNSW to compensate for post-filtering losses
1271 let over_fetch = k.saturating_mul(2).min(2000);
1272 let ef_search = self.config.hnsw.ef_search;
1273
1274 // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1275 // by distance ascending (closest first)
1276 let candidates = self
1277 .with_vector_index(collective_id, |index| {
1278 index.search_experiences(query, over_fetch, ef_search)
1279 })?
1280 .unwrap_or_default();
1281
1282 // Fetch full experiences, apply filter, convert distance → similarity
1283 let mut results = Vec::with_capacity(k);
1284 for (exp_id, distance) in candidates {
1285 if results.len() >= k {
1286 break;
1287 }
1288
1289 if let Some(experience) = self.storage.get_experience(exp_id)? {
1290 if filter.matches(&experience) {
1291 results.push(SearchResult {
1292 experience,
1293 similarity: 1.0 - distance,
1294 });
1295 }
1296 }
1297 }
1298
1299 Ok(results)
1300 }
1301
1302 // =========================================================================
1303 // Experience Relations (E3-S01)
1304 // =========================================================================
1305
1306 /// Stores a new relation between two experiences.
1307 ///
1308 /// Relations are typed, directed edges connecting a source experience to a
1309 /// target experience. Both experiences must exist and belong to the same
1310 /// collective. Duplicate relations (same source, target, and type) are
1311 /// rejected.
1312 ///
1313 /// # Arguments
1314 ///
1315 /// * `relation` - The relation to create (source, target, type, strength)
1316 ///
1317 /// # Errors
1318 ///
1319 /// Returns an error if:
1320 /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1321 /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1322 /// - A relation with the same (source, target, type) already exists
1323 /// - Self-relation attempted (source == target)
1324 /// - Strength is out of range `[0.0, 1.0]`
1325 #[instrument(skip(self, relation))]
1326 pub fn store_relation(
1327 &self,
1328 relation: crate::relation::NewExperienceRelation,
1329 ) -> Result<crate::types::RelationId> {
1330 use crate::relation::{validate_new_relation, ExperienceRelation};
1331 use crate::types::RelationId;
1332
1333 // Validate input fields (self-relation, strength bounds, metadata size)
1334 validate_new_relation(&relation)?;
1335
1336 // Load source and target experiences to verify existence
1337 let source = self
1338 .storage
1339 .get_experience(relation.source_id)?
1340 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1341 let target = self
1342 .storage
1343 .get_experience(relation.target_id)?
1344 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1345
1346 // Verify same collective
1347 if source.collective_id != target.collective_id {
1348 return Err(PulseDBError::from(ValidationError::invalid_field(
1349 "target_id",
1350 "source and target experiences must belong to the same collective",
1351 )));
1352 }
1353
1354 // Check for duplicate (same source, target, type)
1355 if self.storage.relation_exists(
1356 relation.source_id,
1357 relation.target_id,
1358 relation.relation_type,
1359 )? {
1360 return Err(PulseDBError::from(ValidationError::invalid_field(
1361 "relation_type",
1362 "a relation with this source, target, and type already exists",
1363 )));
1364 }
1365
1366 // Construct the full relation
1367 let id = RelationId::new();
1368 let full_relation = ExperienceRelation {
1369 id,
1370 source_id: relation.source_id,
1371 target_id: relation.target_id,
1372 relation_type: relation.relation_type,
1373 strength: relation.strength,
1374 metadata: relation.metadata,
1375 created_at: Timestamp::now(),
1376 };
1377
1378 self.storage.save_relation(&full_relation)?;
1379
1380 info!(
1381 id = %id,
1382 source = %relation.source_id,
1383 target = %relation.target_id,
1384 relation_type = ?full_relation.relation_type,
1385 "Relation stored"
1386 );
1387 Ok(id)
1388 }
1389
1390 /// Retrieves experiences related to the given experience.
1391 ///
1392 /// Returns pairs of `(Experience, ExperienceRelation)` based on the
1393 /// requested direction:
1394 /// - `Outgoing`: experiences that this experience points TO (as source)
1395 /// - `Incoming`: experiences that point TO this experience (as target)
1396 /// - `Both`: union of outgoing and incoming
1397 ///
1398 /// To filter by relation type, use
1399 /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered).
1400 ///
1401 /// Silently skips relations where the related experience no longer exists
1402 /// (orphan tolerance).
1403 ///
1404 /// # Errors
1405 ///
1406 /// Returns a storage error if the read transaction fails.
1407 #[instrument(skip(self))]
1408 pub fn get_related_experiences(
1409 &self,
1410 experience_id: ExperienceId,
1411 direction: crate::relation::RelationDirection,
1412 ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1413 self.get_related_experiences_filtered(experience_id, direction, None)
1414 }
1415
1416 /// Retrieves experiences related to the given experience, with optional
1417 /// type filtering.
1418 ///
1419 /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1420 /// accepts an optional [`RelationType`](crate::RelationType) filter.
1421 /// When `Some(rt)`, only relations matching that type are returned.
1422 ///
1423 /// # Arguments
1424 ///
1425 /// * `experience_id` - The experience to query relations for
1426 /// * `direction` - Which direction(s) to traverse
1427 /// * `relation_type` - If `Some`, only return relations of this type
1428 ///
1429 /// # Example
1430 ///
1431 /// ```rust
1432 /// # fn main() -> pulsedb::Result<()> {
1433 /// # let dir = tempfile::tempdir().unwrap();
1434 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1435 /// # let cid = db.create_collective("example")?;
1436 /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1437 /// # collective_id: cid,
1438 /// # content: "a".into(),
1439 /// # embedding: Some(vec![0.1f32; 384]),
1440 /// # ..Default::default()
1441 /// # })?;
1442 /// use pulsedb::{RelationType, RelationDirection};
1443 ///
1444 /// // Only "Supports" relations outgoing from exp_a
1445 /// let supports = db.get_related_experiences_filtered(
1446 /// exp_a,
1447 /// RelationDirection::Outgoing,
1448 /// Some(RelationType::Supports),
1449 /// )?;
1450 /// # Ok(())
1451 /// # }
1452 /// ```
1453 #[instrument(skip(self))]
1454 pub fn get_related_experiences_filtered(
1455 &self,
1456 experience_id: ExperienceId,
1457 direction: crate::relation::RelationDirection,
1458 relation_type: Option<crate::relation::RelationType>,
1459 ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1460 use crate::relation::RelationDirection;
1461
1462 let mut results = Vec::new();
1463
1464 // Outgoing: this experience is the source → fetch target experiences
1465 if matches!(
1466 direction,
1467 RelationDirection::Outgoing | RelationDirection::Both
1468 ) {
1469 let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1470 for rel_id in rel_ids {
1471 if let Some(relation) = self.storage.get_relation(rel_id)? {
1472 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1473 continue;
1474 }
1475 if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1476 results.push((experience, relation));
1477 }
1478 }
1479 }
1480 }
1481
1482 // Incoming: this experience is the target → fetch source experiences
1483 if matches!(
1484 direction,
1485 RelationDirection::Incoming | RelationDirection::Both
1486 ) {
1487 let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1488 for rel_id in rel_ids {
1489 if let Some(relation) = self.storage.get_relation(rel_id)? {
1490 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1491 continue;
1492 }
1493 if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1494 results.push((experience, relation));
1495 }
1496 }
1497 }
1498 }
1499
1500 Ok(results)
1501 }
1502
1503 /// Retrieves a relation by ID.
1504 ///
1505 /// Returns `None` if no relation with the given ID exists.
1506 pub fn get_relation(
1507 &self,
1508 id: crate::types::RelationId,
1509 ) -> Result<Option<crate::relation::ExperienceRelation>> {
1510 self.storage.get_relation(id)
1511 }
1512
1513 /// Deletes a relation by ID.
1514 ///
1515 /// # Errors
1516 ///
1517 /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1518 #[instrument(skip(self))]
1519 pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1520 let deleted = self.storage.delete_relation(id)?;
1521 if !deleted {
1522 return Err(PulseDBError::from(NotFoundError::relation(id)));
1523 }
1524 info!(id = %id, "Relation deleted");
1525 Ok(())
1526 }
1527
1528 // =========================================================================
1529 // Derived Insights (E3-S02)
1530 // =========================================================================
1531
1532 /// Stores a new derived insight.
1533 ///
1534 /// Creates a synthesized knowledge record from multiple source experiences.
1535 /// The method:
1536 /// 1. Validates the input (content, confidence, sources)
1537 /// 2. Verifies the collective exists
1538 /// 3. Verifies all source experiences exist and belong to the same collective
1539 /// 4. Resolves the embedding (generates if Builtin, requires if External)
1540 /// 5. Stores the insight with inline embedding
1541 /// 6. Inserts into the insight HNSW index
1542 ///
1543 /// # Arguments
1544 ///
1545 /// * `insight` - The insight to store (see [`NewDerivedInsight`])
1546 ///
1547 /// # Errors
1548 ///
1549 /// - [`ValidationError`](crate::ValidationError) if input is invalid
1550 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1551 /// - [`NotFoundError::Experience`] if any source experience doesn't exist
1552 /// - [`ValidationError::InvalidField`] if source experiences belong to
1553 /// different collectives
1554 /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
1555 #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
1556 pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
1557 let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
1558
1559 // Validate input fields
1560 validate_new_insight(&insight)?;
1561
1562 // Verify collective exists
1563 let collective = self
1564 .storage
1565 .get_collective(insight.collective_id)?
1566 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;
1567
1568 // Verify all source experiences exist and belong to this collective
1569 for source_id in &insight.source_experience_ids {
1570 let source_exp = self
1571 .storage
1572 .get_experience(*source_id)?
1573 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
1574 if source_exp.collective_id != insight.collective_id {
1575 return Err(PulseDBError::from(ValidationError::invalid_field(
1576 "source_experience_ids",
1577 format!(
1578 "experience {} belongs to collective {}, not {}",
1579 source_id, source_exp.collective_id, insight.collective_id
1580 ),
1581 )));
1582 }
1583 }
1584
1585 // Resolve embedding
1586 let embedding = match insight.embedding {
1587 Some(ref emb) => {
1588 // Validate dimension
1589 let expected_dim = collective.embedding_dimension as usize;
1590 if emb.len() != expected_dim {
1591 return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
1592 }
1593 emb.clone()
1594 }
1595 None => {
1596 if is_external {
1597 return Err(PulseDBError::embedding(
1598 "embedding is required when using External embedding provider",
1599 ));
1600 }
1601 self.embedding.embed(&insight.content)?
1602 }
1603 };
1604
1605 let embedding_for_hnsw = embedding.clone();
1606 let now = Timestamp::now();
1607 let id = InsightId::new();
1608
1609 // Construct the full insight record
1610 let derived_insight = DerivedInsight {
1611 id,
1612 collective_id: insight.collective_id,
1613 content: insight.content,
1614 embedding,
1615 source_experience_ids: insight.source_experience_ids,
1616 insight_type: insight.insight_type,
1617 confidence: insight.confidence,
1618 domain: insight.domain,
1619 created_at: now,
1620 updated_at: now,
1621 };
1622
1623 // Write to redb FIRST (source of truth)
1624 self.storage.save_insight(&derived_insight)?;
1625
1626 // Insert into insight HNSW index (using InsightId→ExperienceId byte conversion)
1627 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1628 let insight_vectors = self
1629 .insight_vectors
1630 .read()
1631 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1632 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1633 index.insert_experience(exp_id, &embedding_for_hnsw)?;
1634 }
1635
1636 info!(id = %id, "Insight stored");
1637 Ok(id)
1638 }
1639
1640 /// Retrieves a derived insight by ID.
1641 ///
1642 /// Returns `None` if no insight with the given ID exists.
1643 #[instrument(skip(self))]
1644 pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1645 self.storage.get_insight(id)
1646 }
1647
1648 /// Searches for insights semantically similar to the query embedding.
1649 ///
1650 /// Uses the insight-specific HNSW index for approximate nearest neighbor
1651 /// search, then fetches full insight records from storage.
1652 ///
1653 /// # Arguments
1654 ///
1655 /// * `collective_id` - The collective to search within
1656 /// * `query` - Query embedding vector (must match collective's dimension)
1657 /// * `k` - Maximum number of results to return
1658 ///
1659 /// # Errors
1660 ///
1661 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1662 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1663 #[instrument(skip(self, query))]
1664 pub fn get_insights(
1665 &self,
1666 collective_id: CollectiveId,
1667 query: &[f32],
1668 k: usize,
1669 ) -> Result<Vec<(DerivedInsight, f32)>> {
1670 // Verify collective exists and check embedding dimension
1671 let collective = self
1672 .storage
1673 .get_collective(collective_id)?
1674 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1675
1676 let expected_dim = collective.embedding_dimension as usize;
1677 if query.len() != expected_dim {
1678 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1679 }
1680
1681 let ef_search = self.config.hnsw.ef_search;
1682
1683 // Search insight HNSW — returns (ExperienceId, distance) pairs
1684 let insight_vectors = self
1685 .insight_vectors
1686 .read()
1687 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1688
1689 let candidates = match insight_vectors.get(&collective_id) {
1690 Some(index) => index.search_experiences(query, k, ef_search)?,
1691 None => return Ok(vec![]),
1692 };
1693 drop(insight_vectors);
1694
1695 // Convert ExperienceId back to InsightId and fetch records
1696 let mut results = Vec::with_capacity(candidates.len());
1697 for (exp_id, distance) in candidates {
1698 let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1699 if let Some(insight) = self.storage.get_insight(insight_id)? {
1700 // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1701 results.push((insight, 1.0 - distance));
1702 }
1703 }
1704
1705 Ok(results)
1706 }
1707
1708 /// Deletes a derived insight by ID.
1709 ///
1710 /// Removes the insight from storage and soft-deletes it from the HNSW index.
1711 ///
1712 /// # Errors
1713 ///
1714 /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1715 #[instrument(skip(self))]
1716 pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1717 // Read insight first to get collective_id for HNSW lookup
1718 let insight = self
1719 .storage
1720 .get_insight(id)?
1721 .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1722
1723 // Delete from redb FIRST (source of truth)
1724 self.storage.delete_insight(id)?;
1725
1726 // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1727 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1728 let insight_vectors = self
1729 .insight_vectors
1730 .read()
1731 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1732 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1733 index.delete_experience(exp_id)?;
1734 }
1735
1736 info!(id = %id, "Insight deleted");
1737 Ok(())
1738 }
1739
1740 // =========================================================================
1741 // Activity Tracking (E3-S03)
1742 // =========================================================================
1743
1744 /// Registers an agent's presence in a collective.
1745 ///
1746 /// Creates a new activity record or replaces an existing one for the
1747 /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1748 /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1749 ///
1750 /// # Arguments
1751 ///
1752 /// * `activity` - The activity registration (see [`NewActivity`])
1753 ///
1754 /// # Errors
1755 ///
1756 /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1757 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1758 ///
1759 /// # Example
1760 ///
1761 /// ```rust
1762 /// # fn main() -> pulsedb::Result<()> {
1763 /// # let dir = tempfile::tempdir().unwrap();
1764 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1765 /// # let collective_id = db.create_collective("example")?;
1766 /// use pulsedb::NewActivity;
1767 ///
1768 /// db.register_activity(NewActivity {
1769 /// agent_id: "claude-opus".to_string(),
1770 /// collective_id,
1771 /// current_task: Some("Reviewing pull request".to_string()),
1772 /// context_summary: None,
1773 /// })?;
1774 /// # Ok(())
1775 /// # }
1776 /// ```
1777 #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1778 pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1779 // Validate input
1780 validate_new_activity(&activity)?;
1781
1782 // Verify collective exists
1783 self.storage
1784 .get_collective(activity.collective_id)?
1785 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1786
1787 // Build stored activity with timestamps
1788 let now = Timestamp::now();
1789 let stored = Activity {
1790 agent_id: activity.agent_id,
1791 collective_id: activity.collective_id,
1792 current_task: activity.current_task,
1793 context_summary: activity.context_summary,
1794 started_at: now,
1795 last_heartbeat: now,
1796 };
1797
1798 self.storage.save_activity(&stored)?;
1799
1800 info!(
1801 agent_id = %stored.agent_id,
1802 collective_id = %stored.collective_id,
1803 "Activity registered"
1804 );
1805 Ok(())
1806 }
1807
1808 /// Updates an agent's heartbeat timestamp.
1809 ///
1810 /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1811 /// any other fields. The agent must have an existing activity registered.
1812 ///
1813 /// # Errors
1814 ///
1815 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1816 #[instrument(skip(self))]
1817 pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1818 let mut activity = self
1819 .storage
1820 .get_activity(agent_id, collective_id)?
1821 .ok_or_else(|| {
1822 PulseDBError::from(NotFoundError::activity(format!(
1823 "{} in {}",
1824 agent_id, collective_id
1825 )))
1826 })?;
1827
1828 activity.last_heartbeat = Timestamp::now();
1829 self.storage.save_activity(&activity)?;
1830
1831 info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1832 Ok(())
1833 }
1834
1835 /// Ends an agent's activity in a collective.
1836 ///
1837 /// Removes the activity record. After calling this, the agent will no
1838 /// longer appear in `get_active_agents()` results.
1839 ///
1840 /// # Errors
1841 ///
1842 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1843 #[instrument(skip(self))]
1844 pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1845 let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1846
1847 if !deleted {
1848 return Err(PulseDBError::from(NotFoundError::activity(format!(
1849 "{} in {}",
1850 agent_id, collective_id
1851 ))));
1852 }
1853
1854 info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1855 Ok(())
1856 }
1857
1858 /// Returns all active (non-stale) agents in a collective.
1859 ///
1860 /// Fetches all activities, filters out those whose `last_heartbeat` is
1861 /// older than `config.activity.stale_threshold`, and returns the rest
1862 /// sorted by `last_heartbeat` descending (most recently active first).
1863 ///
1864 /// # Errors
1865 ///
1866 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1867 #[instrument(skip(self))]
1868 pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1869 // Verify collective exists
1870 self.storage
1871 .get_collective(collective_id)?
1872 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1873
1874 let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1875
1876 // Filter stale activities
1877 let now = Timestamp::now();
1878 let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1879 let cutoff = now.as_millis() - threshold_ms;
1880
1881 let mut active: Vec<Activity> = all_activities
1882 .into_iter()
1883 .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1884 .collect();
1885
1886 // Sort by last_heartbeat descending (most recently active first)
1887 active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1888
1889 Ok(active)
1890 }
1891
1892 // =========================================================================
1893 // Context Candidates (E2-S04)
1894 // =========================================================================
1895
1896 /// Retrieves unified context candidates from all retrieval primitives.
1897 ///
1898 /// This is the primary API for context assembly. It orchestrates:
1899 /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1900 /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1901 /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1902 /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1903 /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1904 ///
1905 /// # Arguments
1906 ///
1907 /// * `request` - Configuration for which primitives to query and limits
1908 ///
1909 /// # Errors
1910 ///
1911 /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1912 /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1913 /// the collective's embedding dimension
1914 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1915 ///
1916 /// # Performance
1917 ///
1918 /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1919 /// all other sub-calls are < 10ms each.
1920 ///
1921 /// # Example
1922 ///
1923 /// ```rust
1924 /// # fn main() -> pulsedb::Result<()> {
1925 /// # let dir = tempfile::tempdir().unwrap();
1926 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1927 /// # let collective_id = db.create_collective("example")?;
1928 /// # let query_vec = vec![0.1f32; 384];
1929 /// use pulsedb::{ContextRequest, SearchFilter};
1930 ///
1931 /// let candidates = db.get_context_candidates(ContextRequest {
1932 /// collective_id,
1933 /// query_embedding: query_vec,
1934 /// max_similar: 10,
1935 /// max_recent: 5,
1936 /// include_insights: true,
1937 /// include_relations: true,
1938 /// include_active_agents: true,
1939 /// filter: SearchFilter {
1940 /// domains: Some(vec!["rust".to_string()]),
1941 /// ..SearchFilter::default()
1942 /// },
1943 /// ..ContextRequest::default()
1944 /// })?;
1945 /// # Ok(())
1946 /// # }
1947 /// ```
1948 #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
1949 pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
1950 // ── Validate limits ──────────────────────────────────────
1951 if request.max_similar == 0 || request.max_similar > 1000 {
1952 return Err(ValidationError::invalid_field(
1953 "max_similar",
1954 "must be between 1 and 1000",
1955 )
1956 .into());
1957 }
1958 if request.max_recent == 0 || request.max_recent > 1000 {
1959 return Err(
1960 ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
1961 );
1962 }
1963
1964 // ── Verify collective exists and check dimension ─────────
1965 let collective = self
1966 .storage
1967 .get_collective(request.collective_id)?
1968 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;
1969
1970 let expected_dim = collective.embedding_dimension as usize;
1971 if request.query_embedding.len() != expected_dim {
1972 return Err(ValidationError::dimension_mismatch(
1973 expected_dim,
1974 request.query_embedding.len(),
1975 )
1976 .into());
1977 }
1978
1979 // ── 1. Similar experiences (HNSW vector search) ──────────
1980 let similar_experiences = self.search_similar_filtered(
1981 request.collective_id,
1982 &request.query_embedding,
1983 request.max_similar,
1984 request.filter.clone(),
1985 )?;
1986
1987 // ── 2. Recent experiences (timestamp index scan) ─────────
1988 let recent_experiences = self.get_recent_experiences_filtered(
1989 request.collective_id,
1990 request.max_recent,
1991 request.filter,
1992 )?;
1993
1994 // ── 3. Insights (HNSW vector search on insight index) ────
1995 let insights = if request.include_insights {
1996 self.get_insights(
1997 request.collective_id,
1998 &request.query_embedding,
1999 request.max_similar,
2000 )?
2001 .into_iter()
2002 .map(|(insight, _score)| insight)
2003 .collect()
2004 } else {
2005 vec![]
2006 };
2007
2008 // ── 4. Relations (graph traversal from result experiences) ─
2009 let relations = if request.include_relations {
2010 use std::collections::HashSet;
2011
2012 let mut seen = HashSet::new();
2013 let mut all_relations = Vec::new();
2014
2015 // Collect unique experience IDs from both result sets
2016 let exp_ids: Vec<_> = similar_experiences
2017 .iter()
2018 .map(|r| r.experience.id)
2019 .chain(recent_experiences.iter().map(|e| e.id))
2020 .collect();
2021
2022 for exp_id in exp_ids {
2023 let related =
2024 self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;
2025
2026 for (_experience, relation) in related {
2027 if seen.insert(relation.id) {
2028 all_relations.push(relation);
2029 }
2030 }
2031 }
2032
2033 all_relations
2034 } else {
2035 vec![]
2036 };
2037
2038 // ── 5. Active agents (staleness-filtered activity records) ─
2039 let active_agents = if request.include_active_agents {
2040 self.get_active_agents(request.collective_id)?
2041 } else {
2042 vec![]
2043 };
2044
2045 Ok(ContextCandidates {
2046 similar_experiences,
2047 recent_experiences,
2048 insights,
2049 relations,
2050 active_agents,
2051 })
2052 }
2053
2054 // =========================================================================
2055 // Watch System (E4-S01)
2056 // =========================================================================
2057
2058 /// Subscribes to all experience changes in a collective.
2059 ///
2060 /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2061 /// create, update, archive, and delete operation. The stream ends when
2062 /// dropped or when the `PulseDB` instance is closed.
2063 ///
2064 /// Multiple subscribers per collective are supported. Each gets an
2065 /// independent copy of every event.
2066 ///
2067 /// # Example
2068 ///
2069 /// ```rust,no_run
2070 /// # #[tokio::main]
2071 /// # async fn main() -> pulsedb::Result<()> {
2072 /// # let dir = tempfile::tempdir().unwrap();
2073 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2074 /// # let collective_id = db.create_collective("example")?;
2075 /// use futures::StreamExt;
2076 ///
2077 /// let mut stream = db.watch_experiences(collective_id)?;
2078 /// while let Some(event) = stream.next().await {
2079 /// println!("{:?}: {}", event.event_type, event.experience_id);
2080 /// }
2081 /// # Ok(())
2082 /// # }
2083 /// ```
2084 pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2085 self.watch.subscribe(collective_id, None)
2086 }
2087
2088 /// Subscribes to filtered experience changes in a collective.
2089 ///
2090 /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2091 /// events that match the filter criteria. Filters are applied on the
2092 /// sender side before channel delivery.
2093 ///
2094 /// # Example
2095 ///
2096 /// ```rust
2097 /// # fn main() -> pulsedb::Result<()> {
2098 /// # let dir = tempfile::tempdir().unwrap();
2099 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2100 /// # let collective_id = db.create_collective("example")?;
2101 /// use pulsedb::WatchFilter;
2102 ///
2103 /// let filter = WatchFilter {
2104 /// domains: Some(vec!["security".to_string()]),
2105 /// min_importance: Some(0.7),
2106 /// ..Default::default()
2107 /// };
2108 /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2109 /// # Ok(())
2110 /// # }
2111 /// ```
2112 pub fn watch_experiences_filtered(
2113 &self,
2114 collective_id: CollectiveId,
2115 filter: WatchFilter,
2116 ) -> Result<WatchStream> {
2117 self.watch.subscribe(collective_id, Some(filter))
2118 }
2119
2120 // =========================================================================
2121 // Cross-Process Watch (E4-S02)
2122 // =========================================================================
2123
2124 /// Returns the current WAL sequence number.
2125 ///
2126 /// Use this to establish a baseline before starting to poll for changes.
2127 /// Returns 0 if no experience writes have occurred yet.
2128 ///
2129 /// # Example
2130 ///
2131 /// ```rust
2132 /// # fn main() -> pulsedb::Result<()> {
2133 /// # let dir = tempfile::tempdir().unwrap();
2134 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2135 /// let seq = db.get_current_sequence()?;
2136 /// // ... later ...
2137 /// let (events, new_seq) = db.poll_changes(seq)?;
2138 /// # Ok(())
2139 /// # }
2140 /// ```
2141 pub fn get_current_sequence(&self) -> Result<u64> {
2142 self.storage.get_wal_sequence()
2143 }
2144
2145 /// Polls for experience changes since the given sequence number.
2146 ///
2147 /// Returns a tuple of `(events, new_sequence)`:
2148 /// - `events`: New [`WatchEvent`]s in sequence order
2149 /// - `new_sequence`: Pass this value back on the next call
2150 ///
2151 /// Returns an empty vec and the same sequence if no changes exist.
2152 ///
2153 /// # Arguments
2154 ///
2155 /// * `since_seq` - The last sequence number you received (0 for first call)
2156 ///
2157 /// # Performance
2158 ///
2159 /// Target: < 10ms per call. Internally performs a range scan on the
2160 /// watch_events table, O(k) where k is the number of new events.
2161 ///
2162 /// # Example
2163 ///
2164 /// ```rust,no_run
2165 /// # fn main() -> pulsedb::Result<()> {
2166 /// # let dir = tempfile::tempdir().unwrap();
2167 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2168 /// use std::time::Duration;
2169 ///
2170 /// let mut seq = 0u64;
2171 /// loop {
2172 /// let (events, new_seq) = db.poll_changes(seq)?;
2173 /// seq = new_seq;
2174 /// for event in events {
2175 /// println!("{:?}: {}", event.event_type, event.experience_id);
2176 /// }
2177 /// std::thread::sleep(Duration::from_millis(100));
2178 /// }
2179 /// # }
2180 /// ```
2181 pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2182 let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2183 let events = records.into_iter().map(WatchEvent::from).collect();
2184 Ok((events, new_seq))
2185 }
2186
2187 /// Polls for changes with a custom batch size limit.
2188 ///
2189 /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2190 /// `limit` events per call. Use this for backpressure control.
2191 pub fn poll_changes_batch(
2192 &self,
2193 since_seq: u64,
2194 limit: usize,
2195 ) -> Result<(Vec<WatchEvent>, u64)> {
2196 let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2197 let events = records.into_iter().map(WatchEvent::from).collect();
2198 Ok((events, new_seq))
2199 }
2200}
2201
2202// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2203// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// Opening a fresh path creates the database file with default settings.
    #[test]
    fn test_open_creates_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        assert!(db_path.exists());
        assert_eq!(db.embedding_dimension(), 384);
        db.close().unwrap();
    }

    /// An existing database can be reopened with the same configuration.
    #[test]
    fn test_open_existing_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // First open creates the file; close releases it.
        let created = PulseDB::open(&db_path, Config::default()).unwrap();
        created.close().unwrap();

        // Second open must succeed against the existing file.
        let reopened = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    /// Invalid configuration values are rejected at open time.
    #[test]
    fn test_config_validation() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let bad_config = Config {
            cache_size_mb: 0, // zero-sized cache is invalid
            ..Default::default()
        };

        assert!(PulseDB::open(&db_path, bad_config).is_err());
    }

    /// Reopening with a different embedding dimension is an error.
    #[test]
    fn test_dimension_mismatch() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // Create with D384.
        let d384 = Config {
            embedding_dimension: EmbeddingDimension::D384,
            ..Default::default()
        };
        PulseDB::open(&db_path, d384).unwrap().close().unwrap();

        // Attempting to reopen with D768 must fail.
        let d768 = Config {
            embedding_dimension: EmbeddingDimension::D768,
            ..Default::default()
        };
        assert!(PulseDB::open(&db_path, d768).is_err());
    }

    /// Metadata reflects the dimension the database was created with.
    #[test]
    fn test_metadata_access() {
        let dir = tempdir().unwrap();
        let db = PulseDB::open(dir.path().join("test.db"), Config::default()).unwrap();

        assert_eq!(db.metadata().embedding_dimension, EmbeddingDimension::D384);

        db.close().unwrap();
    }

    /// Compile-time proof that PulseDB can be shared across threads.
    #[test]
    fn test_pulsedb_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<PulseDB>();
    }
}
2299}