// pulsedb/db.rs
1//! PulseDB main struct and lifecycle operations.
2//!
3//! The [`PulseDB`] struct is the primary interface for interacting with
4//! the database. It provides methods for:
5//!
6//! - Opening and closing the database
7//! - Managing collectives (isolation units)
8//! - Recording and querying experiences
9//! - Semantic search and context retrieval
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! # fn main() -> pulsedb::Result<()> {
15//! # let dir = tempfile::tempdir().unwrap();
16//! use pulsedb::{PulseDB, Config, NewExperience};
17//!
18//! // Open or create a database
19//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
20//!
21//! // Create a collective for your project
22//! let collective = db.create_collective("my-project")?;
23//!
24//! // Record an experience
25//! db.record_experience(NewExperience {
26//! collective_id: collective,
27//! content: "Always validate user input".to_string(),
28//! embedding: Some(vec![0.1f32; 384]),
29//! ..Default::default()
30//! })?;
31//!
32//! // Close when done
33//! db.close()?;
34//! # Ok(())
35//! # }
36//! ```
37//!
38//! # Thread Safety
39//!
40//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
41//! The underlying storage uses MVCC for concurrent reads with exclusive
42//! write locking.
43//!
44//! ```rust
45//! # fn main() -> pulsedb::Result<()> {
46//! # let dir = tempfile::tempdir().unwrap();
47//! use std::sync::Arc;
48//! use pulsedb::{PulseDB, Config};
49//!
50//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
51//!
52//! // Clone Arc for use in another thread
53//! let db_clone = Arc::clone(&db);
54//! std::thread::spawn(move || {
55//! // Safe to use db_clone here
56//! });
57//! # Ok(())
58//! # }
59//! ```
60
61use std::collections::HashMap;
62use std::path::{Path, PathBuf};
63use std::sync::{Arc, RwLock};
64
65#[cfg(feature = "sync")]
66use tracing::debug;
67use tracing::{info, instrument, warn};
68
69use crate::activity::{validate_new_activity, Activity, NewActivity};
70use crate::collective::types::CollectiveStats;
71use crate::collective::{validate_collective_name, Collective};
72use crate::config::{Config, EmbeddingProvider};
73use crate::embedding::{create_embedding_service, EmbeddingService};
74use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
75use crate::experience::{
76 validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
77 NewExperience,
78};
79use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
80#[cfg(feature = "sync")]
81use crate::relation::ExperienceRelation;
82use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
83use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
84#[cfg(feature = "sync")]
85use crate::types::RelationId;
86use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
87use crate::vector::HnswIndex;
88use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};
89
90/// The main PulseDB database handle.
91///
92/// This is the primary interface for all database operations. Create an
93/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
94///
95/// # Ownership
96///
97/// `PulseDB` owns its storage and embedding service. When you call `close()`,
98/// the database is consumed and cannot be used afterward. This ensures
99/// resources are properly released.
pub struct PulseDB {
    /// Storage engine (redb or mock for testing).
    ///
    /// Boxed trait object: the concrete backend is selected by
    /// `open_storage()` at open time.
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX).
    ///
    /// Used to generate embeddings from content when the caller does not
    /// supply one (Builtin provider mode).
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database.
    ///
    /// Also consulted at runtime for `read_only`, HNSW parameters, the
    /// embedding dimension, and watch-service settings.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}
129
130impl std::fmt::Debug for PulseDB {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
133 let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
134 f.debug_struct("PulseDB")
135 .field("config", &self.config)
136 .field("embedding_dimension", &self.embedding_dimension())
137 .field("vector_indexes", &vector_count)
138 .field("insight_vector_indexes", &insight_vector_count)
139 .finish_non_exhaustive()
140 }
141}
142
143impl PulseDB {
    /// Opens or creates a PulseDB database at the specified path.
    ///
    /// If the database doesn't exist, it will be created with the given
    /// configuration. If it exists, the configuration will be validated
    /// against the stored settings (e.g., embedding dimension must match).
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (created if it doesn't exist)
    /// * `config` - Configuration options for the database
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Configuration is invalid (see [`Config::validate`])
    /// - Database file is corrupted
    /// - Database is locked by another process
    /// - Schema version doesn't match (needs migration)
    /// - Embedding dimension doesn't match existing database
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
    ///
    /// // Open with default configuration
    /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
    /// # drop(db);
    ///
    /// // Open with custom embedding dimension
    /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
    ///     embedding_dimension: EmbeddingDimension::D768,
    ///     ..Default::default()
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
        // Validate configuration first, before touching the filesystem.
        config.validate().map_err(PulseDBError::from)?;

        info!("Opening PulseDB");

        // Open storage engine (redb file, or mock in tests)
        let storage = open_storage(&path, &config)?;

        // Create embedding service
        let embedding = create_embedding_service(&config)?;

        // Load or rebuild HNSW indexes for all existing collectives.
        // These are derived structures; redb remains the source of truth.
        let vectors = Self::load_all_indexes(&*storage, &config)?;
        let insight_vectors = Self::load_all_insight_indexes(&*storage, &config)?;

        info!(
            dimension = config.embedding_dimension.size(),
            sync_mode = ?config.sync_mode,
            collectives = vectors.len(),
            "PulseDB opened successfully"
        );

        // Watch service parameters (buffer size, in-process mode) come
        // from `config.watch`.
        let watch = Arc::new(WatchService::new(
            config.watch.buffer_size,
            config.watch.in_process,
        ));

        Ok(Self {
            storage,
            embedding,
            config,
            vectors: RwLock::new(vectors),
            insight_vectors: RwLock::new(insight_vectors),
            watch,
        })
    }
221
222 /// Closes the database, flushing all pending writes.
223 ///
224 /// This method consumes the `PulseDB` instance, ensuring it cannot
225 /// be used after closing. The underlying storage engine flushes all
226 /// buffered data to disk.
227 ///
228 /// # Errors
229 ///
230 /// Returns an error if the storage backend reports a flush failure.
231 /// Note: the current redb backend flushes durably on drop, so this
232 /// always returns `Ok(())` in practice.
233 ///
234 /// # Example
235 ///
236 /// ```rust
237 /// # fn main() -> pulsedb::Result<()> {
238 /// # let dir = tempfile::tempdir().unwrap();
239 /// use pulsedb::{PulseDB, Config};
240 ///
241 /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
242 /// // ... use the database ...
243 /// db.close()?; // db is consumed here
244 /// // db.something() // Compile error: db was moved
245 /// # Ok(())
246 /// # }
247 /// ```
248 #[instrument(skip(self))]
249 pub fn close(self) -> Result<()> {
250 info!("Closing PulseDB");
251
252 // Persist HNSW indexes BEFORE closing storage.
253 // If HNSW save fails, storage is still open for potential recovery.
254 // On next open(), stale/missing HNSW files trigger a rebuild from redb.
255 if let Some(hnsw_dir) = self.hnsw_dir() {
256 // Experience HNSW indexes
257 let vectors = self
258 .vectors
259 .read()
260 .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
261 for (collective_id, index) in vectors.iter() {
262 if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
263 warn!(
264 collective = %collective_id,
265 error = %e,
266 "Failed to save HNSW index (will rebuild on next open)"
267 );
268 }
269 }
270 drop(vectors);
271
272 // Insight HNSW indexes (separate files with _insights suffix)
273 let insight_vectors = self
274 .insight_vectors
275 .read()
276 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
277 for (collective_id, index) in insight_vectors.iter() {
278 let name = format!("{}_insights", collective_id);
279 if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
280 warn!(
281 collective = %collective_id,
282 error = %e,
283 "Failed to save insight HNSW index (will rebuild on next open)"
284 );
285 }
286 }
287 }
288
289 // Close storage (flushes pending writes)
290 self.storage.close()?;
291
292 info!("PulseDB closed successfully");
293 Ok(())
294 }
295
    /// Returns a reference to the database configuration.
    ///
    /// This is the configuration that was used to open the database.
    /// Note that some settings (like embedding dimension) are locked
    /// on database creation and cannot be changed.
    #[inline]
    pub fn config(&self) -> &Config {
        &self.config
    }
305
    /// Returns the database metadata.
    ///
    /// Metadata includes schema version, embedding dimension, and timestamps
    /// for when the database was created and last opened. Delegates to the
    /// storage engine, which owns the persisted metadata record.
    #[inline]
    pub fn metadata(&self) -> &DatabaseMetadata {
        self.storage.metadata()
    }
315 /// Returns the embedding dimension configured for this database.
316 ///
317 /// All embeddings stored in this database must have exactly this
318 /// many dimensions.
319 #[inline]
320 pub fn embedding_dimension(&self) -> usize {
321 self.config.embedding_dimension.size()
322 }
323
324 // =========================================================================
325 // Internal Accessors (for use by feature modules)
326 // =========================================================================
327
    /// Returns a reference to the storage engine.
    ///
    /// This is for internal use by other PulseDB modules; external callers
    /// should go through the public `PulseDB` API.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn storage(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }
