pulsedb/db.rs
1//! PulseDB main struct and lifecycle operations.
2//!
3//! The [`PulseDB`] struct is the primary interface for interacting with
4//! the database. It provides methods for:
5//!
6//! - Opening and closing the database
7//! - Managing collectives (isolation units)
8//! - Recording and querying experiences
9//! - Semantic search and context retrieval
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! # fn main() -> pulsedb::Result<()> {
15//! # let dir = tempfile::tempdir().unwrap();
16//! use pulsedb::{PulseDB, Config, NewExperience};
17//!
18//! // Open or create a database
19//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
20//!
21//! // Create a collective for your project
22//! let collective = db.create_collective("my-project")?;
23//!
24//! // Record an experience
25//! db.record_experience(NewExperience {
26//! collective_id: collective,
27//! content: "Always validate user input".to_string(),
28//! embedding: Some(vec![0.1f32; 384]),
29//! ..Default::default()
30//! })?;
31//!
32//! // Close when done
33//! db.close()?;
34//! # Ok(())
35//! # }
36//! ```
37//!
38//! # Thread Safety
39//!
40//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
41//! The underlying storage uses MVCC for concurrent reads with exclusive
42//! write locking.
43//!
44//! ```rust
45//! # fn main() -> pulsedb::Result<()> {
46//! # let dir = tempfile::tempdir().unwrap();
47//! use std::sync::Arc;
48//! use pulsedb::{PulseDB, Config};
49//!
50//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
51//!
52//! // Clone Arc for use in another thread
53//! let db_clone = Arc::clone(&db);
54//! std::thread::spawn(move || {
55//! // Safe to use db_clone here
56//! });
57//! # Ok(())
58//! # }
59//! ```
60
61use std::collections::HashMap;
62use std::path::{Path, PathBuf};
63use std::sync::{Arc, RwLock};
64
65#[cfg(feature = "sync")]
66use tracing::debug;
67use tracing::{info, instrument, warn};
68
69use crate::activity::{validate_new_activity, Activity, NewActivity};
70use crate::collective::types::CollectiveStats;
71use crate::collective::{validate_collective_name, Collective};
72use crate::config::{Config, EmbeddingProvider};
73use crate::embedding::{create_embedding_service, EmbeddingService};
74use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
75use crate::experience::{
76 validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
77 NewExperience,
78};
79use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
80#[cfg(feature = "sync")]
81use crate::relation::ExperienceRelation;
82use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
83use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
84#[cfg(feature = "sync")]
85use crate::types::RelationId;
86use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
87use crate::vector::HnswIndex;
88use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};
89
/// The main PulseDB database handle.
///
/// This is the primary interface for all database operations. Create an
/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
///
/// # Ownership
///
/// `PulseDB` owns its storage and embedding service. When you call `close()`,
/// the database is consumed and cannot be used afterward. This ensures
/// resources are properly released.
pub struct PulseDB {
    /// Storage engine (redb or mock for testing). This is the source of
    /// truth for all records; the HNSW indexes below are derived from it
    /// and rebuilt from storage on open when their files are stale/missing.
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX). Used by `record_experience`
    /// to generate an embedding from content when the caller supplies none.
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database. Some settings (e.g. the
    /// embedding dimension) are locked at database creation.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}
129
130impl std::fmt::Debug for PulseDB {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
133 let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
134 f.debug_struct("PulseDB")
135 .field("config", &self.config)
136 .field("embedding_dimension", &self.embedding_dimension())
137 .field("vector_indexes", &vector_count)
138 .field("insight_vector_indexes", &insight_vector_count)
139 .finish_non_exhaustive()
140 }
141}
142
143impl PulseDB {
144 /// Opens or creates a PulseDB database at the specified path.
145 ///
146 /// If the database doesn't exist, it will be created with the given
147 /// configuration. If it exists, the configuration will be validated
148 /// against the stored settings (e.g., embedding dimension must match).
149 ///
150 /// # Arguments
151 ///
152 /// * `path` - Path to the database file (created if it doesn't exist)
153 /// * `config` - Configuration options for the database
154 ///
155 /// # Errors
156 ///
157 /// Returns an error if:
158 /// - Configuration is invalid (see [`Config::validate`])
159 /// - Database file is corrupted
160 /// - Database is locked by another process
161 /// - Schema version doesn't match (needs migration)
162 /// - Embedding dimension doesn't match existing database
163 ///
164 /// # Example
165 ///
166 /// ```rust
167 /// # fn main() -> pulsedb::Result<()> {
168 /// # let dir = tempfile::tempdir().unwrap();
169 /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
170 ///
171 /// // Open with default configuration
172 /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
173 /// # drop(db);
174 ///
175 /// // Open with custom embedding dimension
176 /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
177 /// embedding_dimension: EmbeddingDimension::D768,
178 /// ..Default::default()
179 /// })?;
180 /// # Ok(())
181 /// # }
182 /// ```
183 #[instrument(skip(config), fields(path = %path.as_ref().display()))]
184 pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
185 // Validate configuration first
186 config.validate().map_err(PulseDBError::from)?;
187
188 info!("Opening PulseDB");
189
190 // Open storage engine
191 let storage = open_storage(&path, &config)?;
192
193 // Create embedding service
194 let embedding = create_embedding_service(&config)?;
195
196 // Load or rebuild HNSW indexes for all existing collectives
197 let vectors = Self::load_all_indexes(&*storage, &config)?;
198 let insight_vectors = Self::load_all_insight_indexes(&*storage, &config)?;
199
200 info!(
201 dimension = config.embedding_dimension.size(),
202 sync_mode = ?config.sync_mode,
203 collectives = vectors.len(),
204 "PulseDB opened successfully"
205 );
206
207 let watch = Arc::new(WatchService::new(
208 config.watch.buffer_size,
209 config.watch.in_process,
210 ));
211
212 Ok(Self {
213 storage,
214 embedding,
215 config,
216 vectors: RwLock::new(vectors),
217 insight_vectors: RwLock::new(insight_vectors),
218 watch,
219 })
220 }
221
222 /// Closes the database, flushing all pending writes.
223 ///
224 /// This method consumes the `PulseDB` instance, ensuring it cannot
225 /// be used after closing. The underlying storage engine flushes all
226 /// buffered data to disk.
227 ///
228 /// # Errors
229 ///
230 /// Returns an error if the storage backend reports a flush failure.
231 /// Note: the current redb backend flushes durably on drop, so this
232 /// always returns `Ok(())` in practice.
233 ///
234 /// # Example
235 ///
236 /// ```rust
237 /// # fn main() -> pulsedb::Result<()> {
238 /// # let dir = tempfile::tempdir().unwrap();
239 /// use pulsedb::{PulseDB, Config};
240 ///
241 /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
242 /// // ... use the database ...
243 /// db.close()?; // db is consumed here
244 /// // db.something() // Compile error: db was moved
245 /// # Ok(())
246 /// # }
247 /// ```
248 #[instrument(skip(self))]
249 pub fn close(self) -> Result<()> {
250 info!("Closing PulseDB");
251
252 // Persist HNSW indexes BEFORE closing storage.
253 // If HNSW save fails, storage is still open for potential recovery.
254 // On next open(), stale/missing HNSW files trigger a rebuild from redb.
255 if let Some(hnsw_dir) = self.hnsw_dir() {
256 // Experience HNSW indexes
257 let vectors = self
258 .vectors
259 .read()
260 .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
261 for (collective_id, index) in vectors.iter() {
262 if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
263 warn!(
264 collective = %collective_id,
265 error = %e,
266 "Failed to save HNSW index (will rebuild on next open)"
267 );
268 }
269 }
270 drop(vectors);
271
272 // Insight HNSW indexes (separate files with _insights suffix)
273 let insight_vectors = self
274 .insight_vectors
275 .read()
276 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
277 for (collective_id, index) in insight_vectors.iter() {
278 let name = format!("{}_insights", collective_id);
279 if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
280 warn!(
281 collective = %collective_id,
282 error = %e,
283 "Failed to save insight HNSW index (will rebuild on next open)"
284 );
285 }
286 }
287 }
288
289 // Close storage (flushes pending writes)
290 self.storage.close()?;
291
292 info!("PulseDB closed successfully");
293 Ok(())
294 }
295
    /// Returns a reference to the database configuration.
    ///
    /// This is the configuration that was used to open the database.
    /// Note that some settings (like embedding dimension) are locked
    /// on database creation and cannot be changed.
    #[inline]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Returns the database metadata.
    ///
    /// Metadata includes schema version, embedding dimension, and timestamps
    /// for when the database was created and last opened. Delegates to the
    /// storage engine, which owns the persisted metadata record.
    #[inline]
    pub fn metadata(&self) -> &DatabaseMetadata {
        self.storage.metadata()
    }

    /// Returns the embedding dimension configured for this database.
    ///
    /// All embeddings stored in this database must have exactly this
    /// many dimensions. Reads the value from the in-memory config, not
    /// from storage.
    #[inline]
    pub fn embedding_dimension(&self) -> usize {
        self.config.embedding_dimension.size()
    }
323
324 // =========================================================================
325 // Internal Accessors (for use by feature modules)
326 // =========================================================================
327
    /// Returns a reference to the storage engine.
    ///
    /// This is for internal use by other PulseDB modules; it exposes the
    /// trait object behind the owning `Box`.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn storage(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }

    /// Returns a reference to the embedding service.
    ///
    /// This is for internal use by other PulseDB modules; it exposes the
    /// trait object behind the owning `Box`.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
        self.embedding.as_ref()
    }
345
346 // =========================================================================
347 // HNSW Index Lifecycle
348 // =========================================================================
349
350 /// Returns the directory for HNSW index files.
351 ///
352 /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
353 /// the storage has no file path (e.g., in-memory tests).
354 fn hnsw_dir(&self) -> Option<PathBuf> {
355 self.storage.path().map(|p| {
356 let mut hnsw_path = p.as_os_str().to_owned();
357 hnsw_path.push(".hnsw");
358 PathBuf::from(hnsw_path)
359 })
360 }
361
362 /// Loads or rebuilds HNSW indexes for all existing collectives.
363 ///
364 /// For each collective in storage:
365 /// 1. Try loading metadata from `.hnsw.meta` file
366 /// 2. Rebuild the graph from redb embeddings (always, since we can't
367 /// load the graph due to hnsw_rs lifetime constraints)
368 /// 3. Restore deleted set from metadata if available
369 fn load_all_indexes(
370 storage: &dyn StorageEngine,
371 config: &Config,
372 ) -> Result<HashMap<CollectiveId, HnswIndex>> {
373 let collectives = storage.list_collectives()?;
374 let mut vectors = HashMap::with_capacity(collectives.len());
375
376 let hnsw_dir = storage.path().map(|p| {
377 let mut hnsw_path = p.as_os_str().to_owned();
378 hnsw_path.push(".hnsw");
379 PathBuf::from(hnsw_path)
380 });
381
382 for collective in &collectives {
383 let dimension = collective.embedding_dimension as usize;
384
385 // List all experience IDs in this collective
386 let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;
387
388 // Load embeddings from redb (source of truth)
389 let mut embeddings = Vec::with_capacity(exp_ids.len());
390 for exp_id in &exp_ids {
391 if let Some(embedding) = storage.get_embedding(*exp_id)? {
392 embeddings.push((*exp_id, embedding));
393 }
394 }
395
396 // Try loading metadata (for deleted set and ID mappings)
397 let metadata = hnsw_dir
398 .as_ref()
399 .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
400 .flatten();
401
402 // Rebuild the HNSW graph from embeddings
403 let index = if embeddings.is_empty() {
404 HnswIndex::new(dimension, &config.hnsw)
405 } else {
406 let start = std::time::Instant::now();
407 let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
408 info!(
409 collective = %collective.id,
410 vectors = idx.active_count(),
411 elapsed_ms = start.elapsed().as_millis() as u64,
412 "Rebuilt HNSW index from redb embeddings"
413 );
414 idx
415 };
416
417 // Restore deleted set from metadata if available
418 if let Some(meta) = metadata {
419 index.restore_deleted_set(&meta.deleted)?;
420 }
421
422 vectors.insert(collective.id, index);
423 }
424
425 Ok(vectors)
426 }
427
428 /// Loads or rebuilds insight HNSW indexes for all existing collectives.
429 ///
430 /// For each collective, loads all insights from storage and rebuilds
431 /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
432 /// byte conversion for the HNSW API.
433 fn load_all_insight_indexes(
434 storage: &dyn StorageEngine,
435 config: &Config,
436 ) -> Result<HashMap<CollectiveId, HnswIndex>> {
437 let collectives = storage.list_collectives()?;
438 let mut insight_vectors = HashMap::with_capacity(collectives.len());
439
440 let hnsw_dir = storage.path().map(|p| {
441 let mut hnsw_path = p.as_os_str().to_owned();
442 hnsw_path.push(".hnsw");
443 PathBuf::from(hnsw_path)
444 });
445
446 for collective in &collectives {
447 let dimension = collective.embedding_dimension as usize;
448
449 // List all insight IDs in this collective
450 let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;
451
452 // Load insights and extract embeddings (converting InsightId → ExperienceId)
453 let mut embeddings = Vec::with_capacity(insight_ids.len());
454 for insight_id in &insight_ids {
455 if let Some(insight) = storage.get_insight(*insight_id)? {
456 let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
457 embeddings.push((exp_id, insight.embedding));
458 }
459 }
460
461 // Try loading metadata (for deleted set)
462 let name = format!("{}_insights", collective.id);
463 let metadata = hnsw_dir
464 .as_ref()
465 .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
466 .flatten();
467
468 // Rebuild HNSW graph from embeddings
469 let index = if embeddings.is_empty() {
470 HnswIndex::new(dimension, &config.hnsw)
471 } else {
472 let start = std::time::Instant::now();
473 let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
474 info!(
475 collective = %collective.id,
476 insights = idx.active_count(),
477 elapsed_ms = start.elapsed().as_millis() as u64,
478 "Rebuilt insight HNSW index from stored insights"
479 );
480 idx
481 };
482
483 // Restore deleted set from metadata if available
484 if let Some(meta) = metadata {
485 index.restore_deleted_set(&meta.deleted)?;
486 }
487
488 insight_vectors.insert(collective.id, index);
489 }
490
491 Ok(insight_vectors)
492 }
493
494 /// Executes a closure with the HNSW index for a collective.
495 ///
496 /// This is the primary accessor for vector search operations (used by
497 /// `search_similar()`). The closure runs while the outer RwLock guard
498 /// is held (read lock), so the HnswIndex reference stays valid.
499 /// Returns `None` if no index exists for the collective.
500 #[doc(hidden)]
501 pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
502 where
503 F: FnOnce(&HnswIndex) -> Result<R>,
504 {
505 let vectors = self
506 .vectors
507 .read()
508 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
509 match vectors.get(&collective_id) {
510 Some(index) => Ok(Some(f(index)?)),
511 None => Ok(None),
512 }
513 }
514
515 // =========================================================================
516 // Test Helpers
517 // =========================================================================
518
    /// Returns a reference to the storage engine for integration testing.
    ///
    /// This method is intentionally hidden from documentation. It provides
    /// test-only access to the storage layer for verifying ACID guarantees
    /// and crash recovery. Production code should use the public PulseDB API.
    /// Functionally identical to the `pub(crate)` `storage()` accessor, but
    /// public so external integration tests can reach it.
    #[doc(hidden)]
    #[inline]
    pub fn storage_for_test(&self) -> &dyn StorageEngine {
        self.storage.as_ref()
    }
529
530 // =========================================================================
531 // Collective Management (E1-S02)
532 // =========================================================================
533
534 /// Creates a new collective with the given name.
535 ///
536 /// The collective's embedding dimension is locked to the database's
537 /// configured dimension at creation time.
538 ///
539 /// # Arguments
540 ///
541 /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
542 ///
543 /// # Errors
544 ///
545 /// Returns a validation error if the name is empty, whitespace-only,
546 /// or exceeds 255 characters.
547 ///
548 /// # Example
549 ///
550 /// ```rust
551 /// # fn main() -> pulsedb::Result<()> {
552 /// # let dir = tempfile::tempdir().unwrap();
553 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
554 /// let id = db.create_collective("my-project")?;
555 /// # Ok(())
556 /// # }
557 /// ```
558 #[instrument(skip(self))]
559 pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
560 validate_collective_name(name)?;
561
562 let dimension = self.config.embedding_dimension.size() as u16;
563 let collective = Collective::new(name, dimension);
564 let id = collective.id;
565
566 // Persist to redb first (source of truth)
567 self.storage.save_collective(&collective)?;
568
569 // Create empty HNSW indexes for this collective
570 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
571 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
572 self.vectors
573 .write()
574 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
575 .insert(id, exp_index);
576 self.insight_vectors
577 .write()
578 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
579 .insert(id, insight_index);
580
581 info!(id = %id, name = %name, "Collective created");
582 Ok(id)
583 }
584
585 /// Creates a new collective with an owner for multi-tenancy.
586 ///
587 /// Same as [`create_collective`](Self::create_collective) but assigns
588 /// an owner ID, enabling filtering with
589 /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
590 ///
591 /// # Arguments
592 ///
593 /// * `name` - Human-readable name (1-255 characters)
594 /// * `owner_id` - Owner identifier (must not be empty)
595 ///
596 /// # Errors
597 ///
598 /// Returns a validation error if the name or owner_id is invalid.
599 #[instrument(skip(self))]
600 pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
601 validate_collective_name(name)?;
602
603 if owner_id.is_empty() {
604 return Err(PulseDBError::from(
605 crate::error::ValidationError::required_field("owner_id"),
606 ));
607 }
608
609 let dimension = self.config.embedding_dimension.size() as u16;
610 let collective = Collective::with_owner(name, owner_id, dimension);
611 let id = collective.id;
612
613 // Persist to redb first (source of truth)
614 self.storage.save_collective(&collective)?;
615
616 // Create empty HNSW indexes for this collective
617 let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
618 let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
619 self.vectors
620 .write()
621 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
622 .insert(id, exp_index);
623 self.insight_vectors
624 .write()
625 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
626 .insert(id, insight_index);
627
628 info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
629 Ok(id)
630 }
631
    /// Returns a collective by ID, or `None` if not found.
    ///
    /// Delegates directly to the storage engine; errors are storage errors.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let id = db.create_collective("example")?;
    /// if let Some(collective) = db.get_collective(id)? {
    ///     println!("Found: {}", collective.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
        self.storage.get_collective(id)
    }
651
    /// Lists all collectives in the database.
    ///
    /// Returns an empty vector if no collectives exist. Delegates directly
    /// to the storage engine.
    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
        self.storage.list_collectives()
    }