337 /// Returns a reference to the embedding service.
338 ///
339 /// This is for internal use by other PulseDB modules.
340 #[inline]
341 #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
342 pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
343 self.embedding.as_ref()
344 }
345
346 // =========================================================================
347 // HNSW Index Lifecycle
348 // =========================================================================
349
350 /// Returns the directory for HNSW index files.
351 ///
352 /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
353 /// the storage has no file path (e.g., in-memory tests).
354 fn hnsw_dir(&self) -> Option<PathBuf> {
355 self.storage.path().map(|p| {
356 let mut hnsw_path = p.as_os_str().to_owned();
357 hnsw_path.push(".hnsw");
358 PathBuf::from(hnsw_path)
359 })
360 }
361
    /// Loads or rebuilds HNSW indexes for all existing collectives.
    ///
    /// For each collective in storage:
    /// 1. Try loading metadata from `.hnsw.meta` file
    /// 2. Rebuild the graph from redb embeddings (always, since we can't
    ///    load the graph due to hnsw_rs lifetime constraints)
    /// 3. Restore deleted set from metadata if available
    fn load_all_indexes(
        storage: &dyn StorageEngine,
        config: &Config,
    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
        let collectives = storage.list_collectives()?;
        let mut vectors = HashMap::with_capacity(collectives.len());

        // Derive `{db_path}.hnsw/` — same rule as `hnsw_dir()`, duplicated
        // here because `self` doesn't exist yet during open().
        let hnsw_dir = storage.path().map(|p| {
            let mut hnsw_path = p.as_os_str().to_owned();
            hnsw_path.push(".hnsw");
            PathBuf::from(hnsw_path)
        });

        for collective in &collectives {
            // Each collective carries its own dimension (locked at creation).
            let dimension = collective.embedding_dimension as usize;

            // List all experience IDs in this collective
            let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;

            // Load embeddings from redb (source of truth). Experiences with
            // no stored embedding are simply skipped.
            let mut embeddings = Vec::with_capacity(exp_ids.len());
            for exp_id in &exp_ids {
                if let Some(embedding) = storage.get_embedding(*exp_id)? {
                    embeddings.push((*exp_id, embedding));
                }
            }

            // Try loading metadata (for deleted set and ID mappings).
            // `.ok()` swallows load errors: missing or corrupt metadata just
            // means there is no deleted set to restore.
            let metadata = hnsw_dir
                .as_ref()
                .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
                .flatten();

            // Rebuild the HNSW graph from embeddings
            let index = if embeddings.is_empty() {
                HnswIndex::new(dimension, &config.hnsw)
            } else {
                let start = std::time::Instant::now();
                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
                info!(
                    collective = %collective.id,
                    vectors = idx.active_count(),
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "Rebuilt HNSW index from redb embeddings"
                );
                idx
            };

            // Restore deleted set from metadata if available (must happen
            // after the rebuild so the tombstones apply to the new graph).
            if let Some(meta) = metadata {
                index.restore_deleted_set(&meta.deleted)?;
            }

            vectors.insert(collective.id, index);
        }

        Ok(vectors)
    }
427
    /// Loads or rebuilds insight HNSW indexes for all existing collectives.
    ///
    /// For each collective, loads all insights from storage and rebuilds
    /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
    /// byte conversion for the HNSW API.
    fn load_all_insight_indexes(
        storage: &dyn StorageEngine,
        config: &Config,
    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
        let collectives = storage.list_collectives()?;
        let mut insight_vectors = HashMap::with_capacity(collectives.len());

        // Derive `{db_path}.hnsw/` — same rule as `hnsw_dir()`, duplicated
        // here because `self` doesn't exist yet during open().
        let hnsw_dir = storage.path().map(|p| {
            let mut hnsw_path = p.as_os_str().to_owned();
            hnsw_path.push(".hnsw");
            PathBuf::from(hnsw_path)
        });

        for collective in &collectives {
            let dimension = collective.embedding_dimension as usize;

            // List all insight IDs in this collective
            let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;

            // Load insights and extract embeddings (converting InsightId →
            // ExperienceId by reinterpreting the ID bytes — safe because
            // insight indexes are kept separate from experience indexes).
            let mut embeddings = Vec::with_capacity(insight_ids.len());
            for insight_id in &insight_ids {
                if let Some(insight) = storage.get_insight(*insight_id)? {
                    let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
                    embeddings.push((exp_id, insight.embedding));
                }
            }

            // Try loading metadata (for deleted set). Index files use a
            // "_insights" suffix, matching the naming in close().
            let name = format!("{}_insights", collective.id);
            let metadata = hnsw_dir
                .as_ref()
                .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
                .flatten();

            // Rebuild HNSW graph from embeddings
            let index = if embeddings.is_empty() {
                HnswIndex::new(dimension, &config.hnsw)
            } else {
                let start = std::time::Instant::now();
                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
                info!(
                    collective = %collective.id,
                    insights = idx.active_count(),
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "Rebuilt insight HNSW index from stored insights"
                );
                idx
            };

            // Restore deleted set from metadata if available (after rebuild,
            // so the tombstones apply to the new graph).
            if let Some(meta) = metadata {
                index.restore_deleted_set(&meta.deleted)?;
            }

            insight_vectors.insert(collective.id, index);
        }

        Ok(insight_vectors)
    }
493
494 /// Executes a closure with the HNSW index for a collective.
495 ///
496 /// This is the primary accessor for vector search operations (used by
497 /// `search_similar()`). The closure runs while the outer RwLock guard
498 /// is held (read lock), so the HnswIndex reference stays valid.
499 /// Returns `None` if no index exists for the collective.
500 #[doc(hidden)]
501 pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
502 where
503 F: FnOnce(&HnswIndex) -> Result<R>,
504 {
505 let vectors = self
506 .vectors
507 .read()
508 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
509 match vectors.get(&collective_id) {
510 Some(index) => Ok(Some(f(index)?)),
511 None => Ok(None),
512 }
513 }
514
515 // =========================================================================
516 // Test Helpers
517 // =========================================================================
518
    /// Returns a reference to the storage engine for integration testing.
    ///
    /// This method is intentionally hidden from documentation. It provides
    /// test-only access to the storage layer for verifying ACID guarantees
    /// and crash recovery. Production code should use the public PulseDB API.
    #[doc(hidden)]
    #[inline]
    pub fn storage_for_test(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }
529
    /// Returns true if this database is in read-only mode.
    ///
    /// Mirrors `Config::read_only` as set at open time.
    // NOTE(review): this sits under the "Test Helpers" section header but is
    // a general-purpose accessor — consider relocating it.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
534
535 /// Checks if the database is read-only and returns an error if so.
536 #[inline]
537 fn check_writable(&self) -> Result<()> {
538 if self.config.read_only {
539 return Err(PulseDBError::ReadOnly);
540 }
541 Ok(())
542 }
543
544 // =========================================================================
545 // Collective Management (E1-S02)
546 // =========================================================================
547
548 /// Creates a new collective with the given name.
549 ///
550 /// The collective's embedding dimension is locked to the database's
551 /// configured dimension at creation time.
552 ///
553 /// # Arguments
554 ///
555 /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
556 ///
557 /// # Errors
558 ///
559 /// Returns a validation error if the name is empty, whitespace-only,
560 /// or exceeds 255 characters.
561 ///
562 /// # Example
563 ///
564 /// ```rust
565 /// # fn main() -> pulsedb::Result<()> {
566 /// # let dir = tempfile::tempdir().unwrap();
567 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
568 /// let id = db.create_collective("my-project")?;
569 /// # Ok(())
570 /// # }
571 /// ```
572 #[instrument(skip(self))]
573 pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
574 self.check_writable()?;
575 validate_collective_name(name)?;
576
577 let dimension = self.config.embedding_dimension.size() as u16;
578 let collective = Collective::new(name, dimension);
579 let id = collective.id;
580
581 // Persist to redb first (source of truth)
582 self.storage.save_collective(&collective)?;
583
584 // Create empty HNSW indexes for this collective
585 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
586 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
587 self.vectors
588 .write()
589 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
590 .insert(id, exp_index);
591 self.insight_vectors
592 .write()
593 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
594 .insert(id, insight_index);
595
596 info!(id = %id, name = %name, "Collective created");
597 Ok(id)
598 }
599
600 /// Creates a new collective with an owner for multi-tenancy.
601 ///
602 /// Same as [`create_collective`](Self::create_collective) but assigns
603 /// an owner ID, enabling filtering with
604 /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
605 ///
606 /// # Arguments
607 ///
608 /// * `name` - Human-readable name (1-255 characters)
609 /// * `owner_id` - Owner identifier (must not be empty)
610 ///
611 /// # Errors
612 ///
613 /// Returns a validation error if the name or owner_id is invalid.
614 #[instrument(skip(self))]
615 pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
616 self.check_writable()?;
617 validate_collective_name(name)?;
618
619 if owner_id.is_empty() {
620 return Err(PulseDBError::from(
621 crate::error::ValidationError::required_field("owner_id"),
622 ));
623 }
624
625 let dimension = self.config.embedding_dimension.size() as u16;
626 let collective = Collective::with_owner(name, owner_id, dimension);
627 let id = collective.id;
628
629 // Persist to redb first (source of truth)
630 self.storage.save_collective(&collective)?;
631
632 // Create empty HNSW indexes for this collective
633 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
634 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
635 self.vectors
636 .write()
637 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
638 .insert(id, exp_index);
639 self.insight_vectors
640 .write()
641 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
642 .insert(id, insight_index);
643
644 info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
645 Ok(id)
646 }
647
    /// Returns a collective by ID, or `None` if not found.
    ///
    /// Thin delegation to the storage engine.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let id = db.create_collective("example")?;
    /// if let Some(collective) = db.get_collective(id)? {
    ///     println!("Found: {}", collective.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
        self.storage.get_collective(id)
    }
667
    /// Lists all collectives in the database.
    ///
    /// Returns an empty vector if no collectives exist. Thin delegation to
    /// the storage engine.
    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
        self.storage.list_collectives()
    }