658
659 /// Lists collectives filtered by owner ID.
660 ///
661 /// Returns only collectives whose `owner_id` matches the given value.
662 /// Returns an empty vector if no matching collectives exist.
663 pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
664 let all = self.storage.list_collectives()?;
665 Ok(all
666 .into_iter()
667 .filter(|c| c.owner_id.as_deref() == Some(owner_id))
668 .collect())
669 }
670
671 /// Returns statistics for a collective.
672 ///
673 /// # Errors
674 ///
675 /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
676 #[instrument(skip(self))]
677 pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
678 // Verify collective exists
679 self.storage
680 .get_collective(id)?
681 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
682
683 let experience_count = self.storage.count_experiences_in_collective(id)?;
684
685 Ok(CollectiveStats {
686 experience_count,
687 storage_bytes: 0,
688 oldest_experience: None,
689 newest_experience: None,
690 })
691 }
692
693 /// Deletes a collective and all its associated data.
694 ///
695 /// Performs cascade deletion: removes all experiences belonging to the
696 /// collective before removing the collective record itself.
697 ///
698 /// # Errors
699 ///
700 /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
701 ///
702 /// # Example
703 ///
704 /// ```rust
705 /// # fn main() -> pulsedb::Result<()> {
706 /// # let dir = tempfile::tempdir().unwrap();
707 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
708 /// # let collective_id = db.create_collective("to-delete")?;
709 /// db.delete_collective(collective_id)?;
710 /// assert!(db.get_collective(collective_id)?.is_none());
711 /// # Ok(())
712 /// # }
713 /// ```
714 #[instrument(skip(self))]
715 pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
716 // Verify collective exists
717 self.storage
718 .get_collective(id)?
719 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
720
721 // Cascade: delete all experiences for this collective
722 let deleted_count = self.storage.delete_experiences_by_collective(id)?;
723 if deleted_count > 0 {
724 info!(count = deleted_count, "Cascade-deleted experiences");
725 }
726
727 // Cascade: delete all insights for this collective
728 let deleted_insights = self.storage.delete_insights_by_collective(id)?;
729 if deleted_insights > 0 {
730 info!(count = deleted_insights, "Cascade-deleted insights");
731 }
732
733 // Cascade: delete all activities for this collective
734 let deleted_activities = self.storage.delete_activities_by_collective(id)?;
735 if deleted_activities > 0 {
736 info!(count = deleted_activities, "Cascade-deleted activities");
737 }
738
739 // Delete the collective record from storage
740 self.storage.delete_collective(id)?;
741
742 // Remove HNSW indexes from memory
743 self.vectors
744 .write()
745 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
746 .remove(&id);
747 self.insight_vectors
748 .write()
749 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
750 .remove(&id);
751
752 // Remove HNSW files from disk (non-fatal if fails)
753 if let Some(hnsw_dir) = self.hnsw_dir() {
754 if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
755 warn!(
756 collective = %id,
757 error = %e,
758 "Failed to remove experience HNSW files (non-fatal)"
759 );
760 }
761 let insight_name = format!("{}_insights", id);
762 if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
763 warn!(
764 collective = %id,
765 error = %e,
766 "Failed to remove insight HNSW files (non-fatal)"
767 );
768 }
769 }
770
771 info!(id = %id, "Collective deleted");
772 Ok(())
773 }
774
775 // =========================================================================
776 // Experience CRUD (E1-S03)
777 // =========================================================================
778
779 /// Records a new experience in the database.
780 ///
781 /// This is the primary method for storing agent-learned knowledge. The method:
782 /// 1. Validates the input (content, scores, tags, embedding)
783 /// 2. Verifies the collective exists
784 /// 3. Resolves the embedding (generates if Builtin, requires if External)
785 /// 4. Stores the experience atomically across 4 tables
786 ///
787 /// # Arguments
788 ///
789 /// * `exp` - The experience to record (see [`NewExperience`])
790 ///
791 /// # Errors
792 ///
793 /// - [`ValidationError`](crate::ValidationError) if input is invalid
794 /// - [`NotFoundError::Collective`] if the collective doesn't exist
795 /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
796 #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
797 pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
798 let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
799
800 // Verify collective exists and get its dimension
801 let collective = self
802 .storage
803 .get_collective(exp.collective_id)?
804 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;
805
806 // Validate input
807 validate_new_experience(&exp, collective.embedding_dimension, is_external)?;
808
809 // Resolve embedding
810 let embedding = match exp.embedding {
811 Some(emb) => emb,
812 None => {
813 // Builtin mode: generate embedding from content
814 self.embedding.embed(&exp.content)?
815 }
816 };
817
818 // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
819 let embedding_for_hnsw = embedding.clone();
820 let collective_id = exp.collective_id;
821
822 // Construct the full experience record
823 let experience = Experience {
824 id: ExperienceId::new(),
825 collective_id,
826 content: exp.content,
827 embedding,
828 experience_type: exp.experience_type,
829 importance: exp.importance,
830 confidence: exp.confidence,
831 applications: 0,
832 domain: exp.domain,
833 related_files: exp.related_files,
834 source_agent: exp.source_agent,
835 source_task: exp.source_task,
836 timestamp: Timestamp::now(),
837 archived: false,
838 };
839
840 let id = experience.id;
841
842 // Write to redb FIRST (source of truth). If crash happens after
843 // this but before HNSW insert, rebuild on next open will include it.
844 self.storage.save_experience(&experience)?;
845
846 // Insert into HNSW index (derived structure)
847 let vectors = self
848 .vectors
849 .read()
850 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
851 if let Some(index) = vectors.get(&collective_id) {
852 index.insert_experience(id, &embedding_for_hnsw)?;
853 }
854
855 // Emit watch event after both storage and HNSW succeed
856 self.watch.emit(
857 WatchEvent {
858 experience_id: id,
859 collective_id,
860 event_type: WatchEventType::Created,
861 timestamp: experience.timestamp,
862 },
863 &experience,
864 )?;
865
866 info!(id = %id, "Experience recorded");
867 Ok(id)
868 }
869
    /// Retrieves an experience by ID, including its embedding.
    ///
    /// Returns `None` if no experience with the given ID exists.
    /// Delegates directly to the storage engine.
    #[instrument(skip(self))]
    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
        self.storage.get_experience(id)
    }
877
878 /// Updates mutable fields of an experience.
879 ///
880 /// Only fields set to `Some(...)` in the update are changed.
881 /// Content and embedding are immutable — create a new experience instead.
882 ///
883 /// # Errors
884 ///
885 /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
886 /// - [`NotFoundError::Experience`] if the experience doesn't exist
887 #[instrument(skip(self, update))]
888 pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
889 validate_experience_update(&update)?;
890
891 let updated = self.storage.update_experience(id, &update)?;
892 if !updated {
893 return Err(PulseDBError::from(NotFoundError::experience(id)));
894 }
895
896 // Emit watch event (fetch experience for collective_id + filter matching)
897 if self.watch.has_subscribers() {
898 if let Ok(Some(exp)) = self.storage.get_experience(id) {
899 let event_type = if update.archived == Some(true) {
900 WatchEventType::Archived
901 } else {
902 WatchEventType::Updated
903 };
904 self.watch.emit(
905 WatchEvent {
906 experience_id: id,
907 collective_id: exp.collective_id,
908 event_type,
909 timestamp: Timestamp::now(),
910 },
911 &exp,
912 )?;
913 }
914 }
915
916 info!(id = %id, "Experience updated");
917 Ok(())
918 }
919
920 /// Archives an experience (soft-delete).
921 ///
922 /// Archived experiences remain in storage but are excluded from search
923 /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
924 ///
925 /// # Errors
926 ///
927 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
928 #[instrument(skip(self))]
929 pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
930 self.update_experience(
931 id,
932 ExperienceUpdate {
933 archived: Some(true),
934 ..Default::default()
935 },
936 )
937 }
938
939 /// Restores an archived experience.
940 ///
941 /// The experience will once again appear in search results.
942 ///
943 /// # Errors
944 ///
945 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
946 #[instrument(skip(self))]
947 pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
948 self.update_experience(
949 id,
950 ExperienceUpdate {
951 archived: Some(false),
952 ..Default::default()
953 },
954 )
955 }
956
957 /// Permanently deletes an experience and its embedding.
958 ///
959 /// This removes the experience from all tables and indices.
960 /// Unlike archiving, this is irreversible.
961 ///
962 /// # Errors
963 ///
964 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
965 #[instrument(skip(self))]
966 pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
967 // Read experience first to get collective_id for HNSW lookup.
968 // This adds one extra read, but delete is not a hot path.
969 let experience = self
970 .storage
971 .get_experience(id)?
972 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
973
974 // Cascade-delete any relations involving this experience.
975 // Done before experience deletion so we can still look up relation data.
976 let rel_count = self.storage.delete_relations_for_experience(id)?;
977 if rel_count > 0 {
978 info!(
979 count = rel_count,
980 "Cascade-deleted relations for experience"
981 );
982 }
983
984 // Delete from redb FIRST (source of truth). If crash happens after
985 // this but before HNSW soft-delete, on reopen the experience won't be
986 // loaded from redb, so it's automatically excluded from the rebuilt index.
987 self.storage.delete_experience(id)?;
988
989 // Soft-delete from HNSW index (mark as deleted, not removed from graph).
990 // This takes effect immediately for the current session's searches.
991 let vectors = self
992 .vectors
993 .read()
994 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
995 if let Some(index) = vectors.get(&experience.collective_id) {
996 index.delete_experience(id)?;
997 }
998
999 // Emit watch event after storage + HNSW deletion
1000 self.watch.emit(
1001 WatchEvent {
1002 experience_id: id,
1003 collective_id: experience.collective_id,
1004 event_type: WatchEventType::Deleted,
1005 timestamp: Timestamp::now(),
1006 },
1007 &experience,
1008 )?;
1009
1010 info!(id = %id, "Experience deleted");
1011 Ok(())
1012 }
1013
1014 /// Reinforces an experience by incrementing its application count.
1015 ///
1016 /// Each call atomically increments the `applications` counter by 1.
1017 /// Returns the new application count.
1018 ///
1019 /// # Errors
1020 ///
1021 /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1022 #[instrument(skip(self))]
1023 pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1024 let new_count = self
1025 .storage
1026 .reinforce_experience(id)?
1027 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1028
1029 // Emit watch event (fetch experience for collective_id + filter matching)
1030 if self.watch.has_subscribers() {
1031 if let Ok(Some(exp)) = self.storage.get_experience(id) {
1032 self.watch.emit(
1033 WatchEvent {
1034 experience_id: id,
1035 collective_id: exp.collective_id,
1036 event_type: WatchEventType::Updated,
1037 timestamp: Timestamp::now(),
1038 },
1039 &exp,
1040 )?;
1041 }
1042 }
1043
1044 info!(id = %id, applications = new_count, "Experience reinforced");
1045 Ok(new_count)
1046 }
1047
1048 // =========================================================================
1049 // Recent Experiences
1050 // =========================================================================
1051
    /// Retrieves the most recent experiences in a collective, newest first.
    ///
    /// Returns up to `limit` non-archived experiences ordered by timestamp
    /// descending (newest first). Uses the by-collective timestamp index
    /// for efficient retrieval.
    ///
    /// Equivalent to calling
    /// [`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered)
    /// with `SearchFilter::default()`.
    ///
    /// # Arguments
    ///
    /// * `collective_id` - The collective to query
    /// * `limit` - Maximum number of experiences to return (1-1000)
    ///
    /// # Errors
    ///
    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
    /// - [`NotFoundError::Collective`] if the collective doesn't exist
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let collective_id = db.create_collective("example")?;
    /// let recent = db.get_recent_experiences(collective_id, 50)?;
    /// for exp in &recent {
    ///     println!("{}: {}", exp.timestamp, exp.content);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn get_recent_experiences(
        &self,
        collective_id: CollectiveId,
        limit: usize,
    ) -> Result<Vec<Experience>> {
        self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
    }