674
675 /// Lists collectives filtered by owner ID.
676 ///
677 /// Returns only collectives whose `owner_id` matches the given value.
678 /// Returns an empty vector if no matching collectives exist.
679 pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
680 let all = self.storage.list_collectives()?;
681 Ok(all
682 .into_iter()
683 .filter(|c| c.owner_id.as_deref() == Some(owner_id))
684 .collect())
685 }
686
    /// Returns statistics for a collective.
    ///
    /// Currently only `experience_count` is computed; the remaining fields
    /// are filled with defaults (see note below).
    ///
    /// # Errors
    ///
    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
    #[instrument(skip(self))]
    pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
        // Verify collective exists
        self.storage
            .get_collective(id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;

        let experience_count = self.storage.count_experiences_in_collective(id)?;

        // NOTE(review): storage_bytes and the oldest/newest timestamps are
        // hard-coded defaults here — presumably not yet implemented; confirm
        // whether callers rely on these being populated.
        Ok(CollectiveStats {
            experience_count,
            storage_bytes: 0,
            oldest_experience: None,
            newest_experience: None,
        })
    }
708
    /// Deletes a collective and all its associated data.
    ///
    /// Performs cascade deletion: removes all experiences, insights, and
    /// activities belonging to the collective before removing the collective
    /// record itself, then drops the in-memory HNSW indexes and their files.
    ///
    /// # Errors
    ///
    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let collective_id = db.create_collective("to-delete")?;
    /// db.delete_collective(collective_id)?;
    /// assert!(db.get_collective(collective_id)?.is_none());
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
        self.check_writable()?;
        // Verify collective exists
        self.storage
            .get_collective(id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;

        // Cascade: delete all experiences for this collective
        let deleted_count = self.storage.delete_experiences_by_collective(id)?;
        if deleted_count > 0 {
            info!(count = deleted_count, "Cascade-deleted experiences");
        }

        // Cascade: delete all insights for this collective
        let deleted_insights = self.storage.delete_insights_by_collective(id)?;
        if deleted_insights > 0 {
            info!(count = deleted_insights, "Cascade-deleted insights");
        }

        // Cascade: delete all activities for this collective
        let deleted_activities = self.storage.delete_activities_by_collective(id)?;
        if deleted_activities > 0 {
            info!(count = deleted_activities, "Cascade-deleted activities");
        }

        // Delete the collective record from storage
        self.storage.delete_collective(id)?;

        // Remove HNSW indexes from memory (after storage deletion succeeds,
        // so a storage failure leaves the in-memory state intact).
        self.vectors
            .write()
            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
            .remove(&id);
        self.insight_vectors
            .write()
            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
            .remove(&id);

        // Remove HNSW files from disk (best-effort: leftover files are
        // harmless and a warning is logged on failure)
        if let Some(hnsw_dir) = self.hnsw_dir() {
            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
                warn!(
                    collective = %id,
                    error = %e,
                    "Failed to remove experience HNSW files (non-fatal)"
                );
            }
            let insight_name = format!("{}_insights", id);
            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
                warn!(
                    collective = %id,
                    error = %e,
                    "Failed to remove insight HNSW files (non-fatal)"
                );
            }
        }

        info!(id = %id, "Collective deleted");
        Ok(())
    }
791
792 // =========================================================================
793 // Experience CRUD (E1-S03)
794 // =========================================================================
795
    /// Records a new experience in the database.
    ///
    /// This is the primary method for storing agent-learned knowledge. The method:
    /// 1. Validates the input (content, scores, tags, embedding)
    /// 2. Verifies the collective exists
    /// 3. Resolves the embedding (generates if Builtin, requires if External)
    /// 4. Stores the experience atomically across 4 tables
    ///
    /// # Arguments
    ///
    /// * `exp` - The experience to record (see [`NewExperience`])
    ///
    /// # Errors
    ///
    /// - [`ValidationError`](crate::ValidationError) if input is invalid
    /// - [`NotFoundError::Collective`] if the collective doesn't exist
    /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
    #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
    pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
        self.check_writable()?;
        // External provider mode requires the caller to supply the embedding;
        // validation below enforces this.
        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);

        // Verify collective exists and get its dimension
        let collective = self
            .storage
            .get_collective(exp.collective_id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;

        // Validate input against the collective's locked dimension
        validate_new_experience(&exp, collective.embedding_dimension, is_external)?;

        // Resolve embedding
        let embedding = match exp.embedding {
            Some(emb) => emb,
            None => {
                // Builtin mode: generate embedding from content
                self.embedding.embed(&exp.content)?
            }
        };

        // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
        let embedding_for_hnsw = embedding.clone();
        let collective_id = exp.collective_id;

        // Construct the full experience record
        let experience = Experience {
            id: ExperienceId::new(),
            collective_id,
            content: exp.content,
            embedding,
            experience_type: exp.experience_type,
            importance: exp.importance,
            confidence: exp.confidence,
            applications: 0,
            domain: exp.domain,
            related_files: exp.related_files,
            source_agent: exp.source_agent,
            source_task: exp.source_task,
            timestamp: Timestamp::now(),
            archived: false,
        };

        let id = experience.id;

        // Write to redb FIRST (source of truth). If crash happens after
        // this but before HNSW insert, rebuild on next open will include it.
        self.storage.save_experience(&experience)?;

        // Insert into HNSW index (derived structure)
        let vectors = self
            .vectors
            .read()
            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
        if let Some(index) = vectors.get(&collective_id) {
            index.insert_experience(id, &embedding_for_hnsw)?;
        }

        // Emit watch event after both storage and HNSW succeed.
        // NOTE(review): if emit fails, the experience is already persisted
        // but the caller sees Err — confirm this is the intended contract.
        self.watch.emit(
            WatchEvent {
                experience_id: id,
                collective_id,
                event_type: WatchEventType::Created,
                timestamp: experience.timestamp,
                experience: Some(experience.clone()),
            },
            &experience,
        )?;

        info!(id = %id, "Experience recorded");
        Ok(id)
    }
888
    /// Retrieves an experience by ID, including its embedding.
    ///
    /// Returns `None` if no experience with the given ID exists. Thin
    /// delegation to the storage engine.
    #[instrument(skip(self))]
    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
        self.storage.get_experience(id)
    }
896
    /// Updates mutable fields of an experience.
    ///
    /// Only fields set to `Some(...)` in the update are changed.
    /// Content and embedding are immutable — create a new experience instead.
    ///
    /// # Errors
    ///
    /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
    /// - [`NotFoundError::Experience`] if the experience doesn't exist
    #[instrument(skip(self, update))]
    pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
        self.check_writable()?;
        validate_experience_update(&update)?;

        // Storage returns false (rather than erroring) when the ID is unknown.
        let updated = self.storage.update_experience(id, &update)?;
        if !updated {
            return Err(PulseDBError::from(NotFoundError::experience(id)));
        }

        // Emit watch event (fetch experience for collective_id + filter matching).
        // The re-read is skipped entirely when nobody is subscribed.
        if self.watch.has_subscribers() {
            if let Ok(Some(exp)) = self.storage.get_experience(id) {
                // Note: archived=Some(true) maps to Archived; everything else
                // (including archived=Some(false), i.e. unarchive) emits Updated.
                let event_type = if update.archived == Some(true) {
                    WatchEventType::Archived
                } else {
                    WatchEventType::Updated
                };
                self.watch.emit(
                    WatchEvent {
                        experience_id: id,
                        collective_id: exp.collective_id,
                        event_type,
                        timestamp: Timestamp::now(),
                        experience: Some(exp.clone()),
                    },
                    &exp,
                )?;
            }
        }

        info!(id = %id, "Experience updated");
        Ok(())
    }
940
941 /// Archives an experience (soft-delete).
942 ///
943 /// Archived experiences remain in storage but are excluded from search
944 /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
945 ///
946 /// # Errors
947 ///
948 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
949 #[instrument(skip(self))]
950 pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
951 self.check_writable()?;
952 self.update_experience(
953 id,
954 ExperienceUpdate {
955 archived: Some(true),
956 ..Default::default()
957 },
958 )
959 }
960
961 /// Restores an archived experience.
962 ///
963 /// The experience will once again appear in search results.
964 ///
965 /// # Errors
966 ///
967 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
968 #[instrument(skip(self))]
969 pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
970 self.check_writable()?;
971 self.update_experience(
972 id,
973 ExperienceUpdate {
974 archived: Some(false),
975 ..Default::default()
976 },
977 )
978 }
979
980 /// Permanently deletes an experience and its embedding.
981 ///
982 /// This removes the experience from all tables and indices.
983 /// Unlike archiving, this is irreversible.
984 ///
985 /// # Errors
986 ///
987 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
988 #[instrument(skip(self))]
989 pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
990 self.check_writable()?;
991 // Read experience first to get collective_id for HNSW lookup.
992 // This adds one extra read, but delete is not a hot path.
993 let experience = self
994 .storage
995 .get_experience(id)?
996 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
997
998 // Cascade-delete any relations involving this experience.
999 // Done before experience deletion so we can still look up relation data.
1000 let rel_count = self.storage.delete_relations_for_experience(id)?;
1001 if rel_count > 0 {
1002 info!(
1003 count = rel_count,
1004 "Cascade-deleted relations for experience"
1005 );
1006 }
1007
1008 // Delete from redb FIRST (source of truth). If crash happens after
1009 // this but before HNSW soft-delete, on reopen the experience won't be
1010 // loaded from redb, so it's automatically excluded from the rebuilt index.
1011 self.storage.delete_experience(id)?;
1012
1013 // Soft-delete from HNSW index (mark as deleted, not removed from graph).
1014 // This takes effect immediately for the current session's searches.
1015 let vectors = self
1016 .vectors
1017 .read()
1018 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
1019 if let Some(index) = vectors.get(&experience.collective_id) {
1020 index.delete_experience(id)?;
1021 }
1022
1023 // Emit watch event after storage + HNSW deletion
1024 self.watch.emit(
1025 WatchEvent {
1026 experience_id: id,
1027 collective_id: experience.collective_id,
1028 event_type: WatchEventType::Deleted,
1029 timestamp: Timestamp::now(),
1030 experience: None, // Deleted — no data to include
1031 },
1032 &experience,
1033 )?;
1034
1035 info!(id = %id, "Experience deleted");
1036 Ok(())
1037 }
1038
1039 /// Reinforces an experience by incrementing its application count.
1040 ///
1041 /// Each call atomically increments the `applications` counter by 1.
1042 /// Returns the new application count.
1043 ///
1044 /// # Errors
1045 ///
1046 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1047 #[instrument(skip(self))]
1048 pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1049 self.check_writable()?;
1050 let new_count = self
1051 .storage
1052 .reinforce_experience(id)?
1053 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1054
1055 // Emit watch event (fetch experience for collective_id + filter matching)
1056 if self.watch.has_subscribers() {
1057 if let Ok(Some(exp)) = self.storage.get_experience(id) {
1058 self.watch.emit(
1059 WatchEvent {
1060 experience_id: id,
1061 collective_id: exp.collective_id,
1062 event_type: WatchEventType::Updated,
1063 timestamp: Timestamp::now(),
1064 experience: Some(exp.clone()),
1065 },
1066 &exp,
1067 )?;
1068 }
1069 }
1070
1071 info!(id = %id, applications = new_count, "Experience reinforced");
1072 Ok(new_count)
1073 }
1074
1075 // =========================================================================
1076 // Recent Experiences
1077 // =========================================================================
1078
1079 // =========================================================================
1080 // Paginated List Operations (PulseVision)
1081 // =========================================================================
1082
1083 /// Lists experiences in a collective with pagination.
1084 ///
1085 /// Returns full `Experience` records (including embeddings) ordered by
1086 /// timestamp. Use `offset` and `limit` for pagination.
1087 ///
1088 /// Designed for visualization tools (PulseVision) that need to enumerate
1089 /// the entire embedding space of a collective.
1090 #[instrument(skip(self))]
1091 pub fn list_experiences(
1092 &self,
1093 collective_id: CollectiveId,
1094 limit: usize,
1095 offset: usize,
1096 ) -> Result<Vec<Experience>> {
1097 let ids = self
1098 .storage
1099 .list_experience_ids_paginated(collective_id, limit, offset)?;
1100 let mut experiences = Vec::with_capacity(ids.len());
1101 for id in ids {
1102 if let Some(exp) = self.storage.get_experience(id)? {
1103 experiences.push(exp);
1104 }
1105 }
1106 Ok(experiences)
1107 }
1108
    /// Lists relations in a collective with pagination.
    ///
    /// Thin read-only delegate to storage; returns full
    /// [`ExperienceRelation`](crate::relation::ExperienceRelation) records,
    /// paged with `offset`/`limit`.
    #[instrument(skip(self))]
    pub fn list_relations(
        &self,
        collective_id: CollectiveId,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::relation::ExperienceRelation>> {
        self.storage
            .list_relations_in_collective(collective_id, limit, offset)
    }
1120
1121 /// Lists insights in a collective with pagination.
1122 ///
1123 /// Returns full `DerivedInsight` records including embeddings.
1124 #[instrument(skip(self))]
1125 pub fn list_insights(
1126 &self,
1127 collective_id: CollectiveId,
1128 limit: usize,
1129 offset: usize,
1130 ) -> Result<Vec<DerivedInsight>> {
1131 let ids = self
1132 .storage
1133 .list_insight_ids_paginated(collective_id, limit, offset)?;
1134 let mut insights = Vec::with_capacity(ids.len());
1135 for id in ids {
1136 if let Some(insight) = self.storage.get_insight(id)? {
1137 insights.push(insight);
1138 }
1139 }
1140 Ok(insights)
1141 }
1142
1143 /// Retrieves the most recent experiences in a collective.
1144 ///
1145 /// Returns full experiences ordered by timestamp (newest first).
1146 #[instrument(skip(self))]
1147 pub fn get_recent_experiences(
1148 &self,
1149 collective_id: CollectiveId,
1150 limit: usize,
1151 ) -> Result<Vec<Experience>> {
1152 self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
1153 }
1154
1155 /// Retrieves the most recent experiences in a collective with filtering.
1156 ///
1157 /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1158 /// applies additional filters on domain, experience type, importance,
1159 /// confidence, and timestamp.
1160 ///
1161 /// Over-fetches from storage (2x `limit`) to account for entries removed
1162 /// by post-filtering, then truncates to the requested `limit`.
1163 ///
1164 /// # Arguments
1165 ///
1166 /// * `collective_id` - The collective to query
1167 /// * `limit` - Maximum number of experiences to return (1-1000)
1168 /// * `filter` - Filter criteria to apply
1169 ///
1170 /// # Errors
1171 ///
1172 /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1173 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1174 #[instrument(skip(self, filter))]
1175 pub fn get_recent_experiences_filtered(
1176 &self,
1177 collective_id: CollectiveId,
1178 limit: usize,
1179 filter: SearchFilter,
1180 ) -> Result<Vec<Experience>> {
1181 // Validate limit
1182 if limit == 0 || limit > 1000 {
1183 return Err(
1184 ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1185 );
1186 }
1187
1188 // Verify collective exists
1189 self.storage
1190 .get_collective(collective_id)?
1191 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1192
1193 // Over-fetch IDs to account for post-filtering losses
1194 let over_fetch = limit.saturating_mul(2).min(2000);
1195 let recent_ids = self
1196 .storage
1197 .get_recent_experience_ids(collective_id, over_fetch)?;
1198
1199 // Load full experiences and apply filter
1200 let mut results = Vec::with_capacity(limit);
1201 for (exp_id, _timestamp) in recent_ids {
1202 if results.len() >= limit {
1203 break;
1204 }
1205
1206 if let Some(experience) = self.storage.get_experience(exp_id)? {
1207 if filter.matches(&experience) {
1208 results.push(experience);
1209 }
1210 }
1211 }
1212
1213 Ok(results)
1214 }
1215
1216 // =========================================================================
1217 // Similarity Search (E2-S02)
1218 // =========================================================================
1219
1220 /// Searches for experiences semantically similar to the query embedding.
1221 ///
1222 /// Uses the HNSW vector index for approximate nearest neighbor search,
1223 /// then fetches full experience records from storage. Archived experiences
1224 /// are excluded by default.
1225 ///
1226 /// Results are sorted by similarity descending (most similar first).
1227 /// Similarity is computed as `1.0 - cosine_distance`.
1228 ///
1229 /// # Arguments
1230 ///
1231 /// * `collective_id` - The collective to search within
1232 /// * `query` - Query embedding vector (must match collective's dimension)
1233 /// * `k` - Maximum number of results to return (1-1000)
1234 ///
1235 /// # Errors
1236 ///
1237 /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1238 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1239 /// the collective's embedding dimension
1240 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1241 ///
1242 /// # Example
1243 ///
1244 /// ```rust
1245 /// # fn main() -> pulsedb::Result<()> {
1246 /// # let dir = tempfile::tempdir().unwrap();
1247 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1248 /// # let collective_id = db.create_collective("example")?;
1249 /// let query = vec![0.1f32; 384]; // Your query embedding
1250 /// let results = db.search_similar(collective_id, &query, 10)?;
1251 /// for result in &results {
1252 /// println!(
1253 /// "[{:.3}] {}",
1254 /// result.similarity, result.experience.content
1255 /// );
1256 /// }
1257 /// # Ok(())
1258 /// # }
1259 /// ```
1260 #[instrument(skip(self, query))]
1261 pub fn search_similar(
1262 &self,
1263 collective_id: CollectiveId,
1264 query: &[f32],
1265 k: usize,
1266 ) -> Result<Vec<SearchResult>> {
1267 self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
1268 }
1269
1270 /// Searches for semantically similar experiences with additional filtering.
1271 ///
1272 /// Like [`search_similar()`](Self::search_similar), but applies additional
1273 /// filters on domain, experience type, importance, confidence, and timestamp.
1274 ///
1275 /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1276 /// by post-filtering, then truncates to the requested `k`.
1277 ///
1278 /// # Arguments
1279 ///
1280 /// * `collective_id` - The collective to search within
1281 /// * `query` - Query embedding vector (must match collective's dimension)
1282 /// * `k` - Maximum number of results to return (1-1000)
1283 /// * `filter` - Filter criteria to apply after vector search
1284 ///
1285 /// # Errors
1286 ///
1287 /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1288 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1289 /// the collective's embedding dimension
1290 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1291 ///
1292 /// # Example
1293 ///
1294 /// ```rust
1295 /// # fn main() -> pulsedb::Result<()> {
1296 /// # let dir = tempfile::tempdir().unwrap();
1297 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1298 /// # let collective_id = db.create_collective("example")?;
1299 /// # let query_embedding = vec![0.1f32; 384];
1300 /// use pulsedb::SearchFilter;
1301 ///
1302 /// let filter = SearchFilter {
1303 /// domains: Some(vec!["rust".to_string()]),
1304 /// min_importance: Some(0.5),
1305 /// ..SearchFilter::default()
1306 /// };
1307 /// let results = db.search_similar_filtered(
1308 /// collective_id,
1309 /// &query_embedding,
1310 /// 10,
1311 /// filter,
1312 /// )?;
1313 /// # Ok(())
1314 /// # }
1315 /// ```
1316 #[instrument(skip(self, query, filter))]
1317 pub fn search_similar_filtered(
1318 &self,
1319 collective_id: CollectiveId,
1320 query: &[f32],
1321 k: usize,
1322 filter: SearchFilter,
1323 ) -> Result<Vec<SearchResult>> {
1324 // Validate k
1325 if k == 0 || k > 1000 {
1326 return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1327 }
1328
1329 // Verify collective exists and check embedding dimension
1330 let collective = self
1331 .storage
1332 .get_collective(collective_id)?
1333 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1334
1335 let expected_dim = collective.embedding_dimension as usize;
1336 if query.len() != expected_dim {
1337 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1338 }
1339
1340 // Over-fetch from HNSW to compensate for post-filtering losses
1341 let over_fetch = k.saturating_mul(2).min(2000);
1342 let ef_search = self.config.hnsw.ef_search;
1343
1344 // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1345 // by distance ascending (closest first)
1346 let candidates = self
1347 .with_vector_index(collective_id, |index| {
1348 index.search_experiences(query, over_fetch, ef_search)
1349 })?
1350 .unwrap_or_default();
1351
1352 // Fetch full experiences, apply filter, convert distance → similarity
1353 let mut results = Vec::with_capacity(k);
1354 for (exp_id, distance) in candidates {
1355 if results.len() >= k {
1356 break;
1357 }
1358
1359 if let Some(experience) = self.storage.get_experience(exp_id)? {
1360 if filter.matches(&experience) {
1361 results.push(SearchResult {
1362 experience,
1363 similarity: 1.0 - distance,
1364 });
1365 }
1366 }
1367 }
1368
1369 Ok(results)
1370 }
1371
1372 // =========================================================================
1373 // Experience Relations (E3-S01)
1374 // =========================================================================
1375
1376 /// Stores a new relation between two experiences.
1377 ///
1378 /// Relations are typed, directed edges connecting a source experience to a
1379 /// target experience. Both experiences must exist and belong to the same
1380 /// collective. Duplicate relations (same source, target, and type) are
1381 /// rejected.
1382 ///
1383 /// # Arguments
1384 ///
1385 /// * `relation` - The relation to create (source, target, type, strength)
1386 ///
1387 /// # Errors
1388 ///
1389 /// Returns an error if:
1390 /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1391 /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1392 /// - A relation with the same (source, target, type) already exists
1393 /// - Self-relation attempted (source == target)
1394 /// - Strength is out of range `[0.0, 1.0]`
1395 #[instrument(skip(self, relation))]
1396 pub fn store_relation(
1397 &self,
1398 relation: crate::relation::NewExperienceRelation,
1399 ) -> Result<crate::types::RelationId> {
1400 self.check_writable()?;
1401 use crate::relation::{validate_new_relation, ExperienceRelation};
1402 use crate::types::RelationId;
1403
1404 // Validate input fields (self-relation, strength bounds, metadata size)
1405 validate_new_relation(&relation)?;
1406
1407 // Load source and target experiences to verify existence
1408 let source = self
1409 .storage
1410 .get_experience(relation.source_id)?
1411 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1412 let target = self
1413 .storage
1414 .get_experience(relation.target_id)?
1415 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1416
1417 // Verify same collective
1418 if source.collective_id != target.collective_id {
1419 return Err(PulseDBError::from(ValidationError::invalid_field(
1420 "target_id",
1421 "source and target experiences must belong to the same collective",
1422 )));
1423 }
1424
1425 // Check for duplicate (same source, target, type)
1426 if self.storage.relation_exists(
1427 relation.source_id,
1428 relation.target_id,
1429 relation.relation_type,
1430 )? {
1431 return Err(PulseDBError::from(ValidationError::invalid_field(
1432 "relation_type",
1433 "a relation with this source, target, and type already exists",
1434 )));
1435 }
1436
1437 // Construct the full relation
1438 let id = RelationId::new();
1439 let full_relation = ExperienceRelation {
1440 id,
1441 source_id: relation.source_id,
1442 target_id: relation.target_id,
1443 relation_type: relation.relation_type,
1444 strength: relation.strength,
1445 metadata: relation.metadata,
1446 created_at: Timestamp::now(),
1447 };
1448
1449 self.storage.save_relation(&full_relation)?;
1450
1451 info!(
1452 id = %id,
1453 source = %relation.source_id,
1454 target = %relation.target_id,
1455 relation_type = ?full_relation.relation_type,
1456 "Relation stored"
1457 );
1458 Ok(id)
1459 }
1460
1461 /// Retrieves experiences related to the given experience.
1462 ///
1463 /// Returns pairs of `(Experience, ExperienceRelation)` based on the
1464 /// requested direction:
1465 /// - `Outgoing`: experiences that this experience points TO (as source)
1466 /// - `Incoming`: experiences that point TO this experience (as target)
1467 /// - `Both`: union of outgoing and incoming
1468 ///
1469 /// To filter by relation type, use
1470 /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered).
1471 ///
1472 /// Silently skips relations where the related experience no longer exists
1473 /// (orphan tolerance).
1474 ///
1475 /// # Errors
1476 ///
1477 /// Returns a storage error if the read transaction fails.
1478 #[instrument(skip(self))]
1479 pub fn get_related_experiences(
1480 &self,
1481 experience_id: ExperienceId,
1482 direction: crate::relation::RelationDirection,
1483 ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1484 self.get_related_experiences_filtered(experience_id, direction, None)
1485 }
1486
1487 /// Retrieves experiences related to the given experience, with optional
1488 /// type filtering.
1489 ///
1490 /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1491 /// accepts an optional [`RelationType`](crate::RelationType) filter.
1492 /// When `Some(rt)`, only relations matching that type are returned.
1493 ///
1494 /// # Arguments
1495 ///
1496 /// * `experience_id` - The experience to query relations for
1497 /// * `direction` - Which direction(s) to traverse
1498 /// * `relation_type` - If `Some`, only return relations of this type
1499 ///
1500 /// # Example
1501 ///
1502 /// ```rust
1503 /// # fn main() -> pulsedb::Result<()> {
1504 /// # let dir = tempfile::tempdir().unwrap();
1505 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1506 /// # let cid = db.create_collective("example")?;
1507 /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1508 /// # collective_id: cid,
1509 /// # content: "a".into(),
1510 /// # embedding: Some(vec![0.1f32; 384]),
1511 /// # ..Default::default()
1512 /// # })?;
1513 /// use pulsedb::{RelationType, RelationDirection};
1514 ///
1515 /// // Only "Supports" relations outgoing from exp_a
1516 /// let supports = db.get_related_experiences_filtered(
1517 /// exp_a,
1518 /// RelationDirection::Outgoing,
1519 /// Some(RelationType::Supports),
1520 /// )?;
1521 /// # Ok(())
1522 /// # }
1523 /// ```
1524 #[instrument(skip(self))]
1525 pub fn get_related_experiences_filtered(
1526 &self,
1527 experience_id: ExperienceId,
1528 direction: crate::relation::RelationDirection,
1529 relation_type: Option<crate::relation::RelationType>,
1530 ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1531 use crate::relation::RelationDirection;
1532
1533 let mut results = Vec::new();
1534
1535 // Outgoing: this experience is the source → fetch target experiences
1536 if matches!(
1537 direction,
1538 RelationDirection::Outgoing | RelationDirection::Both
1539 ) {
1540 let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1541 for rel_id in rel_ids {
1542 if let Some(relation) = self.storage.get_relation(rel_id)? {
1543 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1544 continue;
1545 }
1546 if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1547 results.push((experience, relation));
1548 }
1549 }
1550 }
1551 }
1552
1553 // Incoming: this experience is the target → fetch source experiences
1554 if matches!(
1555 direction,
1556 RelationDirection::Incoming | RelationDirection::Both
1557 ) {
1558 let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1559 for rel_id in rel_ids {
1560 if let Some(relation) = self.storage.get_relation(rel_id)? {
1561 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1562 continue;
1563 }
1564 if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1565 results.push((experience, relation));
1566 }
1567 }
1568 }
1569 }
1570
1571 Ok(results)
1572 }
1573
1574 /// Retrieves a relation by ID.
1575 ///
1576 /// Returns `None` if no relation with the given ID exists.
1577 pub fn get_relation(
1578 &self,
1579 id: crate::types::RelationId,
1580 ) -> Result<Option<crate::relation::ExperienceRelation>> {
1581 self.storage.get_relation(id)
1582 }
1583
1584 /// Deletes a relation by ID.
1585 ///
1586 /// # Errors
1587 ///
1588 /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1589 #[instrument(skip(self))]
1590 pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1591 self.check_writable()?;
1592 let deleted = self.storage.delete_relation(id)?;
1593 if !deleted {
1594 return Err(PulseDBError::from(NotFoundError::relation(id)));
1595 }
1596 info!(id = %id, "Relation deleted");
1597 Ok(())
1598 }
1599
1600 // =========================================================================
1601 // Derived Insights (E3-S02)
1602 // =========================================================================
1603
1604 /// Stores a new derived insight.
1605 ///
1606 /// Creates a synthesized knowledge record from multiple source experiences.
1607 /// The method:
1608 /// 1. Validates the input (content, confidence, sources)
1609 /// 2. Verifies the collective exists
1610 /// 3. Verifies all source experiences exist and belong to the same collective
1611 /// 4. Resolves the embedding (generates if Builtin, requires if External)
1612 /// 5. Stores the insight with inline embedding
1613 /// 6. Inserts into the insight HNSW index
1614 ///
1615 /// # Arguments
1616 ///
1617 /// * `insight` - The insight to store (see [`NewDerivedInsight`])
1618 ///
1619 /// # Errors
1620 ///
1621 /// - [`ValidationError`](crate::ValidationError) if input is invalid
1622 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1623 /// - [`NotFoundError::Experience`] if any source experience doesn't exist
1624 /// - [`ValidationError::InvalidField`] if source experiences belong to
1625 /// different collectives
1626 /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
1627 #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
1628 pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
1629 self.check_writable()?;
1630 let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
1631
1632 // Validate input fields
1633 validate_new_insight(&insight)?;
1634
1635 // Verify collective exists
1636 let collective = self
1637 .storage
1638 .get_collective(insight.collective_id)?
1639 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;
1640
1641 // Verify all source experiences exist and belong to this collective
1642 for source_id in &insight.source_experience_ids {
1643 let source_exp = self
1644 .storage
1645 .get_experience(*source_id)?
1646 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
1647 if source_exp.collective_id != insight.collective_id {
1648 return Err(PulseDBError::from(ValidationError::invalid_field(
1649 "source_experience_ids",
1650 format!(
1651 "experience {} belongs to collective {}, not {}",
1652 source_id, source_exp.collective_id, insight.collective_id
1653 ),
1654 )));
1655 }
1656 }
1657
1658 // Resolve embedding
1659 let embedding = match insight.embedding {
1660 Some(ref emb) => {
1661 // Validate dimension
1662 let expected_dim = collective.embedding_dimension as usize;
1663 if emb.len() != expected_dim {
1664 return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
1665 }
1666 emb.clone()
1667 }
1668 None => {
1669 if is_external {
1670 return Err(PulseDBError::embedding(
1671 "embedding is required when using External embedding provider",
1672 ));
1673 }
1674 self.embedding.embed(&insight.content)?
1675 }
1676 };
1677
1678 let embedding_for_hnsw = embedding.clone();
1679 let now = Timestamp::now();
1680 let id = InsightId::new();
1681
1682 // Construct the full insight record
1683 let derived_insight = DerivedInsight {
1684 id,
1685 collective_id: insight.collective_id,
1686 content: insight.content,
1687 embedding,
1688 source_experience_ids: insight.source_experience_ids,
1689 insight_type: insight.insight_type,
1690 confidence: insight.confidence,
1691 domain: insight.domain,
1692 created_at: now,
1693 updated_at: now,
1694 };
1695
1696 // Write to redb FIRST (source of truth)
1697 self.storage.save_insight(&derived_insight)?;
1698
1699 // Insert into insight HNSW index (using InsightId→ExperienceId byte conversion)
1700 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1701 let insight_vectors = self
1702 .insight_vectors
1703 .read()
1704 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1705 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1706 index.insert_experience(exp_id, &embedding_for_hnsw)?;
1707 }
1708
1709 info!(id = %id, "Insight stored");
1710 Ok(id)
1711 }
1712
    /// Retrieves a derived insight by ID.
    ///
    /// Returns `None` if no insight with the given ID exists.
    ///
    /// Read-only: delegates directly to storage.
    #[instrument(skip(self))]
    pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
        self.storage.get_insight(id)
    }