1090
1091 /// Retrieves the most recent experiences in a collective with filtering.
1092 ///
1093 /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1094 /// applies additional filters on domain, experience type, importance,
1095 /// confidence, and timestamp.
1096 ///
1097 /// Over-fetches from storage (2x `limit`) to account for entries removed
1098 /// by post-filtering, then truncates to the requested `limit`.
1099 ///
1100 /// # Arguments
1101 ///
1102 /// * `collective_id` - The collective to query
1103 /// * `limit` - Maximum number of experiences to return (1-1000)
1104 /// * `filter` - Filter criteria to apply
1105 ///
1106 /// # Errors
1107 ///
1108 /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1109 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1110 #[instrument(skip(self, filter))]
1111 pub fn get_recent_experiences_filtered(
1112 &self,
1113 collective_id: CollectiveId,
1114 limit: usize,
1115 filter: SearchFilter,
1116 ) -> Result<Vec<Experience>> {
1117 // Validate limit
1118 if limit == 0 || limit > 1000 {
1119 return Err(
1120 ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1121 );
1122 }
1123
1124 // Verify collective exists
1125 self.storage
1126 .get_collective(collective_id)?
1127 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1128
1129 // Over-fetch IDs to account for post-filtering losses
1130 let over_fetch = limit.saturating_mul(2).min(2000);
1131 let recent_ids = self
1132 .storage
1133 .get_recent_experience_ids(collective_id, over_fetch)?;
1134
1135 // Load full experiences and apply filter
1136 let mut results = Vec::with_capacity(limit);
1137 for (exp_id, _timestamp) in recent_ids {
1138 if results.len() >= limit {
1139 break;
1140 }
1141
1142 if let Some(experience) = self.storage.get_experience(exp_id)? {
1143 if filter.matches(&experience) {
1144 results.push(experience);
1145 }
1146 }
1147 }
1148
1149 Ok(results)
1150 }
1151
1152 // =========================================================================
1153 // Similarity Search (E2-S02)
1154 // =========================================================================
1155
    /// Searches for experiences semantically similar to the query embedding.
    ///
    /// Uses the HNSW vector index for approximate nearest neighbor search,
    /// then fetches full experience records from storage. Archived experiences
    /// are excluded by default.
    ///
    /// Results are sorted by similarity descending (most similar first).
    /// Similarity is computed as `1.0 - cosine_distance`.
    ///
    /// Equivalent to calling
    /// [`search_similar_filtered`](Self::search_similar_filtered) with
    /// `SearchFilter::default()`.
    ///
    /// # Arguments
    ///
    /// * `collective_id` - The collective to search within
    /// * `query` - Query embedding vector (must match collective's dimension)
    /// * `k` - Maximum number of results to return (1-1000)
    ///
    /// # Errors
    ///
    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
    ///   the collective's embedding dimension
    /// - [`NotFoundError::Collective`] if the collective doesn't exist
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let collective_id = db.create_collective("example")?;
    /// let query = vec![0.1f32; 384]; // Your query embedding
    /// let results = db.search_similar(collective_id, &query, 10)?;
    /// for result in &results {
    ///     println!(
    ///         "[{:.3}] {}",
    ///         result.similarity, result.experience.content
    ///     );
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self, query))]
    pub fn search_similar(
        &self,
        collective_id: CollectiveId,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<SearchResult>> {
        self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
    }
1205
1206 /// Searches for semantically similar experiences with additional filtering.
1207 ///
1208 /// Like [`search_similar()`](Self::search_similar), but applies additional
1209 /// filters on domain, experience type, importance, confidence, and timestamp.
1210 ///
1211 /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1212 /// by post-filtering, then truncates to the requested `k`.
1213 ///
1214 /// # Arguments
1215 ///
1216 /// * `collective_id` - The collective to search within
1217 /// * `query` - Query embedding vector (must match collective's dimension)
1218 /// * `k` - Maximum number of results to return (1-1000)
1219 /// * `filter` - Filter criteria to apply after vector search
1220 ///
1221 /// # Errors
1222 ///
1223 /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1224 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1225 /// the collective's embedding dimension
1226 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1227 ///
1228 /// # Example
1229 ///
1230 /// ```rust
1231 /// # fn main() -> pulsedb::Result<()> {
1232 /// # let dir = tempfile::tempdir().unwrap();
1233 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1234 /// # let collective_id = db.create_collective("example")?;
1235 /// # let query_embedding = vec![0.1f32; 384];
1236 /// use pulsedb::SearchFilter;
1237 ///
1238 /// let filter = SearchFilter {
1239 /// domains: Some(vec!["rust".to_string()]),
1240 /// min_importance: Some(0.5),
1241 /// ..SearchFilter::default()
1242 /// };
1243 /// let results = db.search_similar_filtered(
1244 /// collective_id,
1245 /// &query_embedding,
1246 /// 10,
1247 /// filter,
1248 /// )?;
1249 /// # Ok(())
1250 /// # }
1251 /// ```
1252 #[instrument(skip(self, query, filter))]
1253 pub fn search_similar_filtered(
1254 &self,
1255 collective_id: CollectiveId,
1256 query: &[f32],
1257 k: usize,
1258 filter: SearchFilter,
1259 ) -> Result<Vec<SearchResult>> {
1260 // Validate k
1261 if k == 0 || k > 1000 {
1262 return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1263 }
1264
1265 // Verify collective exists and check embedding dimension
1266 let collective = self
1267 .storage
1268 .get_collective(collective_id)?
1269 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1270
1271 let expected_dim = collective.embedding_dimension as usize;
1272 if query.len() != expected_dim {
1273 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1274 }
1275
1276 // Over-fetch from HNSW to compensate for post-filtering losses
1277 let over_fetch = k.saturating_mul(2).min(2000);
1278 let ef_search = self.config.hnsw.ef_search;
1279
1280 // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1281 // by distance ascending (closest first)
1282 let candidates = self
1283 .with_vector_index(collective_id, |index| {
1284 index.search_experiences(query, over_fetch, ef_search)
1285 })?
1286 .unwrap_or_default();
1287
1288 // Fetch full experiences, apply filter, convert distance → similarity
1289 let mut results = Vec::with_capacity(k);
1290 for (exp_id, distance) in candidates {
1291 if results.len() >= k {
1292 break;
1293 }
1294
1295 if let Some(experience) = self.storage.get_experience(exp_id)? {
1296 if filter.matches(&experience) {
1297 results.push(SearchResult {
1298 experience,
1299 similarity: 1.0 - distance,
1300 });
1301 }
1302 }
1303 }
1304
1305 Ok(results)
1306 }
1307
1308 // =========================================================================
1309 // Experience Relations (E3-S01)
1310 // =========================================================================
1311
1312 /// Stores a new relation between two experiences.
1313 ///
1314 /// Relations are typed, directed edges connecting a source experience to a
1315 /// target experience. Both experiences must exist and belong to the same
1316 /// collective. Duplicate relations (same source, target, and type) are
1317 /// rejected.
1318 ///
1319 /// # Arguments
1320 ///
1321 /// * `relation` - The relation to create (source, target, type, strength)
1322 ///
1323 /// # Errors
1324 ///
1325 /// Returns an error if:
1326 /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1327 /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1328 /// - A relation with the same (source, target, type) already exists
1329 /// - Self-relation attempted (source == target)
1330 /// - Strength is out of range `[0.0, 1.0]`
1331 #[instrument(skip(self, relation))]
1332 pub fn store_relation(
1333 &self,
1334 relation: crate::relation::NewExperienceRelation,
1335 ) -> Result<crate::types::RelationId> {
1336 use crate::relation::{validate_new_relation, ExperienceRelation};
1337 use crate::types::RelationId;
1338
1339 // Validate input fields (self-relation, strength bounds, metadata size)
1340 validate_new_relation(&relation)?;
1341
1342 // Load source and target experiences to verify existence
1343 let source = self
1344 .storage
1345 .get_experience(relation.source_id)?
1346 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1347 let target = self
1348 .storage
1349 .get_experience(relation.target_id)?
1350 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1351
1352 // Verify same collective
1353 if source.collective_id != target.collective_id {
1354 return Err(PulseDBError::from(ValidationError::invalid_field(
1355 "target_id",
1356 "source and target experiences must belong to the same collective",
1357 )));
1358 }
1359
1360 // Check for duplicate (same source, target, type)
1361 if self.storage.relation_exists(
1362 relation.source_id,
1363 relation.target_id,
1364 relation.relation_type,
1365 )? {
1366 return Err(PulseDBError::from(ValidationError::invalid_field(
1367 "relation_type",
1368 "a relation with this source, target, and type already exists",
1369 )));
1370 }
1371
1372 // Construct the full relation
1373 let id = RelationId::new();
1374 let full_relation = ExperienceRelation {
1375 id,
1376 source_id: relation.source_id,
1377 target_id: relation.target_id,
1378 relation_type: relation.relation_type,
1379 strength: relation.strength,
1380 metadata: relation.metadata,
1381 created_at: Timestamp::now(),
1382 };
1383
1384 self.storage.save_relation(&full_relation)?;
1385
1386 info!(
1387 id = %id,
1388 source = %relation.source_id,
1389 target = %relation.target_id,
1390 relation_type = ?full_relation.relation_type,
1391 "Relation stored"
1392 );
1393 Ok(id)
1394 }
1395
    /// Retrieves experiences related to the given experience.
    ///
    /// Returns pairs of `(Experience, ExperienceRelation)` based on the
    /// requested direction:
    /// - `Outgoing`: experiences that this experience points TO (as source)
    /// - `Incoming`: experiences that point TO this experience (as target)
    /// - `Both`: union of outgoing and incoming
    ///
    /// Equivalent to calling
    /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered)
    /// with `relation_type = None` (no type filtering).
    ///
    /// Silently skips relations where the related experience no longer exists
    /// (orphan tolerance).
    ///
    /// # Errors
    ///
    /// Returns a storage error if the read transaction fails.
    #[instrument(skip(self))]
    pub fn get_related_experiences(
        &self,
        experience_id: ExperienceId,
        direction: crate::relation::RelationDirection,
    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
        self.get_related_experiences_filtered(experience_id, direction, None)
    }
1421
1422 /// Retrieves experiences related to the given experience, with optional
1423 /// type filtering.
1424 ///
1425 /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1426 /// accepts an optional [`RelationType`](crate::RelationType) filter.
1427 /// When `Some(rt)`, only relations matching that type are returned.
1428 ///
1429 /// # Arguments
1430 ///
1431 /// * `experience_id` - The experience to query relations for
1432 /// * `direction` - Which direction(s) to traverse
1433 /// * `relation_type` - If `Some`, only return relations of this type
1434 ///
1435 /// # Example
1436 ///
1437 /// ```rust
1438 /// # fn main() -> pulsedb::Result<()> {
1439 /// # let dir = tempfile::tempdir().unwrap();
1440 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1441 /// # let cid = db.create_collective("example")?;
1442 /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1443 /// # collective_id: cid,
1444 /// # content: "a".into(),
1445 /// # embedding: Some(vec![0.1f32; 384]),
1446 /// # ..Default::default()
1447 /// # })?;
1448 /// use pulsedb::{RelationType, RelationDirection};
1449 ///
1450 /// // Only "Supports" relations outgoing from exp_a
1451 /// let supports = db.get_related_experiences_filtered(
1452 /// exp_a,
1453 /// RelationDirection::Outgoing,
1454 /// Some(RelationType::Supports),
1455 /// )?;
1456 /// # Ok(())
1457 /// # }
1458 /// ```
1459 #[instrument(skip(self))]
1460 pub fn get_related_experiences_filtered(
1461 &self,
1462 experience_id: ExperienceId,
1463 direction: crate::relation::RelationDirection,
1464 relation_type: Option<crate::relation::RelationType>,
1465 ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1466 use crate::relation::RelationDirection;
1467
1468 let mut results = Vec::new();
1469
1470 // Outgoing: this experience is the source → fetch target experiences
1471 if matches!(
1472 direction,
1473 RelationDirection::Outgoing | RelationDirection::Both
1474 ) {
1475 let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1476 for rel_id in rel_ids {
1477 if let Some(relation) = self.storage.get_relation(rel_id)? {
1478 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1479 continue;
1480 }
1481 if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1482 results.push((experience, relation));
1483 }
1484 }
1485 }
1486 }
1487
1488 // Incoming: this experience is the target → fetch source experiences
1489 if matches!(
1490 direction,
1491 RelationDirection::Incoming | RelationDirection::Both
1492 ) {
1493 let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1494 for rel_id in rel_ids {
1495 if let Some(relation) = self.storage.get_relation(rel_id)? {
1496 if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1497 continue;
1498 }
1499 if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1500 results.push((experience, relation));
1501 }
1502 }
1503 }
1504 }
1505
1506 Ok(results)
1507 }
1508
1509 /// Retrieves a relation by ID.
1510 ///
1511 /// Returns `None` if no relation with the given ID exists.
1512 pub fn get_relation(
1513 &self,
1514 id: crate::types::RelationId,
1515 ) -> Result<Option<crate::relation::ExperienceRelation>> {
1516 self.storage.get_relation(id)
1517 }
1518
1519 /// Deletes a relation by ID.
1520 ///
1521 /// # Errors
1522 ///
1523 /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1524 #[instrument(skip(self))]
1525 pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1526 let deleted = self.storage.delete_relation(id)?;
1527 if !deleted {
1528 return Err(PulseDBError::from(NotFoundError::relation(id)));
1529 }
1530 info!(id = %id, "Relation deleted");
1531 Ok(())
1532 }
1533
1534 // =========================================================================
1535 // Derived Insights (E3-S02)
1536 // =========================================================================
1537
1538 /// Stores a new derived insight.
1539 ///
1540 /// Creates a synthesized knowledge record from multiple source experiences.
1541 /// The method:
1542 /// 1. Validates the input (content, confidence, sources)
1543 /// 2. Verifies the collective exists
1544 /// 3. Verifies all source experiences exist and belong to the same collective
1545 /// 4. Resolves the embedding (generates if Builtin, requires if External)
1546 /// 5. Stores the insight with inline embedding
1547 /// 6. Inserts into the insight HNSW index
1548 ///
1549 /// # Arguments
1550 ///
1551 /// * `insight` - The insight to store (see [`NewDerivedInsight`])
1552 ///
1553 /// # Errors
1554 ///
1555 /// - [`ValidationError`](crate::ValidationError) if input is invalid
1556 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1557 /// - [`NotFoundError::Experience`] if any source experience doesn't exist
1558 /// - [`ValidationError::InvalidField`] if source experiences belong to
1559 /// different collectives
1560 /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
1561 #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
1562 pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
1563 let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
1564
1565 // Validate input fields
1566 validate_new_insight(&insight)?;
1567
1568 // Verify collective exists
1569 let collective = self
1570 .storage
1571 .get_collective(insight.collective_id)?
1572 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;
1573
1574 // Verify all source experiences exist and belong to this collective
1575 for source_id in &insight.source_experience_ids {
1576 let source_exp = self
1577 .storage
1578 .get_experience(*source_id)?
1579 .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
1580 if source_exp.collective_id != insight.collective_id {
1581 return Err(PulseDBError::from(ValidationError::invalid_field(
1582 "source_experience_ids",
1583 format!(
1584 "experience {} belongs to collective {}, not {}",
1585 source_id, source_exp.collective_id, insight.collective_id
1586 ),
1587 )));
1588 }
1589 }
1590
1591 // Resolve embedding
1592 let embedding = match insight.embedding {
1593 Some(ref emb) => {
1594 // Validate dimension
1595 let expected_dim = collective.embedding_dimension as usize;
1596 if emb.len() != expected_dim {
1597 return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
1598 }
1599 emb.clone()
1600 }
1601 None => {
1602 if is_external {
1603 return Err(PulseDBError::embedding(
1604 "embedding is required when using External embedding provider",
1605 ));
1606 }
1607 self.embedding.embed(&insight.content)?
1608 }
1609 };
1610
1611 let embedding_for_hnsw = embedding.clone();
1612 let now = Timestamp::now();
1613 let id = InsightId::new();
1614
1615 // Construct the full insight record
1616 let derived_insight = DerivedInsight {
1617 id,
1618 collective_id: insight.collective_id,
1619 content: insight.content,
1620 embedding,
1621 source_experience_ids: insight.source_experience_ids,
1622 insight_type: insight.insight_type,
1623 confidence: insight.confidence,
1624 domain: insight.domain,
1625 created_at: now,
1626 updated_at: now,
1627 };
1628
1629 // Write to redb FIRST (source of truth)
1630 self.storage.save_insight(&derived_insight)?;
1631
1632 // Insert into insight HNSW index (using InsightId→ExperienceId byte conversion)
1633 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1634 let insight_vectors = self
1635 .insight_vectors
1636 .read()
1637 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1638 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1639 index.insert_experience(exp_id, &embedding_for_hnsw)?;
1640 }
1641
1642 info!(id = %id, "Insight stored");
1643 Ok(id)
1644 }
1645
    /// Retrieves a derived insight by ID.
    ///
    /// Delegates directly to the storage layer.
    ///
    /// Returns `None` if no insight with the given ID exists.
    #[instrument(skip(self))]
    pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
        self.storage.get_insight(id)
    }