1720
1721 /// Searches for insights semantically similar to the query embedding.
1722 ///
1723 /// Uses the insight-specific HNSW index for approximate nearest neighbor
1724 /// search, then fetches full insight records from storage.
1725 ///
1726 /// # Arguments
1727 ///
1728 /// * `collective_id` - The collective to search within
1729 /// * `query` - Query embedding vector (must match collective's dimension)
1730 /// * `k` - Maximum number of results to return
1731 ///
1732 /// # Errors
1733 ///
1734 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1735 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1736 #[instrument(skip(self, query))]
1737 pub fn get_insights(
1738 &self,
1739 collective_id: CollectiveId,
1740 query: &[f32],
1741 k: usize,
1742 ) -> Result<Vec<(DerivedInsight, f32)>> {
1743 // Verify collective exists and check embedding dimension
1744 let collective = self
1745 .storage
1746 .get_collective(collective_id)?
1747 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1748
1749 let expected_dim = collective.embedding_dimension as usize;
1750 if query.len() != expected_dim {
1751 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1752 }
1753
1754 let ef_search = self.config.hnsw.ef_search;
1755
1756 // Search insight HNSW — returns (ExperienceId, distance) pairs
1757 let insight_vectors = self
1758 .insight_vectors
1759 .read()
1760 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1761
1762 let candidates = match insight_vectors.get(&collective_id) {
1763 Some(index) => index.search_experiences(query, k, ef_search)?,
1764 None => return Ok(vec![]),
1765 };
1766 drop(insight_vectors);
1767
1768 // Convert ExperienceId back to InsightId and fetch records
1769 let mut results = Vec::with_capacity(candidates.len());
1770 for (exp_id, distance) in candidates {
1771 let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1772 if let Some(insight) = self.storage.get_insight(insight_id)? {
1773 // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1774 results.push((insight, 1.0 - distance));
1775 }
1776 }
1777
1778 Ok(results)
1779 }
1780
1781 /// Deletes a derived insight by ID.
1782 ///
1783 /// Removes the insight from storage and soft-deletes it from the HNSW index.
1784 ///
1785 /// # Errors
1786 ///
1787 /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1788 #[instrument(skip(self))]
1789 pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1790 self.check_writable()?;
1791 // Read insight first to get collective_id for HNSW lookup
1792 let insight = self
1793 .storage
1794 .get_insight(id)?
1795 .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1796
1797 // Delete from redb FIRST (source of truth)
1798 self.storage.delete_insight(id)?;
1799
1800 // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1801 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1802 let insight_vectors = self
1803 .insight_vectors
1804 .read()
1805 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1806 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1807 index.delete_experience(exp_id)?;
1808 }
1809
1810 info!(id = %id, "Insight deleted");
1811 Ok(())
1812 }
1813
1814 // =========================================================================
1815 // Activity Tracking (E3-S03)
1816 // =========================================================================
1817
1818 /// Registers an agent's presence in a collective.
1819 ///
1820 /// Creates a new activity record or replaces an existing one for the
1821 /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1822 /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1823 ///
1824 /// # Arguments
1825 ///
1826 /// * `activity` - The activity registration (see [`NewActivity`])
1827 ///
1828 /// # Errors
1829 ///
1830 /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1831 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1832 ///
1833 /// # Example
1834 ///
1835 /// ```rust
1836 /// # fn main() -> pulsedb::Result<()> {
1837 /// # let dir = tempfile::tempdir().unwrap();
1838 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1839 /// # let collective_id = db.create_collective("example")?;
1840 /// use pulsedb::NewActivity;
1841 ///
1842 /// db.register_activity(NewActivity {
1843 /// agent_id: "claude-opus".to_string(),
1844 /// collective_id,
1845 /// current_task: Some("Reviewing pull request".to_string()),
1846 /// context_summary: None,
1847 /// })?;
1848 /// # Ok(())
1849 /// # }
1850 /// ```
1851 #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1852 pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1853 // Validate input
1854 validate_new_activity(&activity)?;
1855
1856 // Verify collective exists
1857 self.storage
1858 .get_collective(activity.collective_id)?
1859 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1860
1861 // Build stored activity with timestamps
1862 let now = Timestamp::now();
1863 let stored = Activity {
1864 agent_id: activity.agent_id,
1865 collective_id: activity.collective_id,
1866 current_task: activity.current_task,
1867 context_summary: activity.context_summary,
1868 started_at: now,
1869 last_heartbeat: now,
1870 };
1871
1872 self.storage.save_activity(&stored)?;
1873
1874 info!(
1875 agent_id = %stored.agent_id,
1876 collective_id = %stored.collective_id,
1877 "Activity registered"
1878 );
1879 Ok(())
1880 }
1881
1882 /// Updates an agent's heartbeat timestamp.
1883 ///
1884 /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1885 /// any other fields. The agent must have an existing activity registered.
1886 ///
1887 /// # Errors
1888 ///
1889 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1890 #[instrument(skip(self))]
1891 pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1892 self.check_writable()?;
1893 let mut activity = self
1894 .storage
1895 .get_activity(agent_id, collective_id)?
1896 .ok_or_else(|| {
1897 PulseDBError::from(NotFoundError::activity(format!(
1898 "{} in {}",
1899 agent_id, collective_id
1900 )))
1901 })?;
1902
1903 activity.last_heartbeat = Timestamp::now();
1904 self.storage.save_activity(&activity)?;
1905
1906 info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1907 Ok(())
1908 }
1909
1910 /// Ends an agent's activity in a collective.
1911 ///
1912 /// Removes the activity record. After calling this, the agent will no
1913 /// longer appear in `get_active_agents()` results.
1914 ///
1915 /// # Errors
1916 ///
1917 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1918 #[instrument(skip(self))]
1919 pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1920 let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1921
1922 if !deleted {
1923 return Err(PulseDBError::from(NotFoundError::activity(format!(
1924 "{} in {}",
1925 agent_id, collective_id
1926 ))));
1927 }
1928
1929 info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1930 Ok(())
1931 }
1932
1933 /// Returns all active (non-stale) agents in a collective.
1934 ///
1935 /// Fetches all activities, filters out those whose `last_heartbeat` is
1936 /// older than `config.activity.stale_threshold`, and returns the rest
1937 /// sorted by `last_heartbeat` descending (most recently active first).
1938 ///
1939 /// # Errors
1940 ///
1941 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1942 #[instrument(skip(self))]
1943 pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1944 // Verify collective exists
1945 self.storage
1946 .get_collective(collective_id)?
1947 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1948
1949 let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1950
1951 // Filter stale activities
1952 let now = Timestamp::now();
1953 let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1954 let cutoff = now.as_millis() - threshold_ms;
1955
1956 let mut active: Vec<Activity> = all_activities
1957 .into_iter()
1958 .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1959 .collect();
1960
1961 // Sort by last_heartbeat descending (most recently active first)
1962 active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1963
1964 Ok(active)
1965 }
1966
1967 // =========================================================================
1968 // Context Candidates (E2-S04)
1969 // =========================================================================
1970
1971 /// Retrieves unified context candidates from all retrieval primitives.
1972 ///
1973 /// This is the primary API for context assembly. It orchestrates:
1974 /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1975 /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1976 /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1977 /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1978 /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1979 ///
1980 /// # Arguments
1981 ///
1982 /// * `request` - Configuration for which primitives to query and limits
1983 ///
1984 /// # Errors
1985 ///
1986 /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1987 /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1988 /// the collective's embedding dimension
1989 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1990 ///
1991 /// # Performance
1992 ///
1993 /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1994 /// all other sub-calls are < 10ms each.
1995 ///
1996 /// # Example
1997 ///
1998 /// ```rust
1999 /// # fn main() -> pulsedb::Result<()> {
2000 /// # let dir = tempfile::tempdir().unwrap();
2001 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2002 /// # let collective_id = db.create_collective("example")?;
2003 /// # let query_vec = vec![0.1f32; 384];
2004 /// use pulsedb::{ContextRequest, SearchFilter};
2005 ///
2006 /// let candidates = db.get_context_candidates(ContextRequest {
2007 /// collective_id,
2008 /// query_embedding: query_vec,
2009 /// max_similar: 10,
2010 /// max_recent: 5,
2011 /// include_insights: true,
2012 /// include_relations: true,
2013 /// include_active_agents: true,
2014 /// filter: SearchFilter {
2015 /// domains: Some(vec!["rust".to_string()]),
2016 /// ..SearchFilter::default()
2017 /// },
2018 /// ..ContextRequest::default()
2019 /// })?;
2020 /// # Ok(())
2021 /// # }
2022 /// ```
2023 #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
2024 pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
2025 // ── Validate limits ──────────────────────────────────────
2026 if request.max_similar == 0 || request.max_similar > 1000 {
2027 return Err(ValidationError::invalid_field(
2028 "max_similar",
2029 "must be between 1 and 1000",
2030 )
2031 .into());
2032 }
2033 if request.max_recent == 0 || request.max_recent > 1000 {
2034 return Err(
2035 ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
2036 );
2037 }
2038
2039 // ── Verify collective exists and check dimension ─────────
2040 let collective = self
2041 .storage
2042 .get_collective(request.collective_id)?
2043 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;
2044
2045 let expected_dim = collective.embedding_dimension as usize;
2046 if request.query_embedding.len() != expected_dim {
2047 return Err(ValidationError::dimension_mismatch(
2048 expected_dim,
2049 request.query_embedding.len(),
2050 )
2051 .into());
2052 }
2053
2054 // ── 1. Similar experiences (HNSW vector search) ──────────
2055 let similar_experiences = self.search_similar_filtered(
2056 request.collective_id,
2057 &request.query_embedding,
2058 request.max_similar,
2059 request.filter.clone(),
2060 )?;
2061
2062 // ── 2. Recent experiences (timestamp index scan) ─────────
2063 let recent_experiences = self.get_recent_experiences_filtered(
2064 request.collective_id,
2065 request.max_recent,
2066 request.filter,
2067 )?;
2068
2069 // ── 3. Insights (HNSW vector search on insight index) ────
2070 let insights = if request.include_insights {
2071 self.get_insights(
2072 request.collective_id,
2073 &request.query_embedding,
2074 request.max_similar,
2075 )?
2076 .into_iter()
2077 .map(|(insight, _score)| insight)
2078 .collect()
2079 } else {
2080 vec![]
2081 };
2082
2083 // ── 4. Relations (graph traversal from result experiences) ─
2084 let relations = if request.include_relations {
2085 use std::collections::HashSet;
2086
2087 let mut seen = HashSet::new();
2088 let mut all_relations = Vec::new();
2089
2090 // Collect unique experience IDs from both result sets
2091 let exp_ids: Vec<_> = similar_experiences
2092 .iter()
2093 .map(|r| r.experience.id)
2094 .chain(recent_experiences.iter().map(|e| e.id))
2095 .collect();
2096
2097 for exp_id in exp_ids {
2098 let related =
2099 self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;
2100
2101 for (_experience, relation) in related {
2102 if seen.insert(relation.id) {
2103 all_relations.push(relation);
2104 }
2105 }
2106 }
2107
2108 all_relations
2109 } else {
2110 vec![]
2111 };
2112
2113 // ── 5. Active agents (staleness-filtered activity records) ─
2114 let active_agents = if request.include_active_agents {
2115 self.get_active_agents(request.collective_id)?
2116 } else {
2117 vec![]
2118 };
2119
2120 Ok(ContextCandidates {
2121 similar_experiences,
2122 recent_experiences,
2123 insights,
2124 relations,
2125 active_agents,
2126 })
2127 }
2128
2129 // =========================================================================
2130 // Watch System (E4-S01)
2131 // =========================================================================
2132
2133 /// Subscribes to all experience changes in a collective.
2134 ///
2135 /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2136 /// create, update, archive, and delete operation. The stream ends when
2137 /// dropped or when the `PulseDB` instance is closed.
2138 ///
2139 /// Multiple subscribers per collective are supported. Each gets an
2140 /// independent copy of every event.
2141 ///
2142 /// # Example
2143 ///
2144 /// ```rust,no_run
2145 /// # #[tokio::main]
2146 /// # async fn main() -> pulsedb::Result<()> {
2147 /// # let dir = tempfile::tempdir().unwrap();
2148 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2149 /// # let collective_id = db.create_collective("example")?;
2150 /// use futures::StreamExt;
2151 ///
2152 /// let mut stream = db.watch_experiences(collective_id)?;
2153 /// while let Some(event) = stream.next().await {
2154 /// println!("{:?}: {}", event.event_type, event.experience_id);
2155 /// }
2156 /// # Ok(())
2157 /// # }
2158 /// ```
2159 pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2160 self.watch.subscribe(collective_id, None)
2161 }
2162
2163 /// Subscribes to filtered experience changes in a collective.
2164 ///
2165 /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2166 /// events that match the filter criteria. Filters are applied on the
2167 /// sender side before channel delivery.
2168 ///
2169 /// # Example
2170 ///
2171 /// ```rust
2172 /// # fn main() -> pulsedb::Result<()> {
2173 /// # let dir = tempfile::tempdir().unwrap();
2174 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2175 /// # let collective_id = db.create_collective("example")?;
2176 /// use pulsedb::WatchFilter;
2177 ///
2178 /// let filter = WatchFilter {
2179 /// domains: Some(vec!["security".to_string()]),
2180 /// min_importance: Some(0.7),
2181 /// ..Default::default()
2182 /// };
2183 /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2184 /// # Ok(())
2185 /// # }
2186 /// ```
2187 pub fn watch_experiences_filtered(
2188 &self,
2189 collective_id: CollectiveId,
2190 filter: WatchFilter,
2191 ) -> Result<WatchStream> {
2192 self.watch.subscribe(collective_id, Some(filter))
2193 }
2194
2195 // =========================================================================
2196 // Cross-Process Watch (E4-S02)
2197 // =========================================================================
2198
2199 /// Returns the current WAL sequence number.
2200 ///
2201 /// Use this to establish a baseline before starting to poll for changes.
2202 /// Returns 0 if no experience writes have occurred yet.
2203 ///
2204 /// # Example
2205 ///
2206 /// ```rust
2207 /// # fn main() -> pulsedb::Result<()> {
2208 /// # let dir = tempfile::tempdir().unwrap();
2209 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2210 /// let seq = db.get_current_sequence()?;
2211 /// // ... later ...
2212 /// let (events, new_seq) = db.poll_changes(seq)?;
2213 /// # Ok(())
2214 /// # }
2215 /// ```
2216 pub fn get_current_sequence(&self) -> Result<u64> {
2217 self.storage.get_wal_sequence()
2218 }
2219
2220 /// Polls for experience changes since the given sequence number.
2221 ///
2222 /// Returns a tuple of `(events, new_sequence)`:
2223 /// - `events`: New [`WatchEvent`]s in sequence order
2224 /// - `new_sequence`: Pass this value back on the next call
2225 ///
2226 /// Returns an empty vec and the same sequence if no changes exist.
2227 ///
2228 /// # Arguments
2229 ///
2230 /// * `since_seq` - The last sequence number you received (0 for first call)
2231 ///
2232 /// # Performance
2233 ///
2234 /// Target: < 10ms per call. Internally performs a range scan on the
2235 /// watch_events table, O(k) where k is the number of new events.
2236 ///
2237 /// # Example
2238 ///
2239 /// ```rust,no_run
2240 /// # fn main() -> pulsedb::Result<()> {
2241 /// # let dir = tempfile::tempdir().unwrap();
2242 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2243 /// use std::time::Duration;
2244 ///
2245 /// let mut seq = 0u64;
2246 /// loop {
2247 /// let (events, new_seq) = db.poll_changes(seq)?;
2248 /// seq = new_seq;
2249 /// for event in events {
2250 /// println!("{:?}: {}", event.event_type, event.experience_id);
2251 /// }
2252 /// std::thread::sleep(Duration::from_millis(100));
2253 /// }
2254 /// # }
2255 /// ```
2256 pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2257 use crate::storage::schema::EntityTypeTag;
2258 let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2259 let events = records
2260 .into_iter()
2261 .filter(|r| r.entity_type == EntityTypeTag::Experience)
2262 .map(WatchEvent::from)
2263 .collect();
2264 Ok((events, new_seq))
2265 }
2266
2267 /// Polls for changes with a custom batch size limit.
2268 ///
2269 /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2270 /// `limit` events per call. Use this for backpressure control.
2271 pub fn poll_changes_batch(
2272 &self,
2273 since_seq: u64,
2274 limit: usize,
2275 ) -> Result<(Vec<WatchEvent>, u64)> {
2276 use crate::storage::schema::EntityTypeTag;
2277 let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2278 let events = records
2279 .into_iter()
2280 .filter(|r| r.entity_type == EntityTypeTag::Experience)
2281 .map(WatchEvent::from)
2282 .collect();
2283 Ok((events, new_seq))
2284 }
2285
2286 // =========================================================================
2287 // Sync WAL Compaction (feature: sync)
2288 // =========================================================================
2289
2290 /// Compacts the WAL by removing events that all peers have already synced.
2291 ///
2292 /// Finds the minimum cursor across all known peers and deletes WAL events
2293 /// up to that sequence. If no peers exist, no compaction occurs (events
2294 /// may be needed when a peer connects later).
2295 ///
2296 /// Call this periodically (e.g., daily) to reclaim disk space.
2297 /// Returns the number of WAL events deleted.
2298 ///
2299 /// # Example
2300 ///
2301 /// ```rust,no_run
2302 /// # fn main() -> pulsedb::Result<()> {
2303 /// # let dir = tempfile::tempdir().unwrap();
2304 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2305 /// let deleted = db.compact_wal()?;
2306 /// println!("Compacted {} WAL events", deleted);
2307 /// # Ok(())
2308 /// # }
2309 /// ```
2310 #[cfg(feature = "sync")]
2311 pub fn compact_wal(&self) -> Result<u64> {
2312 let cursors = self
2313 .storage
2314 .list_sync_cursors()
2315 .map_err(|e| PulseDBError::internal(format!("Failed to list sync cursors: {}", e)))?;
2316
2317 if cursors.is_empty() {
2318 // No peers — don't compact (events may be needed later)
2319 return Ok(0);
2320 }
2321
2322 let min_seq = cursors.iter().map(|c| c.last_sequence).min().unwrap_or(0);
2323
2324 if min_seq == 0 {
2325 return Ok(0);
2326 }
2327
2328 let deleted = self.storage.compact_wal_events(min_seq)?;
2329 info!(deleted, min_seq, "WAL compacted");
2330 Ok(deleted)
2331 }
2332
2333 // =========================================================================
2334 // Sync Apply Methods (feature: sync)
2335 // =========================================================================
2336 //
2337 // These methods apply remote changes received via sync. They bypass
2338 // validation and embedding generation (data was validated on the source).
2339 // WAL recording is suppressed by the SyncApplyGuard (entered by the caller).
2340 // Watch emit is skipped (no in-process notifications for sync changes).
2341 //
2342 // These are pub(crate) and will be called by the sync applier in Phase 3.
2343
2344 /// Applies a synced experience from a remote peer.
2345 ///
2346 /// Writes the full experience to storage and inserts into HNSW.
2347 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2348 #[cfg(feature = "sync")]
2349 #[allow(dead_code)] // Called by sync applier (Phase 3)
2350 pub fn apply_synced_experience(&self, experience: Experience) -> Result<()> {
2351 let collective_id = experience.collective_id;
2352 let id = experience.id;
2353 let embedding = experience.embedding.clone();
2354
2355 self.storage.save_experience(&experience)?;
2356
2357 // Insert into HNSW index
2358 let vectors = self
2359 .vectors
2360 .read()
2361 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2362 if let Some(index) = vectors.get(&collective_id) {
2363 index.insert_experience(id, &embedding)?;
2364 }
2365
2366 debug!(id = %id, "Synced experience applied");
2367 Ok(())
2368 }
2369
2370 /// Applies a synced experience update from a remote peer.
2371 ///
2372 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2373 #[cfg(feature = "sync")]
2374 #[allow(dead_code)] // Called by sync applier (Phase 3)
2375 pub fn apply_synced_experience_update(
2376 &self,
2377 id: ExperienceId,
2378 update: ExperienceUpdate,
2379 ) -> Result<()> {
2380 self.storage.update_experience(id, &update)?;
2381 debug!(id = %id, "Synced experience update applied");
2382 Ok(())
2383 }
2384
2385 /// Applies a synced experience deletion from a remote peer.
2386 ///
2387 /// Removes from storage and soft-deletes from HNSW.
2388 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2389 #[cfg(feature = "sync")]
2390 #[allow(dead_code)] // Called by sync applier (Phase 3)
2391 pub fn apply_synced_experience_delete(&self, id: ExperienceId) -> Result<()> {
2392 // Get collective_id for HNSW lookup before deleting
2393 if let Some(exp) = self.storage.get_experience(id)? {
2394 let collective_id = exp.collective_id;
2395
2396 // Cascade delete relations
2397 self.storage.delete_relations_for_experience(id)?;
2398
2399 self.storage.delete_experience(id)?;
2400
2401 // Soft-delete from HNSW
2402 let vectors = self
2403 .vectors
2404 .read()
2405 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2406 if let Some(index) = vectors.get(&collective_id) {
2407 index.delete_experience(id)?;
2408 }
2409 }
2410
2411 debug!(id = %id, "Synced experience delete applied");
2412 Ok(())
2413 }
2414
2415 /// Applies a synced relation from a remote peer.
2416 ///
2417 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2418 #[cfg(feature = "sync")]
2419 #[allow(dead_code)] // Called by sync applier (Phase 3)
2420 pub fn apply_synced_relation(&self, relation: ExperienceRelation) -> Result<()> {
2421 let id = relation.id;
2422 self.storage.save_relation(&relation)?;
2423 debug!(id = %id, "Synced relation applied");
2424 Ok(())
2425 }
2426
2427 /// Applies a synced relation deletion from a remote peer.
2428 ///
2429 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2430 #[cfg(feature = "sync")]
2431 #[allow(dead_code)] // Called by sync applier (Phase 3)
2432 pub fn apply_synced_relation_delete(&self, id: RelationId) -> Result<()> {
2433 self.storage.delete_relation(id)?;
2434 debug!(id = %id, "Synced relation delete applied");
2435 Ok(())
2436 }
2437
2438 /// Applies a synced insight from a remote peer.
2439 ///
2440 /// Writes to storage and inserts into insight HNSW index.
2441 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2442 #[cfg(feature = "sync")]
2443 #[allow(dead_code)] // Called by sync applier (Phase 3)
2444 pub fn apply_synced_insight(&self, insight: DerivedInsight) -> Result<()> {
2445 let id = insight.id;
2446 let collective_id = insight.collective_id;
2447 let embedding = insight.embedding.clone();
2448
2449 self.storage.save_insight(&insight)?;
2450
2451 // Insert into insight HNSW (using InsightId→ExperienceId byte conversion)
2452 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2453 let insight_vectors = self
2454 .insight_vectors
2455 .read()
2456 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2457 if let Some(index) = insight_vectors.get(&collective_id) {
2458 index.insert_experience(exp_id, &embedding)?;
2459 }
2460
2461 debug!(id = %id, "Synced insight applied");
2462 Ok(())
2463 }
2464
2465 /// Applies a synced insight deletion from a remote peer.
2466 ///
2467 /// Removes from storage and soft-deletes from insight HNSW.
2468 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2469 #[cfg(feature = "sync")]
2470 #[allow(dead_code)] // Called by sync applier (Phase 3)
2471 pub fn apply_synced_insight_delete(&self, id: InsightId) -> Result<()> {
2472 if let Some(insight) = self.storage.get_insight(id)? {
2473 self.storage.delete_insight(id)?;
2474
2475 // Soft-delete from insight HNSW
2476 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2477 let insight_vectors = self
2478 .insight_vectors
2479 .read()
2480 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2481 if let Some(index) = insight_vectors.get(&insight.collective_id) {
2482 index.delete_experience(exp_id)?;
2483 }
2484 }
2485
2486 debug!(id = %id, "Synced insight delete applied");
2487 Ok(())
2488 }
2489
2490 /// Applies a synced collective from a remote peer.
2491 ///
2492 /// Writes to storage and creates HNSW indexes for the collective.
2493 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2494 #[cfg(feature = "sync")]
2495 #[allow(dead_code)] // Called by sync applier (Phase 3)
2496 pub fn apply_synced_collective(&self, collective: Collective) -> Result<()> {
2497 let id = collective.id;
2498 let dimension = collective.embedding_dimension as usize;
2499
2500 self.storage.save_collective(&collective)?;
2501
2502 // Create HNSW indexes (same as create_collective)
2503 let exp_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2504 let insight_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2505 self.vectors
2506 .write()
2507 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
2508 .insert(id, exp_index);
2509 self.insight_vectors
2510 .write()
2511 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
2512 .insert(id, insight_index);
2513
2514 debug!(id = %id, "Synced collective applied");
2515 Ok(())
2516 }
2517}
2518
2519// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2520// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2521
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// Opening a fresh path creates the database file on disk.
    #[test]
    fn test_open_creates_database() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        assert!(db_path.exists());
        assert_eq!(db.embedding_dimension(), 384);

        db.close().unwrap();
    }

    /// A database can be closed and reopened with the same config.
    #[test]
    fn test_open_existing_database() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        // First open creates the file.
        PulseDB::open(&db_path, Config::default()).unwrap().close().unwrap();

        // Second open reads the existing file.
        let reopened = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    /// An invalid config (zero cache size) is rejected at open time.
    #[test]
    fn test_config_validation() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let bad_config = Config {
            cache_size_mb: 0, // Invalid
            ..Default::default()
        };

        assert!(PulseDB::open(&db_path, bad_config).is_err());
    }

    /// Reopening with a different embedding dimension must fail.
    #[test]
    fn test_dimension_mismatch() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let config_with = |dim| Config {
            embedding_dimension: dim,
            ..Default::default()
        };

        // Create at 384 dimensions...
        PulseDB::open(&db_path, config_with(EmbeddingDimension::D384))
            .unwrap()
            .close()
            .unwrap();

        // ...then attempt to reopen at 768 — must be rejected.
        let reopen = PulseDB::open(&db_path, config_with(EmbeddingDimension::D768));
        assert!(reopen.is_err());
    }

    /// Metadata reflects the dimension the database was created with.
    #[test]
    fn test_metadata_access() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(db.metadata().embedding_dimension, EmbeddingDimension::D384);
        db.close().unwrap();
    }

    /// Compile-time proof that `PulseDB` can be shared across threads.
    #[test]
    fn test_pulsedb_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PulseDB>();
    }
}