1653
1654 /// Searches for insights semantically similar to the query embedding.
1655 ///
1656 /// Uses the insight-specific HNSW index for approximate nearest neighbor
1657 /// search, then fetches full insight records from storage.
1658 ///
1659 /// # Arguments
1660 ///
1661 /// * `collective_id` - The collective to search within
1662 /// * `query` - Query embedding vector (must match collective's dimension)
1663 /// * `k` - Maximum number of results to return
1664 ///
1665 /// # Errors
1666 ///
1667 /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1668 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1669 #[instrument(skip(self, query))]
1670 pub fn get_insights(
1671 &self,
1672 collective_id: CollectiveId,
1673 query: &[f32],
1674 k: usize,
1675 ) -> Result<Vec<(DerivedInsight, f32)>> {
1676 // Verify collective exists and check embedding dimension
1677 let collective = self
1678 .storage
1679 .get_collective(collective_id)?
1680 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1681
1682 let expected_dim = collective.embedding_dimension as usize;
1683 if query.len() != expected_dim {
1684 return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1685 }
1686
1687 let ef_search = self.config.hnsw.ef_search;
1688
1689 // Search insight HNSW — returns (ExperienceId, distance) pairs
1690 let insight_vectors = self
1691 .insight_vectors
1692 .read()
1693 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1694
1695 let candidates = match insight_vectors.get(&collective_id) {
1696 Some(index) => index.search_experiences(query, k, ef_search)?,
1697 None => return Ok(vec![]),
1698 };
1699 drop(insight_vectors);
1700
1701 // Convert ExperienceId back to InsightId and fetch records
1702 let mut results = Vec::with_capacity(candidates.len());
1703 for (exp_id, distance) in candidates {
1704 let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1705 if let Some(insight) = self.storage.get_insight(insight_id)? {
1706 // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1707 results.push((insight, 1.0 - distance));
1708 }
1709 }
1710
1711 Ok(results)
1712 }
1713
1714 /// Deletes a derived insight by ID.
1715 ///
1716 /// Removes the insight from storage and soft-deletes it from the HNSW index.
1717 ///
1718 /// # Errors
1719 ///
1720 /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1721 #[instrument(skip(self))]
1722 pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1723 // Read insight first to get collective_id for HNSW lookup
1724 let insight = self
1725 .storage
1726 .get_insight(id)?
1727 .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1728
1729 // Delete from redb FIRST (source of truth)
1730 self.storage.delete_insight(id)?;
1731
1732 // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1733 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1734 let insight_vectors = self
1735 .insight_vectors
1736 .read()
1737 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1738 if let Some(index) = insight_vectors.get(&insight.collective_id) {
1739 index.delete_experience(exp_id)?;
1740 }
1741
1742 info!(id = %id, "Insight deleted");
1743 Ok(())
1744 }
1745
1746 // =========================================================================
1747 // Activity Tracking (E3-S03)
1748 // =========================================================================
1749
1750 /// Registers an agent's presence in a collective.
1751 ///
1752 /// Creates a new activity record or replaces an existing one for the
1753 /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1754 /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1755 ///
1756 /// # Arguments
1757 ///
1758 /// * `activity` - The activity registration (see [`NewActivity`])
1759 ///
1760 /// # Errors
1761 ///
1762 /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1763 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1764 ///
1765 /// # Example
1766 ///
1767 /// ```rust
1768 /// # fn main() -> pulsedb::Result<()> {
1769 /// # let dir = tempfile::tempdir().unwrap();
1770 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1771 /// # let collective_id = db.create_collective("example")?;
1772 /// use pulsedb::NewActivity;
1773 ///
1774 /// db.register_activity(NewActivity {
1775 /// agent_id: "claude-opus".to_string(),
1776 /// collective_id,
1777 /// current_task: Some("Reviewing pull request".to_string()),
1778 /// context_summary: None,
1779 /// })?;
1780 /// # Ok(())
1781 /// # }
1782 /// ```
1783 #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1784 pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1785 // Validate input
1786 validate_new_activity(&activity)?;
1787
1788 // Verify collective exists
1789 self.storage
1790 .get_collective(activity.collective_id)?
1791 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1792
1793 // Build stored activity with timestamps
1794 let now = Timestamp::now();
1795 let stored = Activity {
1796 agent_id: activity.agent_id,
1797 collective_id: activity.collective_id,
1798 current_task: activity.current_task,
1799 context_summary: activity.context_summary,
1800 started_at: now,
1801 last_heartbeat: now,
1802 };
1803
1804 self.storage.save_activity(&stored)?;
1805
1806 info!(
1807 agent_id = %stored.agent_id,
1808 collective_id = %stored.collective_id,
1809 "Activity registered"
1810 );
1811 Ok(())
1812 }
1813
1814 /// Updates an agent's heartbeat timestamp.
1815 ///
1816 /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1817 /// any other fields. The agent must have an existing activity registered.
1818 ///
1819 /// # Errors
1820 ///
1821 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1822 #[instrument(skip(self))]
1823 pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1824 let mut activity = self
1825 .storage
1826 .get_activity(agent_id, collective_id)?
1827 .ok_or_else(|| {
1828 PulseDBError::from(NotFoundError::activity(format!(
1829 "{} in {}",
1830 agent_id, collective_id
1831 )))
1832 })?;
1833
1834 activity.last_heartbeat = Timestamp::now();
1835 self.storage.save_activity(&activity)?;
1836
1837 info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1838 Ok(())
1839 }
1840
1841 /// Ends an agent's activity in a collective.
1842 ///
1843 /// Removes the activity record. After calling this, the agent will no
1844 /// longer appear in `get_active_agents()` results.
1845 ///
1846 /// # Errors
1847 ///
1848 /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1849 #[instrument(skip(self))]
1850 pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1851 let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1852
1853 if !deleted {
1854 return Err(PulseDBError::from(NotFoundError::activity(format!(
1855 "{} in {}",
1856 agent_id, collective_id
1857 ))));
1858 }
1859
1860 info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1861 Ok(())
1862 }
1863
1864 /// Returns all active (non-stale) agents in a collective.
1865 ///
1866 /// Fetches all activities, filters out those whose `last_heartbeat` is
1867 /// older than `config.activity.stale_threshold`, and returns the rest
1868 /// sorted by `last_heartbeat` descending (most recently active first).
1869 ///
1870 /// # Errors
1871 ///
1872 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1873 #[instrument(skip(self))]
1874 pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1875 // Verify collective exists
1876 self.storage
1877 .get_collective(collective_id)?
1878 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1879
1880 let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1881
1882 // Filter stale activities
1883 let now = Timestamp::now();
1884 let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1885 let cutoff = now.as_millis() - threshold_ms;
1886
1887 let mut active: Vec<Activity> = all_activities
1888 .into_iter()
1889 .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1890 .collect();
1891
1892 // Sort by last_heartbeat descending (most recently active first)
1893 active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1894
1895 Ok(active)
1896 }
1897
1898 // =========================================================================
1899 // Context Candidates (E2-S04)
1900 // =========================================================================
1901
1902 /// Retrieves unified context candidates from all retrieval primitives.
1903 ///
1904 /// This is the primary API for context assembly. It orchestrates:
1905 /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1906 /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1907 /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1908 /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1909 /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1910 ///
1911 /// # Arguments
1912 ///
1913 /// * `request` - Configuration for which primitives to query and limits
1914 ///
1915 /// # Errors
1916 ///
1917 /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1918 /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1919 /// the collective's embedding dimension
1920 /// - [`NotFoundError::Collective`] if the collective doesn't exist
1921 ///
1922 /// # Performance
1923 ///
1924 /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1925 /// all other sub-calls are < 10ms each.
1926 ///
1927 /// # Example
1928 ///
1929 /// ```rust
1930 /// # fn main() -> pulsedb::Result<()> {
1931 /// # let dir = tempfile::tempdir().unwrap();
1932 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1933 /// # let collective_id = db.create_collective("example")?;
1934 /// # let query_vec = vec![0.1f32; 384];
1935 /// use pulsedb::{ContextRequest, SearchFilter};
1936 ///
1937 /// let candidates = db.get_context_candidates(ContextRequest {
1938 /// collective_id,
1939 /// query_embedding: query_vec,
1940 /// max_similar: 10,
1941 /// max_recent: 5,
1942 /// include_insights: true,
1943 /// include_relations: true,
1944 /// include_active_agents: true,
1945 /// filter: SearchFilter {
1946 /// domains: Some(vec!["rust".to_string()]),
1947 /// ..SearchFilter::default()
1948 /// },
1949 /// ..ContextRequest::default()
1950 /// })?;
1951 /// # Ok(())
1952 /// # }
1953 /// ```
1954 #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
1955 pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
1956 // ── Validate limits ──────────────────────────────────────
1957 if request.max_similar == 0 || request.max_similar > 1000 {
1958 return Err(ValidationError::invalid_field(
1959 "max_similar",
1960 "must be between 1 and 1000",
1961 )
1962 .into());
1963 }
1964 if request.max_recent == 0 || request.max_recent > 1000 {
1965 return Err(
1966 ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
1967 );
1968 }
1969
1970 // ── Verify collective exists and check dimension ─────────
1971 let collective = self
1972 .storage
1973 .get_collective(request.collective_id)?
1974 .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;
1975
1976 let expected_dim = collective.embedding_dimension as usize;
1977 if request.query_embedding.len() != expected_dim {
1978 return Err(ValidationError::dimension_mismatch(
1979 expected_dim,
1980 request.query_embedding.len(),
1981 )
1982 .into());
1983 }
1984
1985 // ── 1. Similar experiences (HNSW vector search) ──────────
1986 let similar_experiences = self.search_similar_filtered(
1987 request.collective_id,
1988 &request.query_embedding,
1989 request.max_similar,
1990 request.filter.clone(),
1991 )?;
1992
1993 // ── 2. Recent experiences (timestamp index scan) ─────────
1994 let recent_experiences = self.get_recent_experiences_filtered(
1995 request.collective_id,
1996 request.max_recent,
1997 request.filter,
1998 )?;
1999
2000 // ── 3. Insights (HNSW vector search on insight index) ────
2001 let insights = if request.include_insights {
2002 self.get_insights(
2003 request.collective_id,
2004 &request.query_embedding,
2005 request.max_similar,
2006 )?
2007 .into_iter()
2008 .map(|(insight, _score)| insight)
2009 .collect()
2010 } else {
2011 vec![]
2012 };
2013
2014 // ── 4. Relations (graph traversal from result experiences) ─
2015 let relations = if request.include_relations {
2016 use std::collections::HashSet;
2017
2018 let mut seen = HashSet::new();
2019 let mut all_relations = Vec::new();
2020
2021 // Collect unique experience IDs from both result sets
2022 let exp_ids: Vec<_> = similar_experiences
2023 .iter()
2024 .map(|r| r.experience.id)
2025 .chain(recent_experiences.iter().map(|e| e.id))
2026 .collect();
2027
2028 for exp_id in exp_ids {
2029 let related =
2030 self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;
2031
2032 for (_experience, relation) in related {
2033 if seen.insert(relation.id) {
2034 all_relations.push(relation);
2035 }
2036 }
2037 }
2038
2039 all_relations
2040 } else {
2041 vec![]
2042 };
2043
2044 // ── 5. Active agents (staleness-filtered activity records) ─
2045 let active_agents = if request.include_active_agents {
2046 self.get_active_agents(request.collective_id)?
2047 } else {
2048 vec![]
2049 };
2050
2051 Ok(ContextCandidates {
2052 similar_experiences,
2053 recent_experiences,
2054 insights,
2055 relations,
2056 active_agents,
2057 })
2058 }
2059
2060 // =========================================================================
2061 // Watch System (E4-S01)
2062 // =========================================================================
2063
2064 /// Subscribes to all experience changes in a collective.
2065 ///
2066 /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2067 /// create, update, archive, and delete operation. The stream ends when
2068 /// dropped or when the `PulseDB` instance is closed.
2069 ///
2070 /// Multiple subscribers per collective are supported. Each gets an
2071 /// independent copy of every event.
2072 ///
2073 /// # Example
2074 ///
2075 /// ```rust,no_run
2076 /// # #[tokio::main]
2077 /// # async fn main() -> pulsedb::Result<()> {
2078 /// # let dir = tempfile::tempdir().unwrap();
2079 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2080 /// # let collective_id = db.create_collective("example")?;
2081 /// use futures::StreamExt;
2082 ///
2083 /// let mut stream = db.watch_experiences(collective_id)?;
2084 /// while let Some(event) = stream.next().await {
2085 /// println!("{:?}: {}", event.event_type, event.experience_id);
2086 /// }
2087 /// # Ok(())
2088 /// # }
2089 /// ```
2090 pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2091 self.watch.subscribe(collective_id, None)
2092 }
2093
2094 /// Subscribes to filtered experience changes in a collective.
2095 ///
2096 /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2097 /// events that match the filter criteria. Filters are applied on the
2098 /// sender side before channel delivery.
2099 ///
2100 /// # Example
2101 ///
2102 /// ```rust
2103 /// # fn main() -> pulsedb::Result<()> {
2104 /// # let dir = tempfile::tempdir().unwrap();
2105 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2106 /// # let collective_id = db.create_collective("example")?;
2107 /// use pulsedb::WatchFilter;
2108 ///
2109 /// let filter = WatchFilter {
2110 /// domains: Some(vec!["security".to_string()]),
2111 /// min_importance: Some(0.7),
2112 /// ..Default::default()
2113 /// };
2114 /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2115 /// # Ok(())
2116 /// # }
2117 /// ```
2118 pub fn watch_experiences_filtered(
2119 &self,
2120 collective_id: CollectiveId,
2121 filter: WatchFilter,
2122 ) -> Result<WatchStream> {
2123 self.watch.subscribe(collective_id, Some(filter))
2124 }
2125
2126 // =========================================================================
2127 // Cross-Process Watch (E4-S02)
2128 // =========================================================================
2129
2130 /// Returns the current WAL sequence number.
2131 ///
2132 /// Use this to establish a baseline before starting to poll for changes.
2133 /// Returns 0 if no experience writes have occurred yet.
2134 ///
2135 /// # Example
2136 ///
2137 /// ```rust
2138 /// # fn main() -> pulsedb::Result<()> {
2139 /// # let dir = tempfile::tempdir().unwrap();
2140 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2141 /// let seq = db.get_current_sequence()?;
2142 /// // ... later ...
2143 /// let (events, new_seq) = db.poll_changes(seq)?;
2144 /// # Ok(())
2145 /// # }
2146 /// ```
2147 pub fn get_current_sequence(&self) -> Result<u64> {
2148 self.storage.get_wal_sequence()
2149 }
2150
2151 /// Polls for experience changes since the given sequence number.
2152 ///
2153 /// Returns a tuple of `(events, new_sequence)`:
2154 /// - `events`: New [`WatchEvent`]s in sequence order
2155 /// - `new_sequence`: Pass this value back on the next call
2156 ///
2157 /// Returns an empty vec and the same sequence if no changes exist.
2158 ///
2159 /// # Arguments
2160 ///
2161 /// * `since_seq` - The last sequence number you received (0 for first call)
2162 ///
2163 /// # Performance
2164 ///
2165 /// Target: < 10ms per call. Internally performs a range scan on the
2166 /// watch_events table, O(k) where k is the number of new events.
2167 ///
2168 /// # Example
2169 ///
2170 /// ```rust,no_run
2171 /// # fn main() -> pulsedb::Result<()> {
2172 /// # let dir = tempfile::tempdir().unwrap();
2173 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2174 /// use std::time::Duration;
2175 ///
2176 /// let mut seq = 0u64;
2177 /// loop {
2178 /// let (events, new_seq) = db.poll_changes(seq)?;
2179 /// seq = new_seq;
2180 /// for event in events {
2181 /// println!("{:?}: {}", event.event_type, event.experience_id);
2182 /// }
2183 /// std::thread::sleep(Duration::from_millis(100));
2184 /// }
2185 /// # }
2186 /// ```
2187 pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2188 use crate::storage::schema::EntityTypeTag;
2189 let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2190 let events = records
2191 .into_iter()
2192 .filter(|r| r.entity_type == EntityTypeTag::Experience)
2193 .map(WatchEvent::from)
2194 .collect();
2195 Ok((events, new_seq))
2196 }
2197
2198 /// Polls for changes with a custom batch size limit.
2199 ///
2200 /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2201 /// `limit` events per call. Use this for backpressure control.
2202 pub fn poll_changes_batch(
2203 &self,
2204 since_seq: u64,
2205 limit: usize,
2206 ) -> Result<(Vec<WatchEvent>, u64)> {
2207 use crate::storage::schema::EntityTypeTag;
2208 let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2209 let events = records
2210 .into_iter()
2211 .filter(|r| r.entity_type == EntityTypeTag::Experience)
2212 .map(WatchEvent::from)
2213 .collect();
2214 Ok((events, new_seq))
2215 }
2216
2217 // =========================================================================
2218 // Sync WAL Compaction (feature: sync)
2219 // =========================================================================
2220
2221 /// Compacts the WAL by removing events that all peers have already synced.
2222 ///
2223 /// Finds the minimum cursor across all known peers and deletes WAL events
2224 /// up to that sequence. If no peers exist, no compaction occurs (events
2225 /// may be needed when a peer connects later).
2226 ///
2227 /// Call this periodically (e.g., daily) to reclaim disk space.
2228 /// Returns the number of WAL events deleted.
2229 ///
2230 /// # Example
2231 ///
2232 /// ```rust,no_run
2233 /// # fn main() -> pulsedb::Result<()> {
2234 /// # let dir = tempfile::tempdir().unwrap();
2235 /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2236 /// let deleted = db.compact_wal()?;
2237 /// println!("Compacted {} WAL events", deleted);
2238 /// # Ok(())
2239 /// # }
2240 /// ```
2241 #[cfg(feature = "sync")]
2242 pub fn compact_wal(&self) -> Result<u64> {
2243 let cursors = self
2244 .storage
2245 .list_sync_cursors()
2246 .map_err(|e| PulseDBError::internal(format!("Failed to list sync cursors: {}", e)))?;
2247
2248 if cursors.is_empty() {
2249 // No peers — don't compact (events may be needed later)
2250 return Ok(0);
2251 }
2252
2253 let min_seq = cursors.iter().map(|c| c.last_sequence).min().unwrap_or(0);
2254
2255 if min_seq == 0 {
2256 return Ok(0);
2257 }
2258
2259 let deleted = self.storage.compact_wal_events(min_seq)?;
2260 info!(deleted, min_seq, "WAL compacted");
2261 Ok(deleted)
2262 }
2263
2264 // =========================================================================
2265 // Sync Apply Methods (feature: sync)
2266 // =========================================================================
2267 //
2268 // These methods apply remote changes received via sync. They bypass
2269 // validation and embedding generation (data was validated on the source).
2270 // WAL recording is suppressed by the SyncApplyGuard (entered by the caller).
2271 // Watch emit is skipped (no in-process notifications for sync changes).
2272 //
2273 // These are pub(crate) and will be called by the sync applier in Phase 3.
2274
2275 /// Applies a synced experience from a remote peer.
2276 ///
2277 /// Writes the full experience to storage and inserts into HNSW.
2278 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2279 #[cfg(feature = "sync")]
2280 #[allow(dead_code)] // Called by sync applier (Phase 3)
2281 pub fn apply_synced_experience(&self, experience: Experience) -> Result<()> {
2282 let collective_id = experience.collective_id;
2283 let id = experience.id;
2284 let embedding = experience.embedding.clone();
2285
2286 self.storage.save_experience(&experience)?;
2287
2288 // Insert into HNSW index
2289 let vectors = self
2290 .vectors
2291 .read()
2292 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2293 if let Some(index) = vectors.get(&collective_id) {
2294 index.insert_experience(id, &embedding)?;
2295 }
2296
2297 debug!(id = %id, "Synced experience applied");
2298 Ok(())
2299 }
2300
2301 /// Applies a synced experience update from a remote peer.
2302 ///
2303 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2304 #[cfg(feature = "sync")]
2305 #[allow(dead_code)] // Called by sync applier (Phase 3)
2306 pub fn apply_synced_experience_update(
2307 &self,
2308 id: ExperienceId,
2309 update: ExperienceUpdate,
2310 ) -> Result<()> {
2311 self.storage.update_experience(id, &update)?;
2312 debug!(id = %id, "Synced experience update applied");
2313 Ok(())
2314 }
2315
2316 /// Applies a synced experience deletion from a remote peer.
2317 ///
2318 /// Removes from storage and soft-deletes from HNSW.
2319 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2320 #[cfg(feature = "sync")]
2321 #[allow(dead_code)] // Called by sync applier (Phase 3)
2322 pub fn apply_synced_experience_delete(&self, id: ExperienceId) -> Result<()> {
2323 // Get collective_id for HNSW lookup before deleting
2324 if let Some(exp) = self.storage.get_experience(id)? {
2325 let collective_id = exp.collective_id;
2326
2327 // Cascade delete relations
2328 self.storage.delete_relations_for_experience(id)?;
2329
2330 self.storage.delete_experience(id)?;
2331
2332 // Soft-delete from HNSW
2333 let vectors = self
2334 .vectors
2335 .read()
2336 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2337 if let Some(index) = vectors.get(&collective_id) {
2338 index.delete_experience(id)?;
2339 }
2340 }
2341
2342 debug!(id = %id, "Synced experience delete applied");
2343 Ok(())
2344 }
2345
2346 /// Applies a synced relation from a remote peer.
2347 ///
2348 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2349 #[cfg(feature = "sync")]
2350 #[allow(dead_code)] // Called by sync applier (Phase 3)
2351 pub fn apply_synced_relation(&self, relation: ExperienceRelation) -> Result<()> {
2352 let id = relation.id;
2353 self.storage.save_relation(&relation)?;
2354 debug!(id = %id, "Synced relation applied");
2355 Ok(())
2356 }
2357
2358 /// Applies a synced relation deletion from a remote peer.
2359 ///
2360 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2361 #[cfg(feature = "sync")]
2362 #[allow(dead_code)] // Called by sync applier (Phase 3)
2363 pub fn apply_synced_relation_delete(&self, id: RelationId) -> Result<()> {
2364 self.storage.delete_relation(id)?;
2365 debug!(id = %id, "Synced relation delete applied");
2366 Ok(())
2367 }
2368
2369 /// Applies a synced insight from a remote peer.
2370 ///
2371 /// Writes to storage and inserts into insight HNSW index.
2372 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2373 #[cfg(feature = "sync")]
2374 #[allow(dead_code)] // Called by sync applier (Phase 3)
2375 pub fn apply_synced_insight(&self, insight: DerivedInsight) -> Result<()> {
2376 let id = insight.id;
2377 let collective_id = insight.collective_id;
2378 let embedding = insight.embedding.clone();
2379
2380 self.storage.save_insight(&insight)?;
2381
2382 // Insert into insight HNSW (using InsightId→ExperienceId byte conversion)
2383 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2384 let insight_vectors = self
2385 .insight_vectors
2386 .read()
2387 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2388 if let Some(index) = insight_vectors.get(&collective_id) {
2389 index.insert_experience(exp_id, &embedding)?;
2390 }
2391
2392 debug!(id = %id, "Synced insight applied");
2393 Ok(())
2394 }
2395
2396 /// Applies a synced insight deletion from a remote peer.
2397 ///
2398 /// Removes from storage and soft-deletes from insight HNSW.
2399 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2400 #[cfg(feature = "sync")]
2401 #[allow(dead_code)] // Called by sync applier (Phase 3)
2402 pub fn apply_synced_insight_delete(&self, id: InsightId) -> Result<()> {
2403 if let Some(insight) = self.storage.get_insight(id)? {
2404 self.storage.delete_insight(id)?;
2405
2406 // Soft-delete from insight HNSW
2407 let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2408 let insight_vectors = self
2409 .insight_vectors
2410 .read()
2411 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2412 if let Some(index) = insight_vectors.get(&insight.collective_id) {
2413 index.delete_experience(exp_id)?;
2414 }
2415 }
2416
2417 debug!(id = %id, "Synced insight delete applied");
2418 Ok(())
2419 }
2420
2421 /// Applies a synced collective from a remote peer.
2422 ///
2423 /// Writes to storage and creates HNSW indexes for the collective.
2424 /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2425 #[cfg(feature = "sync")]
2426 #[allow(dead_code)] // Called by sync applier (Phase 3)
2427 pub fn apply_synced_collective(&self, collective: Collective) -> Result<()> {
2428 let id = collective.id;
2429 let dimension = collective.embedding_dimension as usize;
2430
2431 self.storage.save_collective(&collective)?;
2432
2433 // Create HNSW indexes (same as create_collective)
2434 let exp_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2435 let insight_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2436 self.vectors
2437 .write()
2438 .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
2439 .insert(id, exp_index);
2440 self.insight_vectors
2441 .write()
2442 .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
2443 .insert(id, insight_index);
2444
2445 debug!(id = %id, "Synced collective applied");
2446 Ok(())
2447 }
2448}
2449
2450// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2451// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// A fresh database file is created on first open, with the default
    /// 384-dimension embeddings.
    #[test]
    fn test_open_creates_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        assert!(db_path.exists());
        assert_eq!(db.embedding_dimension(), 384);

        db.close().unwrap();
    }

    /// Reopening an existing database preserves its configuration.
    #[test]
    fn test_open_existing_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // First open creates the file on disk.
        let created = PulseDB::open(&db_path, Config::default()).unwrap();
        created.close().unwrap();

        // Second open reads it back.
        let reopened = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    /// Invalid configuration is rejected at open time.
    #[test]
    fn test_config_validation() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let bad_config = Config {
            cache_size_mb: 0, // Invalid
            ..Default::default()
        };

        assert!(PulseDB::open(&db_path, bad_config).is_err());
    }

    /// The stored embedding dimension must match the config on reopen.
    #[test]
    fn test_dimension_mismatch() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // Create with D384.
        let db = PulseDB::open(
            &db_path,
            Config {
                embedding_dimension: EmbeddingDimension::D384,
                ..Default::default()
            },
        )
        .unwrap();
        db.close().unwrap();

        // Reopening with D768 must fail.
        let mismatched = PulseDB::open(
            &db_path,
            Config {
                embedding_dimension: EmbeddingDimension::D768,
                ..Default::default()
            },
        );

        assert!(mismatched.is_err());
    }

    /// Metadata reflects the dimension the database was created with.
    #[test]
    fn test_metadata_access() {
        let dir = tempdir().unwrap();
        let db = PulseDB::open(dir.path().join("test.db"), Config::default()).unwrap();

        assert_eq!(db.metadata().embedding_dimension, EmbeddingDimension::D384);

        db.close().unwrap();
    }

    /// PulseDB must remain shareable across threads (compile-time check).
    #[test]
    fn test_pulsedb_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<PulseDB>();
    }
}