// pulsedb/db.rs

1//! PulseDB main struct and lifecycle operations.
2//!
3//! The [`PulseDB`] struct is the primary interface for interacting with
4//! the database. It provides methods for:
5//!
6//! - Opening and closing the database
7//! - Managing collectives (isolation units)
8//! - Recording and querying experiences
9//! - Semantic search and context retrieval
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! # fn main() -> pulsedb::Result<()> {
15//! # let dir = tempfile::tempdir().unwrap();
16//! use pulsedb::{PulseDB, Config, NewExperience};
17//!
18//! // Open or create a database
19//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
20//!
21//! // Create a collective for your project
22//! let collective = db.create_collective("my-project")?;
23//!
24//! // Record an experience
25//! db.record_experience(NewExperience {
26//!     collective_id: collective,
27//!     content: "Always validate user input".to_string(),
28//!     embedding: Some(vec![0.1f32; 384]),
29//!     ..Default::default()
30//! })?;
31//!
32//! // Close when done
33//! db.close()?;
34//! # Ok(())
35//! # }
36//! ```
37//!
38//! # Thread Safety
39//!
40//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
41//! The underlying storage uses MVCC for concurrent reads with exclusive
42//! write locking.
43//!
44//! ```rust
45//! # fn main() -> pulsedb::Result<()> {
46//! # let dir = tempfile::tempdir().unwrap();
47//! use std::sync::Arc;
48//! use pulsedb::{PulseDB, Config};
49//!
50//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
51//!
52//! // Clone Arc for use in another thread
53//! let db_clone = Arc::clone(&db);
54//! std::thread::spawn(move || {
55//!     // Safe to use db_clone here
56//! });
57//! # Ok(())
58//! # }
59//! ```
60
61use std::collections::HashMap;
62use std::path::{Path, PathBuf};
63use std::sync::{Arc, RwLock};
64
65use tracing::{info, instrument, warn};
66
67use crate::activity::{validate_new_activity, Activity, NewActivity};
68use crate::collective::types::CollectiveStats;
69use crate::collective::{validate_collective_name, Collective};
70use crate::config::{Config, EmbeddingProvider};
71use crate::embedding::{create_embedding_service, EmbeddingService};
72use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
73use crate::experience::{
74    validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
75    NewExperience,
76};
77use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
78use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
79use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
80use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
81use crate::vector::HnswIndex;
82use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};
83
/// The main PulseDB database handle.
///
/// This is the primary interface for all database operations. Create an
/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
///
/// # Ownership
///
/// `PulseDB` owns its storage and embedding service. When you call `close()`,
/// the database is consumed and cannot be used afterward. This ensures
/// resources are properly released.
///
/// # Thread Safety
///
/// Fields are either immutable after `open()` or guarded by locks, so the
/// handle can be shared across threads via `Arc` (see the module docs).
pub struct PulseDB {
    /// Storage engine (redb or mock for testing).
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX).
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}
123
124impl std::fmt::Debug for PulseDB {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
127        let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
128        f.debug_struct("PulseDB")
129            .field("config", &self.config)
130            .field("embedding_dimension", &self.embedding_dimension())
131            .field("vector_indexes", &vector_count)
132            .field("insight_vector_indexes", &insight_vector_count)
133            .finish_non_exhaustive()
134    }
135}
136
137impl PulseDB {
138    /// Opens or creates a PulseDB database at the specified path.
139    ///
140    /// If the database doesn't exist, it will be created with the given
141    /// configuration. If it exists, the configuration will be validated
142    /// against the stored settings (e.g., embedding dimension must match).
143    ///
144    /// # Arguments
145    ///
146    /// * `path` - Path to the database file (created if it doesn't exist)
147    /// * `config` - Configuration options for the database
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if:
152    /// - Configuration is invalid (see [`Config::validate`])
153    /// - Database file is corrupted
154    /// - Database is locked by another process
155    /// - Schema version doesn't match (needs migration)
156    /// - Embedding dimension doesn't match existing database
157    ///
158    /// # Example
159    ///
160    /// ```rust
161    /// # fn main() -> pulsedb::Result<()> {
162    /// # let dir = tempfile::tempdir().unwrap();
163    /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
164    ///
165    /// // Open with default configuration
166    /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
167    /// # drop(db);
168    ///
169    /// // Open with custom embedding dimension
170    /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
171    ///     embedding_dimension: EmbeddingDimension::D768,
172    ///     ..Default::default()
173    /// })?;
174    /// # Ok(())
175    /// # }
176    /// ```
177    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
178    pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
179        // Validate configuration first
180        config.validate().map_err(PulseDBError::from)?;
181
182        info!("Opening PulseDB");
183
184        // Open storage engine
185        let storage = open_storage(&path, &config)?;
186
187        // Create embedding service
188        let embedding = create_embedding_service(&config)?;
189
190        // Load or rebuild HNSW indexes for all existing collectives
191        let vectors = Self::load_all_indexes(&*storage, &config)?;
192        let insight_vectors = Self::load_all_insight_indexes(&*storage, &config)?;
193
194        info!(
195            dimension = config.embedding_dimension.size(),
196            sync_mode = ?config.sync_mode,
197            collectives = vectors.len(),
198            "PulseDB opened successfully"
199        );
200
201        let watch = Arc::new(WatchService::new(
202            config.watch.buffer_size,
203            config.watch.in_process,
204        ));
205
206        Ok(Self {
207            storage,
208            embedding,
209            config,
210            vectors: RwLock::new(vectors),
211            insight_vectors: RwLock::new(insight_vectors),
212            watch,
213        })
214    }
215
216    /// Closes the database, flushing all pending writes.
217    ///
218    /// This method consumes the `PulseDB` instance, ensuring it cannot
219    /// be used after closing. The underlying storage engine flushes all
220    /// buffered data to disk.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the storage backend reports a flush failure.
225    /// Note: the current redb backend flushes durably on drop, so this
226    /// always returns `Ok(())` in practice.
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// # fn main() -> pulsedb::Result<()> {
232    /// # let dir = tempfile::tempdir().unwrap();
233    /// use pulsedb::{PulseDB, Config};
234    ///
235    /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
236    /// // ... use the database ...
237    /// db.close()?;  // db is consumed here
238    /// // db.something() // Compile error: db was moved
239    /// # Ok(())
240    /// # }
241    /// ```
242    #[instrument(skip(self))]
243    pub fn close(self) -> Result<()> {
244        info!("Closing PulseDB");
245
246        // Persist HNSW indexes BEFORE closing storage.
247        // If HNSW save fails, storage is still open for potential recovery.
248        // On next open(), stale/missing HNSW files trigger a rebuild from redb.
249        if let Some(hnsw_dir) = self.hnsw_dir() {
250            // Experience HNSW indexes
251            let vectors = self
252                .vectors
253                .read()
254                .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
255            for (collective_id, index) in vectors.iter() {
256                if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
257                    warn!(
258                        collective = %collective_id,
259                        error = %e,
260                        "Failed to save HNSW index (will rebuild on next open)"
261                    );
262                }
263            }
264            drop(vectors);
265
266            // Insight HNSW indexes (separate files with _insights suffix)
267            let insight_vectors = self
268                .insight_vectors
269                .read()
270                .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
271            for (collective_id, index) in insight_vectors.iter() {
272                let name = format!("{}_insights", collective_id);
273                if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
274                    warn!(
275                        collective = %collective_id,
276                        error = %e,
277                        "Failed to save insight HNSW index (will rebuild on next open)"
278                    );
279                }
280            }
281        }
282
283        // Close storage (flushes pending writes)
284        self.storage.close()?;
285
286        info!("PulseDB closed successfully");
287        Ok(())
288    }
289
290    /// Returns a reference to the database configuration.
291    ///
292    /// This is the configuration that was used to open the database.
293    /// Note that some settings (like embedding dimension) are locked
294    /// on database creation and cannot be changed.
295    #[inline]
296    pub fn config(&self) -> &Config {
297        &self.config
298    }
299
300    /// Returns the database metadata.
301    ///
302    /// Metadata includes schema version, embedding dimension, and timestamps
303    /// for when the database was created and last opened.
304    #[inline]
305    pub fn metadata(&self) -> &DatabaseMetadata {
306        self.storage.metadata()
307    }
308
309    /// Returns the embedding dimension configured for this database.
310    ///
311    /// All embeddings stored in this database must have exactly this
312    /// many dimensions.
313    #[inline]
314    pub fn embedding_dimension(&self) -> usize {
315        self.config.embedding_dimension.size()
316    }
317
318    // =========================================================================
319    // Internal Accessors (for use by feature modules)
320    // =========================================================================
321
322    /// Returns a reference to the storage engine.
323    ///
324    /// This is for internal use by other PulseDB modules.
325    #[inline]
326    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
327    pub(crate) fn storage(&self) -> &dyn StorageEngine {
328        self.storage.as_ref()
329    }
330
331    /// Returns a reference to the embedding service.
332    ///
333    /// This is for internal use by other PulseDB modules.
334    #[inline]
335    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
336    pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
337        self.embedding.as_ref()
338    }
339
340    // =========================================================================
341    // HNSW Index Lifecycle
342    // =========================================================================
343
344    /// Returns the directory for HNSW index files.
345    ///
346    /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
347    /// the storage has no file path (e.g., in-memory tests).
348    fn hnsw_dir(&self) -> Option<PathBuf> {
349        self.storage.path().map(|p| {
350            let mut hnsw_path = p.as_os_str().to_owned();
351            hnsw_path.push(".hnsw");
352            PathBuf::from(hnsw_path)
353        })
354    }
355
356    /// Loads or rebuilds HNSW indexes for all existing collectives.
357    ///
358    /// For each collective in storage:
359    /// 1. Try loading metadata from `.hnsw.meta` file
360    /// 2. Rebuild the graph from redb embeddings (always, since we can't
361    ///    load the graph due to hnsw_rs lifetime constraints)
362    /// 3. Restore deleted set from metadata if available
363    fn load_all_indexes(
364        storage: &dyn StorageEngine,
365        config: &Config,
366    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
367        let collectives = storage.list_collectives()?;
368        let mut vectors = HashMap::with_capacity(collectives.len());
369
370        let hnsw_dir = storage.path().map(|p| {
371            let mut hnsw_path = p.as_os_str().to_owned();
372            hnsw_path.push(".hnsw");
373            PathBuf::from(hnsw_path)
374        });
375
376        for collective in &collectives {
377            let dimension = collective.embedding_dimension as usize;
378
379            // List all experience IDs in this collective
380            let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;
381
382            // Load embeddings from redb (source of truth)
383            let mut embeddings = Vec::with_capacity(exp_ids.len());
384            for exp_id in &exp_ids {
385                if let Some(embedding) = storage.get_embedding(*exp_id)? {
386                    embeddings.push((*exp_id, embedding));
387                }
388            }
389
390            // Try loading metadata (for deleted set and ID mappings)
391            let metadata = hnsw_dir
392                .as_ref()
393                .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
394                .flatten();
395
396            // Rebuild the HNSW graph from embeddings
397            let index = if embeddings.is_empty() {
398                HnswIndex::new(dimension, &config.hnsw)
399            } else {
400                let start = std::time::Instant::now();
401                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
402                info!(
403                    collective = %collective.id,
404                    vectors = idx.active_count(),
405                    elapsed_ms = start.elapsed().as_millis() as u64,
406                    "Rebuilt HNSW index from redb embeddings"
407                );
408                idx
409            };
410
411            // Restore deleted set from metadata if available
412            if let Some(meta) = metadata {
413                index.restore_deleted_set(&meta.deleted)?;
414            }
415
416            vectors.insert(collective.id, index);
417        }
418
419        Ok(vectors)
420    }
421
422    /// Loads or rebuilds insight HNSW indexes for all existing collectives.
423    ///
424    /// For each collective, loads all insights from storage and rebuilds
425    /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
426    /// byte conversion for the HNSW API.
427    fn load_all_insight_indexes(
428        storage: &dyn StorageEngine,
429        config: &Config,
430    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
431        let collectives = storage.list_collectives()?;
432        let mut insight_vectors = HashMap::with_capacity(collectives.len());
433
434        let hnsw_dir = storage.path().map(|p| {
435            let mut hnsw_path = p.as_os_str().to_owned();
436            hnsw_path.push(".hnsw");
437            PathBuf::from(hnsw_path)
438        });
439
440        for collective in &collectives {
441            let dimension = collective.embedding_dimension as usize;
442
443            // List all insight IDs in this collective
444            let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;
445
446            // Load insights and extract embeddings (converting InsightId → ExperienceId)
447            let mut embeddings = Vec::with_capacity(insight_ids.len());
448            for insight_id in &insight_ids {
449                if let Some(insight) = storage.get_insight(*insight_id)? {
450                    let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
451                    embeddings.push((exp_id, insight.embedding));
452                }
453            }
454
455            // Try loading metadata (for deleted set)
456            let name = format!("{}_insights", collective.id);
457            let metadata = hnsw_dir
458                .as_ref()
459                .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
460                .flatten();
461
462            // Rebuild HNSW graph from embeddings
463            let index = if embeddings.is_empty() {
464                HnswIndex::new(dimension, &config.hnsw)
465            } else {
466                let start = std::time::Instant::now();
467                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
468                info!(
469                    collective = %collective.id,
470                    insights = idx.active_count(),
471                    elapsed_ms = start.elapsed().as_millis() as u64,
472                    "Rebuilt insight HNSW index from stored insights"
473                );
474                idx
475            };
476
477            // Restore deleted set from metadata if available
478            if let Some(meta) = metadata {
479                index.restore_deleted_set(&meta.deleted)?;
480            }
481
482            insight_vectors.insert(collective.id, index);
483        }
484
485        Ok(insight_vectors)
486    }
487
488    /// Executes a closure with the HNSW index for a collective.
489    ///
490    /// This is the primary accessor for vector search operations (used by
491    /// `search_similar()`). The closure runs while the outer RwLock guard
492    /// is held (read lock), so the HnswIndex reference stays valid.
493    /// Returns `None` if no index exists for the collective.
494    #[doc(hidden)]
495    pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
496    where
497        F: FnOnce(&HnswIndex) -> Result<R>,
498    {
499        let vectors = self
500            .vectors
501            .read()
502            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
503        match vectors.get(&collective_id) {
504            Some(index) => Ok(Some(f(index)?)),
505            None => Ok(None),
506        }
507    }
508
509    // =========================================================================
510    // Test Helpers
511    // =========================================================================
512
513    /// Returns a reference to the storage engine for integration testing.
514    ///
515    /// This method is intentionally hidden from documentation. It provides
516    /// test-only access to the storage layer for verifying ACID guarantees
517    /// and crash recovery. Production code should use the public PulseDB API.
518    #[doc(hidden)]
519    #[inline]
520    pub fn storage_for_test(&self) -> &dyn StorageEngine {
521        self.storage.as_ref()
522    }
523
524    // =========================================================================
525    // Collective Management (E1-S02)
526    // =========================================================================
527
528    /// Creates a new collective with the given name.
529    ///
530    /// The collective's embedding dimension is locked to the database's
531    /// configured dimension at creation time.
532    ///
533    /// # Arguments
534    ///
535    /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
536    ///
537    /// # Errors
538    ///
539    /// Returns a validation error if the name is empty, whitespace-only,
540    /// or exceeds 255 characters.
541    ///
542    /// # Example
543    ///
544    /// ```rust
545    /// # fn main() -> pulsedb::Result<()> {
546    /// # let dir = tempfile::tempdir().unwrap();
547    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
548    /// let id = db.create_collective("my-project")?;
549    /// # Ok(())
550    /// # }
551    /// ```
552    #[instrument(skip(self))]
553    pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
554        validate_collective_name(name)?;
555
556        let dimension = self.config.embedding_dimension.size() as u16;
557        let collective = Collective::new(name, dimension);
558        let id = collective.id;
559
560        // Persist to redb first (source of truth)
561        self.storage.save_collective(&collective)?;
562
563        // Create empty HNSW indexes for this collective
564        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
565        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
566        self.vectors
567            .write()
568            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
569            .insert(id, exp_index);
570        self.insight_vectors
571            .write()
572            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
573            .insert(id, insight_index);
574
575        info!(id = %id, name = %name, "Collective created");
576        Ok(id)
577    }
578
579    /// Creates a new collective with an owner for multi-tenancy.
580    ///
581    /// Same as [`create_collective`](Self::create_collective) but assigns
582    /// an owner ID, enabling filtering with
583    /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
584    ///
585    /// # Arguments
586    ///
587    /// * `name` - Human-readable name (1-255 characters)
588    /// * `owner_id` - Owner identifier (must not be empty)
589    ///
590    /// # Errors
591    ///
592    /// Returns a validation error if the name or owner_id is invalid.
593    #[instrument(skip(self))]
594    pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
595        validate_collective_name(name)?;
596
597        if owner_id.is_empty() {
598            return Err(PulseDBError::from(
599                crate::error::ValidationError::required_field("owner_id"),
600            ));
601        }
602
603        let dimension = self.config.embedding_dimension.size() as u16;
604        let collective = Collective::with_owner(name, owner_id, dimension);
605        let id = collective.id;
606
607        // Persist to redb first (source of truth)
608        self.storage.save_collective(&collective)?;
609
610        // Create empty HNSW indexes for this collective
611        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
612        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
613        self.vectors
614            .write()
615            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
616            .insert(id, exp_index);
617        self.insight_vectors
618            .write()
619            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
620            .insert(id, insight_index);
621
622        info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
623        Ok(id)
624    }
625
626    /// Returns a collective by ID, or `None` if not found.
627    ///
628    /// # Example
629    ///
630    /// ```rust
631    /// # fn main() -> pulsedb::Result<()> {
632    /// # let dir = tempfile::tempdir().unwrap();
633    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
634    /// # let id = db.create_collective("example")?;
635    /// if let Some(collective) = db.get_collective(id)? {
636    ///     println!("Found: {}", collective.name);
637    /// }
638    /// # Ok(())
639    /// # }
640    /// ```
641    #[instrument(skip(self))]
642    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
643        self.storage.get_collective(id)
644    }
645
646    /// Lists all collectives in the database.
647    ///
648    /// Returns an empty vector if no collectives exist.
649    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
650        self.storage.list_collectives()
651    }
652
653    /// Lists collectives filtered by owner ID.
654    ///
655    /// Returns only collectives whose `owner_id` matches the given value.
656    /// Returns an empty vector if no matching collectives exist.
657    pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
658        let all = self.storage.list_collectives()?;
659        Ok(all
660            .into_iter()
661            .filter(|c| c.owner_id.as_deref() == Some(owner_id))
662            .collect())
663    }
664
665    /// Returns statistics for a collective.
666    ///
667    /// # Errors
668    ///
669    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
670    #[instrument(skip(self))]
671    pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
672        // Verify collective exists
673        self.storage
674            .get_collective(id)?
675            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
676
677        let experience_count = self.storage.count_experiences_in_collective(id)?;
678
679        Ok(CollectiveStats {
680            experience_count,
681            storage_bytes: 0,
682            oldest_experience: None,
683            newest_experience: None,
684        })
685    }
686
687    /// Deletes a collective and all its associated data.
688    ///
689    /// Performs cascade deletion: removes all experiences belonging to the
690    /// collective before removing the collective record itself.
691    ///
692    /// # Errors
693    ///
694    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
695    ///
696    /// # Example
697    ///
698    /// ```rust
699    /// # fn main() -> pulsedb::Result<()> {
700    /// # let dir = tempfile::tempdir().unwrap();
701    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
702    /// # let collective_id = db.create_collective("to-delete")?;
703    /// db.delete_collective(collective_id)?;
704    /// assert!(db.get_collective(collective_id)?.is_none());
705    /// # Ok(())
706    /// # }
707    /// ```
708    #[instrument(skip(self))]
709    pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
710        // Verify collective exists
711        self.storage
712            .get_collective(id)?
713            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
714
715        // Cascade: delete all experiences for this collective
716        let deleted_count = self.storage.delete_experiences_by_collective(id)?;
717        if deleted_count > 0 {
718            info!(count = deleted_count, "Cascade-deleted experiences");
719        }
720
721        // Cascade: delete all insights for this collective
722        let deleted_insights = self.storage.delete_insights_by_collective(id)?;
723        if deleted_insights > 0 {
724            info!(count = deleted_insights, "Cascade-deleted insights");
725        }
726
727        // Cascade: delete all activities for this collective
728        let deleted_activities = self.storage.delete_activities_by_collective(id)?;
729        if deleted_activities > 0 {
730            info!(count = deleted_activities, "Cascade-deleted activities");
731        }
732
733        // Delete the collective record from storage
734        self.storage.delete_collective(id)?;
735
736        // Remove HNSW indexes from memory
737        self.vectors
738            .write()
739            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
740            .remove(&id);
741        self.insight_vectors
742            .write()
743            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
744            .remove(&id);
745
746        // Remove HNSW files from disk (non-fatal if fails)
747        if let Some(hnsw_dir) = self.hnsw_dir() {
748            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
749                warn!(
750                    collective = %id,
751                    error = %e,
752                    "Failed to remove experience HNSW files (non-fatal)"
753                );
754            }
755            let insight_name = format!("{}_insights", id);
756            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
757                warn!(
758                    collective = %id,
759                    error = %e,
760                    "Failed to remove insight HNSW files (non-fatal)"
761                );
762            }
763        }
764
765        info!(id = %id, "Collective deleted");
766        Ok(())
767    }
768
769    // =========================================================================
770    // Experience CRUD (E1-S03)
771    // =========================================================================
772
    /// Records a new experience in the database.
    ///
    /// This is the primary method for storing agent-learned knowledge. The method:
    /// 1. Verifies the collective exists (its embedding dimension is needed
    ///    for validation)
    /// 2. Validates the input (content, scores, tags, embedding)
    /// 3. Resolves the embedding (generates if Builtin, requires if External)
    /// 4. Stores the experience atomically across 4 tables
    ///
    /// Durability ordering: redb (the source of truth) is written first, then
    /// the in-memory HNSW index, and the watch event is emitted only after
    /// both succeed.
    ///
    /// # Arguments
    ///
    /// * `exp` - The experience to record (see [`NewExperience`])
    ///
    /// # Errors
    ///
    /// - [`ValidationError`](crate::ValidationError) if input is invalid
    /// - [`NotFoundError::Collective`] if the collective doesn't exist
    /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
    #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
    pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);

        // Verify collective exists and get its dimension
        let collective = self
            .storage
            .get_collective(exp.collective_id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;

        // Validate content/scores/tags/embedding against the collective's
        // dimension. The External flag is passed so validation can require a
        // caller-supplied embedding in that mode (presumably enforced inside
        // validate_new_experience — confirm there).
        validate_new_experience(&exp, collective.embedding_dimension, is_external)?;

        // Resolve embedding: use the caller's vector if present.
        let embedding = match exp.embedding {
            Some(emb) => emb,
            None => {
                // Builtin mode: generate embedding from content
                self.embedding.embed(&exp.content)?
            }
        };

        // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
        let embedding_for_hnsw = embedding.clone();
        let collective_id = exp.collective_id;

        // Construct the full experience record (fresh id, zero applications,
        // not archived, timestamped now)
        let experience = Experience {
            id: ExperienceId::new(),
            collective_id,
            content: exp.content,
            embedding,
            experience_type: exp.experience_type,
            importance: exp.importance,
            confidence: exp.confidence,
            applications: 0,
            domain: exp.domain,
            related_files: exp.related_files,
            source_agent: exp.source_agent,
            source_task: exp.source_task,
            timestamp: Timestamp::now(),
            archived: false,
        };

        let id = experience.id;

        // Write to redb FIRST (source of truth). If crash happens after
        // this but before HNSW insert, rebuild on next open will include it.
        self.storage.save_experience(&experience)?;

        // Insert into HNSW index (derived structure). If no index is loaded
        // for this collective the insert is skipped; the index is rebuilt
        // from redb on the next open, so the record is not lost.
        let vectors = self
            .vectors
            .read()
            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
        if let Some(index) = vectors.get(&collective_id) {
            index.insert_experience(id, &embedding_for_hnsw)?;
        }

        // Emit watch event after both storage and HNSW succeed
        self.watch.emit(
            WatchEvent {
                experience_id: id,
                collective_id,
                event_type: WatchEventType::Created,
                timestamp: experience.timestamp,
            },
            &experience,
        )?;

        info!(id = %id, "Experience recorded");
        Ok(id)
    }
863
864    /// Retrieves an experience by ID, including its embedding.
865    ///
866    /// Returns `None` if no experience with the given ID exists.
867    #[instrument(skip(self))]
868    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
869        self.storage.get_experience(id)
870    }
871
872    /// Updates mutable fields of an experience.
873    ///
874    /// Only fields set to `Some(...)` in the update are changed.
875    /// Content and embedding are immutable — create a new experience instead.
876    ///
877    /// # Errors
878    ///
879    /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
880    /// - [`NotFoundError::Experience`] if the experience doesn't exist
881    #[instrument(skip(self, update))]
882    pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
883        validate_experience_update(&update)?;
884
885        let updated = self.storage.update_experience(id, &update)?;
886        if !updated {
887            return Err(PulseDBError::from(NotFoundError::experience(id)));
888        }
889
890        // Emit watch event (fetch experience for collective_id + filter matching)
891        if self.watch.has_subscribers() {
892            if let Ok(Some(exp)) = self.storage.get_experience(id) {
893                let event_type = if update.archived == Some(true) {
894                    WatchEventType::Archived
895                } else {
896                    WatchEventType::Updated
897                };
898                self.watch.emit(
899                    WatchEvent {
900                        experience_id: id,
901                        collective_id: exp.collective_id,
902                        event_type,
903                        timestamp: Timestamp::now(),
904                    },
905                    &exp,
906                )?;
907            }
908        }
909
910        info!(id = %id, "Experience updated");
911        Ok(())
912    }
913
914    /// Archives an experience (soft-delete).
915    ///
916    /// Archived experiences remain in storage but are excluded from search
917    /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
918    ///
919    /// # Errors
920    ///
921    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
922    #[instrument(skip(self))]
923    pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
924        self.update_experience(
925            id,
926            ExperienceUpdate {
927                archived: Some(true),
928                ..Default::default()
929            },
930        )
931    }
932
933    /// Restores an archived experience.
934    ///
935    /// The experience will once again appear in search results.
936    ///
937    /// # Errors
938    ///
939    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
940    #[instrument(skip(self))]
941    pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
942        self.update_experience(
943            id,
944            ExperienceUpdate {
945                archived: Some(false),
946                ..Default::default()
947            },
948        )
949    }
950
951    /// Permanently deletes an experience and its embedding.
952    ///
953    /// This removes the experience from all tables and indices.
954    /// Unlike archiving, this is irreversible.
955    ///
956    /// # Errors
957    ///
958    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
959    #[instrument(skip(self))]
960    pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
961        // Read experience first to get collective_id for HNSW lookup.
962        // This adds one extra read, but delete is not a hot path.
963        let experience = self
964            .storage
965            .get_experience(id)?
966            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
967
968        // Cascade-delete any relations involving this experience.
969        // Done before experience deletion so we can still look up relation data.
970        let rel_count = self.storage.delete_relations_for_experience(id)?;
971        if rel_count > 0 {
972            info!(
973                count = rel_count,
974                "Cascade-deleted relations for experience"
975            );
976        }
977
978        // Delete from redb FIRST (source of truth). If crash happens after
979        // this but before HNSW soft-delete, on reopen the experience won't be
980        // loaded from redb, so it's automatically excluded from the rebuilt index.
981        self.storage.delete_experience(id)?;
982
983        // Soft-delete from HNSW index (mark as deleted, not removed from graph).
984        // This takes effect immediately for the current session's searches.
985        let vectors = self
986            .vectors
987            .read()
988            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
989        if let Some(index) = vectors.get(&experience.collective_id) {
990            index.delete_experience(id)?;
991        }
992
993        // Emit watch event after storage + HNSW deletion
994        self.watch.emit(
995            WatchEvent {
996                experience_id: id,
997                collective_id: experience.collective_id,
998                event_type: WatchEventType::Deleted,
999                timestamp: Timestamp::now(),
1000            },
1001            &experience,
1002        )?;
1003
1004        info!(id = %id, "Experience deleted");
1005        Ok(())
1006    }
1007
1008    /// Reinforces an experience by incrementing its application count.
1009    ///
1010    /// Each call atomically increments the `applications` counter by 1.
1011    /// Returns the new application count.
1012    ///
1013    /// # Errors
1014    ///
1015    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1016    #[instrument(skip(self))]
1017    pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1018        let new_count = self
1019            .storage
1020            .reinforce_experience(id)?
1021            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1022
1023        // Emit watch event (fetch experience for collective_id + filter matching)
1024        if self.watch.has_subscribers() {
1025            if let Ok(Some(exp)) = self.storage.get_experience(id) {
1026                self.watch.emit(
1027                    WatchEvent {
1028                        experience_id: id,
1029                        collective_id: exp.collective_id,
1030                        event_type: WatchEventType::Updated,
1031                        timestamp: Timestamp::now(),
1032                    },
1033                    &exp,
1034                )?;
1035            }
1036        }
1037
1038        info!(id = %id, applications = new_count, "Experience reinforced");
1039        Ok(new_count)
1040    }
1041
1042    // =========================================================================
1043    // Recent Experiences
1044    // =========================================================================
1045
1046    /// Retrieves the most recent experiences in a collective, newest first.
1047    ///
1048    /// Returns up to `limit` non-archived experiences ordered by timestamp
1049    /// descending (newest first). Uses the by-collective timestamp index
1050    /// for efficient retrieval.
1051    ///
1052    /// # Arguments
1053    ///
1054    /// * `collective_id` - The collective to query
1055    /// * `limit` - Maximum number of experiences to return (1-1000)
1056    ///
1057    /// # Errors
1058    ///
1059    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1060    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1061    ///
1062    /// # Example
1063    ///
1064    /// ```rust
1065    /// # fn main() -> pulsedb::Result<()> {
1066    /// # let dir = tempfile::tempdir().unwrap();
1067    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1068    /// # let collective_id = db.create_collective("example")?;
1069    /// let recent = db.get_recent_experiences(collective_id, 50)?;
1070    /// for exp in &recent {
1071    ///     println!("{}: {}", exp.timestamp, exp.content);
1072    /// }
1073    /// # Ok(())
1074    /// # }
1075    /// ```
1076    #[instrument(skip(self))]
1077    pub fn get_recent_experiences(
1078        &self,
1079        collective_id: CollectiveId,
1080        limit: usize,
1081    ) -> Result<Vec<Experience>> {
1082        self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
1083    }
1084
1085    /// Retrieves the most recent experiences in a collective with filtering.
1086    ///
1087    /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1088    /// applies additional filters on domain, experience type, importance,
1089    /// confidence, and timestamp.
1090    ///
1091    /// Over-fetches from storage (2x `limit`) to account for entries removed
1092    /// by post-filtering, then truncates to the requested `limit`.
1093    ///
1094    /// # Arguments
1095    ///
1096    /// * `collective_id` - The collective to query
1097    /// * `limit` - Maximum number of experiences to return (1-1000)
1098    /// * `filter` - Filter criteria to apply
1099    ///
1100    /// # Errors
1101    ///
1102    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1103    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1104    #[instrument(skip(self, filter))]
1105    pub fn get_recent_experiences_filtered(
1106        &self,
1107        collective_id: CollectiveId,
1108        limit: usize,
1109        filter: SearchFilter,
1110    ) -> Result<Vec<Experience>> {
1111        // Validate limit
1112        if limit == 0 || limit > 1000 {
1113            return Err(
1114                ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1115            );
1116        }
1117
1118        // Verify collective exists
1119        self.storage
1120            .get_collective(collective_id)?
1121            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1122
1123        // Over-fetch IDs to account for post-filtering losses
1124        let over_fetch = limit.saturating_mul(2).min(2000);
1125        let recent_ids = self
1126            .storage
1127            .get_recent_experience_ids(collective_id, over_fetch)?;
1128
1129        // Load full experiences and apply filter
1130        let mut results = Vec::with_capacity(limit);
1131        for (exp_id, _timestamp) in recent_ids {
1132            if results.len() >= limit {
1133                break;
1134            }
1135
1136            if let Some(experience) = self.storage.get_experience(exp_id)? {
1137                if filter.matches(&experience) {
1138                    results.push(experience);
1139                }
1140            }
1141        }
1142
1143        Ok(results)
1144    }
1145
1146    // =========================================================================
1147    // Similarity Search (E2-S02)
1148    // =========================================================================
1149
1150    /// Searches for experiences semantically similar to the query embedding.
1151    ///
1152    /// Uses the HNSW vector index for approximate nearest neighbor search,
1153    /// then fetches full experience records from storage. Archived experiences
1154    /// are excluded by default.
1155    ///
1156    /// Results are sorted by similarity descending (most similar first).
1157    /// Similarity is computed as `1.0 - cosine_distance`.
1158    ///
1159    /// # Arguments
1160    ///
1161    /// * `collective_id` - The collective to search within
1162    /// * `query` - Query embedding vector (must match collective's dimension)
1163    /// * `k` - Maximum number of results to return (1-1000)
1164    ///
1165    /// # Errors
1166    ///
1167    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1168    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1169    ///   the collective's embedding dimension
1170    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1171    ///
1172    /// # Example
1173    ///
1174    /// ```rust
1175    /// # fn main() -> pulsedb::Result<()> {
1176    /// # let dir = tempfile::tempdir().unwrap();
1177    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1178    /// # let collective_id = db.create_collective("example")?;
1179    /// let query = vec![0.1f32; 384]; // Your query embedding
1180    /// let results = db.search_similar(collective_id, &query, 10)?;
1181    /// for result in &results {
1182    ///     println!(
1183    ///         "[{:.3}] {}",
1184    ///         result.similarity, result.experience.content
1185    ///     );
1186    /// }
1187    /// # Ok(())
1188    /// # }
1189    /// ```
1190    #[instrument(skip(self, query))]
1191    pub fn search_similar(
1192        &self,
1193        collective_id: CollectiveId,
1194        query: &[f32],
1195        k: usize,
1196    ) -> Result<Vec<SearchResult>> {
1197        self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
1198    }
1199
1200    /// Searches for semantically similar experiences with additional filtering.
1201    ///
1202    /// Like [`search_similar()`](Self::search_similar), but applies additional
1203    /// filters on domain, experience type, importance, confidence, and timestamp.
1204    ///
1205    /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1206    /// by post-filtering, then truncates to the requested `k`.
1207    ///
1208    /// # Arguments
1209    ///
1210    /// * `collective_id` - The collective to search within
1211    /// * `query` - Query embedding vector (must match collective's dimension)
1212    /// * `k` - Maximum number of results to return (1-1000)
1213    /// * `filter` - Filter criteria to apply after vector search
1214    ///
1215    /// # Errors
1216    ///
1217    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1218    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1219    ///   the collective's embedding dimension
1220    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1221    ///
1222    /// # Example
1223    ///
1224    /// ```rust
1225    /// # fn main() -> pulsedb::Result<()> {
1226    /// # let dir = tempfile::tempdir().unwrap();
1227    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1228    /// # let collective_id = db.create_collective("example")?;
1229    /// # let query_embedding = vec![0.1f32; 384];
1230    /// use pulsedb::SearchFilter;
1231    ///
1232    /// let filter = SearchFilter {
1233    ///     domains: Some(vec!["rust".to_string()]),
1234    ///     min_importance: Some(0.5),
1235    ///     ..SearchFilter::default()
1236    /// };
1237    /// let results = db.search_similar_filtered(
1238    ///     collective_id,
1239    ///     &query_embedding,
1240    ///     10,
1241    ///     filter,
1242    /// )?;
1243    /// # Ok(())
1244    /// # }
1245    /// ```
1246    #[instrument(skip(self, query, filter))]
1247    pub fn search_similar_filtered(
1248        &self,
1249        collective_id: CollectiveId,
1250        query: &[f32],
1251        k: usize,
1252        filter: SearchFilter,
1253    ) -> Result<Vec<SearchResult>> {
1254        // Validate k
1255        if k == 0 || k > 1000 {
1256            return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1257        }
1258
1259        // Verify collective exists and check embedding dimension
1260        let collective = self
1261            .storage
1262            .get_collective(collective_id)?
1263            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1264
1265        let expected_dim = collective.embedding_dimension as usize;
1266        if query.len() != expected_dim {
1267            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1268        }
1269
1270        // Over-fetch from HNSW to compensate for post-filtering losses
1271        let over_fetch = k.saturating_mul(2).min(2000);
1272        let ef_search = self.config.hnsw.ef_search;
1273
1274        // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1275        // by distance ascending (closest first)
1276        let candidates = self
1277            .with_vector_index(collective_id, |index| {
1278                index.search_experiences(query, over_fetch, ef_search)
1279            })?
1280            .unwrap_or_default();
1281
1282        // Fetch full experiences, apply filter, convert distance → similarity
1283        let mut results = Vec::with_capacity(k);
1284        for (exp_id, distance) in candidates {
1285            if results.len() >= k {
1286                break;
1287            }
1288
1289            if let Some(experience) = self.storage.get_experience(exp_id)? {
1290                if filter.matches(&experience) {
1291                    results.push(SearchResult {
1292                        experience,
1293                        similarity: 1.0 - distance,
1294                    });
1295                }
1296            }
1297        }
1298
1299        Ok(results)
1300    }
1301
1302    // =========================================================================
1303    // Experience Relations (E3-S01)
1304    // =========================================================================
1305
1306    /// Stores a new relation between two experiences.
1307    ///
1308    /// Relations are typed, directed edges connecting a source experience to a
1309    /// target experience. Both experiences must exist and belong to the same
1310    /// collective. Duplicate relations (same source, target, and type) are
1311    /// rejected.
1312    ///
1313    /// # Arguments
1314    ///
1315    /// * `relation` - The relation to create (source, target, type, strength)
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns an error if:
1320    /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1321    /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1322    /// - A relation with the same (source, target, type) already exists
1323    /// - Self-relation attempted (source == target)
1324    /// - Strength is out of range `[0.0, 1.0]`
1325    #[instrument(skip(self, relation))]
1326    pub fn store_relation(
1327        &self,
1328        relation: crate::relation::NewExperienceRelation,
1329    ) -> Result<crate::types::RelationId> {
1330        use crate::relation::{validate_new_relation, ExperienceRelation};
1331        use crate::types::RelationId;
1332
1333        // Validate input fields (self-relation, strength bounds, metadata size)
1334        validate_new_relation(&relation)?;
1335
1336        // Load source and target experiences to verify existence
1337        let source = self
1338            .storage
1339            .get_experience(relation.source_id)?
1340            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1341        let target = self
1342            .storage
1343            .get_experience(relation.target_id)?
1344            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1345
1346        // Verify same collective
1347        if source.collective_id != target.collective_id {
1348            return Err(PulseDBError::from(ValidationError::invalid_field(
1349                "target_id",
1350                "source and target experiences must belong to the same collective",
1351            )));
1352        }
1353
1354        // Check for duplicate (same source, target, type)
1355        if self.storage.relation_exists(
1356            relation.source_id,
1357            relation.target_id,
1358            relation.relation_type,
1359        )? {
1360            return Err(PulseDBError::from(ValidationError::invalid_field(
1361                "relation_type",
1362                "a relation with this source, target, and type already exists",
1363            )));
1364        }
1365
1366        // Construct the full relation
1367        let id = RelationId::new();
1368        let full_relation = ExperienceRelation {
1369            id,
1370            source_id: relation.source_id,
1371            target_id: relation.target_id,
1372            relation_type: relation.relation_type,
1373            strength: relation.strength,
1374            metadata: relation.metadata,
1375            created_at: Timestamp::now(),
1376        };
1377
1378        self.storage.save_relation(&full_relation)?;
1379
1380        info!(
1381            id = %id,
1382            source = %relation.source_id,
1383            target = %relation.target_id,
1384            relation_type = ?full_relation.relation_type,
1385            "Relation stored"
1386        );
1387        Ok(id)
1388    }
1389
1390    /// Retrieves experiences related to the given experience.
1391    ///
1392    /// Returns pairs of `(Experience, ExperienceRelation)` based on the
1393    /// requested direction:
1394    /// - `Outgoing`: experiences that this experience points TO (as source)
1395    /// - `Incoming`: experiences that point TO this experience (as target)
1396    /// - `Both`: union of outgoing and incoming
1397    ///
1398    /// To filter by relation type, use
1399    /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered).
1400    ///
1401    /// Silently skips relations where the related experience no longer exists
1402    /// (orphan tolerance).
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns a storage error if the read transaction fails.
1407    #[instrument(skip(self))]
1408    pub fn get_related_experiences(
1409        &self,
1410        experience_id: ExperienceId,
1411        direction: crate::relation::RelationDirection,
1412    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1413        self.get_related_experiences_filtered(experience_id, direction, None)
1414    }
1415
1416    /// Retrieves experiences related to the given experience, with optional
1417    /// type filtering.
1418    ///
1419    /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1420    /// accepts an optional [`RelationType`](crate::RelationType) filter.
1421    /// When `Some(rt)`, only relations matching that type are returned.
1422    ///
1423    /// # Arguments
1424    ///
1425    /// * `experience_id` - The experience to query relations for
1426    /// * `direction` - Which direction(s) to traverse
1427    /// * `relation_type` - If `Some`, only return relations of this type
1428    ///
1429    /// # Example
1430    ///
1431    /// ```rust
1432    /// # fn main() -> pulsedb::Result<()> {
1433    /// # let dir = tempfile::tempdir().unwrap();
1434    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1435    /// # let cid = db.create_collective("example")?;
1436    /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1437    /// #     collective_id: cid,
1438    /// #     content: "a".into(),
1439    /// #     embedding: Some(vec![0.1f32; 384]),
1440    /// #     ..Default::default()
1441    /// # })?;
1442    /// use pulsedb::{RelationType, RelationDirection};
1443    ///
1444    /// // Only "Supports" relations outgoing from exp_a
1445    /// let supports = db.get_related_experiences_filtered(
1446    ///     exp_a,
1447    ///     RelationDirection::Outgoing,
1448    ///     Some(RelationType::Supports),
1449    /// )?;
1450    /// # Ok(())
1451    /// # }
1452    /// ```
1453    #[instrument(skip(self))]
1454    pub fn get_related_experiences_filtered(
1455        &self,
1456        experience_id: ExperienceId,
1457        direction: crate::relation::RelationDirection,
1458        relation_type: Option<crate::relation::RelationType>,
1459    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1460        use crate::relation::RelationDirection;
1461
1462        let mut results = Vec::new();
1463
1464        // Outgoing: this experience is the source → fetch target experiences
1465        if matches!(
1466            direction,
1467            RelationDirection::Outgoing | RelationDirection::Both
1468        ) {
1469            let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1470            for rel_id in rel_ids {
1471                if let Some(relation) = self.storage.get_relation(rel_id)? {
1472                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1473                        continue;
1474                    }
1475                    if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1476                        results.push((experience, relation));
1477                    }
1478                }
1479            }
1480        }
1481
1482        // Incoming: this experience is the target → fetch source experiences
1483        if matches!(
1484            direction,
1485            RelationDirection::Incoming | RelationDirection::Both
1486        ) {
1487            let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1488            for rel_id in rel_ids {
1489                if let Some(relation) = self.storage.get_relation(rel_id)? {
1490                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1491                        continue;
1492                    }
1493                    if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1494                        results.push((experience, relation));
1495                    }
1496                }
1497            }
1498        }
1499
1500        Ok(results)
1501    }
1502
1503    /// Retrieves a relation by ID.
1504    ///
1505    /// Returns `None` if no relation with the given ID exists.
1506    pub fn get_relation(
1507        &self,
1508        id: crate::types::RelationId,
1509    ) -> Result<Option<crate::relation::ExperienceRelation>> {
1510        self.storage.get_relation(id)
1511    }
1512
1513    /// Deletes a relation by ID.
1514    ///
1515    /// # Errors
1516    ///
1517    /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1518    #[instrument(skip(self))]
1519    pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1520        let deleted = self.storage.delete_relation(id)?;
1521        if !deleted {
1522            return Err(PulseDBError::from(NotFoundError::relation(id)));
1523        }
1524        info!(id = %id, "Relation deleted");
1525        Ok(())
1526    }
1527
1528    // =========================================================================
1529    // Derived Insights (E3-S02)
1530    // =========================================================================
1531
1532    /// Stores a new derived insight.
1533    ///
1534    /// Creates a synthesized knowledge record from multiple source experiences.
1535    /// The method:
1536    /// 1. Validates the input (content, confidence, sources)
1537    /// 2. Verifies the collective exists
1538    /// 3. Verifies all source experiences exist and belong to the same collective
1539    /// 4. Resolves the embedding (generates if Builtin, requires if External)
1540    /// 5. Stores the insight with inline embedding
1541    /// 6. Inserts into the insight HNSW index
1542    ///
1543    /// # Arguments
1544    ///
1545    /// * `insight` - The insight to store (see [`NewDerivedInsight`])
1546    ///
1547    /// # Errors
1548    ///
1549    /// - [`ValidationError`](crate::ValidationError) if input is invalid
1550    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1551    /// - [`NotFoundError::Experience`] if any source experience doesn't exist
1552    /// - [`ValidationError::InvalidField`] if source experiences belong to
1553    ///   different collectives
1554    /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
    #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
    pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
        // With an External provider this DB never generates embeddings; the
        // flag is checked below when resolving a missing embedding.
        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);

        // Validate input fields
        validate_new_insight(&insight)?;

        // Verify collective exists
        let collective = self
            .storage
            .get_collective(insight.collective_id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;

        // Verify all source experiences exist and belong to this collective
        for source_id in &insight.source_experience_ids {
            let source_exp = self
                .storage
                .get_experience(*source_id)?
                .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
            if source_exp.collective_id != insight.collective_id {
                return Err(PulseDBError::from(ValidationError::invalid_field(
                    "source_experience_ids",
                    format!(
                        "experience {} belongs to collective {}, not {}",
                        source_id, source_exp.collective_id, insight.collective_id
                    ),
                )));
            }
        }

        // Resolve embedding: caller-provided vectors are dimension-checked;
        // otherwise fall back to the builtin embedder (External forbids this).
        let embedding = match insight.embedding {
            Some(ref emb) => {
                // Validate dimension
                let expected_dim = collective.embedding_dimension as usize;
                if emb.len() != expected_dim {
                    return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
                }
                emb.clone()
            }
            None => {
                if is_external {
                    return Err(PulseDBError::embedding(
                        "embedding is required when using External embedding provider",
                    ));
                }
                self.embedding.embed(&insight.content)?
            }
        };

        // Copy kept aside because `embedding` is moved into the record below.
        let embedding_for_hnsw = embedding.clone();
        let now = Timestamp::now();
        let id = InsightId::new();

        // Construct the full insight record. This partially moves `insight`
        // (content, source_experience_ids, ...); `insight.collective_id` stays
        // readable afterwards because CollectiveId is Copy.
        let derived_insight = DerivedInsight {
            id,
            collective_id: insight.collective_id,
            content: insight.content,
            embedding,
            source_experience_ids: insight.source_experience_ids,
            insight_type: insight.insight_type,
            confidence: insight.confidence,
            domain: insight.domain,
            created_at: now,
            updated_at: now,
        };

        // Write to redb FIRST (source of truth)
        self.storage.save_insight(&derived_insight)?;

        // Insert into insight HNSW index (using InsightId→ExperienceId byte conversion)
        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
        let insight_vectors = self
            .insight_vectors
            .read()
            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
        // NOTE(review): if no index exists for this collective, the vector
        // insert is silently skipped — the insight is stored but not
        // searchable. Confirm indexes are created at collective creation.
        if let Some(index) = insight_vectors.get(&insight.collective_id) {
            index.insert_experience(exp_id, &embedding_for_hnsw)?;
        }

        info!(id = %id, "Insight stored");
        Ok(id)
    }
1639
1640    /// Retrieves a derived insight by ID.
1641    ///
1642    /// Returns `None` if no insight with the given ID exists.
1643    #[instrument(skip(self))]
1644    pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1645        self.storage.get_insight(id)
1646    }
1647
1648    /// Searches for insights semantically similar to the query embedding.
1649    ///
1650    /// Uses the insight-specific HNSW index for approximate nearest neighbor
1651    /// search, then fetches full insight records from storage.
1652    ///
1653    /// # Arguments
1654    ///
1655    /// * `collective_id` - The collective to search within
1656    /// * `query` - Query embedding vector (must match collective's dimension)
1657    /// * `k` - Maximum number of results to return
1658    ///
1659    /// # Errors
1660    ///
1661    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1662    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1663    #[instrument(skip(self, query))]
1664    pub fn get_insights(
1665        &self,
1666        collective_id: CollectiveId,
1667        query: &[f32],
1668        k: usize,
1669    ) -> Result<Vec<(DerivedInsight, f32)>> {
1670        // Verify collective exists and check embedding dimension
1671        let collective = self
1672            .storage
1673            .get_collective(collective_id)?
1674            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1675
1676        let expected_dim = collective.embedding_dimension as usize;
1677        if query.len() != expected_dim {
1678            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1679        }
1680
1681        let ef_search = self.config.hnsw.ef_search;
1682
1683        // Search insight HNSW — returns (ExperienceId, distance) pairs
1684        let insight_vectors = self
1685            .insight_vectors
1686            .read()
1687            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1688
1689        let candidates = match insight_vectors.get(&collective_id) {
1690            Some(index) => index.search_experiences(query, k, ef_search)?,
1691            None => return Ok(vec![]),
1692        };
1693        drop(insight_vectors);
1694
1695        // Convert ExperienceId back to InsightId and fetch records
1696        let mut results = Vec::with_capacity(candidates.len());
1697        for (exp_id, distance) in candidates {
1698            let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1699            if let Some(insight) = self.storage.get_insight(insight_id)? {
1700                // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1701                results.push((insight, 1.0 - distance));
1702            }
1703        }
1704
1705        Ok(results)
1706    }
1707
1708    /// Deletes a derived insight by ID.
1709    ///
1710    /// Removes the insight from storage and soft-deletes it from the HNSW index.
1711    ///
1712    /// # Errors
1713    ///
1714    /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1715    #[instrument(skip(self))]
1716    pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1717        // Read insight first to get collective_id for HNSW lookup
1718        let insight = self
1719            .storage
1720            .get_insight(id)?
1721            .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1722
1723        // Delete from redb FIRST (source of truth)
1724        self.storage.delete_insight(id)?;
1725
1726        // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1727        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1728        let insight_vectors = self
1729            .insight_vectors
1730            .read()
1731            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1732        if let Some(index) = insight_vectors.get(&insight.collective_id) {
1733            index.delete_experience(exp_id)?;
1734        }
1735
1736        info!(id = %id, "Insight deleted");
1737        Ok(())
1738    }
1739
1740    // =========================================================================
1741    // Activity Tracking (E3-S03)
1742    // =========================================================================
1743
1744    /// Registers an agent's presence in a collective.
1745    ///
1746    /// Creates a new activity record or replaces an existing one for the
1747    /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1748    /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1749    ///
1750    /// # Arguments
1751    ///
1752    /// * `activity` - The activity registration (see [`NewActivity`])
1753    ///
1754    /// # Errors
1755    ///
1756    /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1757    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1758    ///
1759    /// # Example
1760    ///
1761    /// ```rust
1762    /// # fn main() -> pulsedb::Result<()> {
1763    /// # let dir = tempfile::tempdir().unwrap();
1764    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1765    /// # let collective_id = db.create_collective("example")?;
1766    /// use pulsedb::NewActivity;
1767    ///
1768    /// db.register_activity(NewActivity {
1769    ///     agent_id: "claude-opus".to_string(),
1770    ///     collective_id,
1771    ///     current_task: Some("Reviewing pull request".to_string()),
1772    ///     context_summary: None,
1773    /// })?;
1774    /// # Ok(())
1775    /// # }
1776    /// ```
1777    #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1778    pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1779        // Validate input
1780        validate_new_activity(&activity)?;
1781
1782        // Verify collective exists
1783        self.storage
1784            .get_collective(activity.collective_id)?
1785            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1786
1787        // Build stored activity with timestamps
1788        let now = Timestamp::now();
1789        let stored = Activity {
1790            agent_id: activity.agent_id,
1791            collective_id: activity.collective_id,
1792            current_task: activity.current_task,
1793            context_summary: activity.context_summary,
1794            started_at: now,
1795            last_heartbeat: now,
1796        };
1797
1798        self.storage.save_activity(&stored)?;
1799
1800        info!(
1801            agent_id = %stored.agent_id,
1802            collective_id = %stored.collective_id,
1803            "Activity registered"
1804        );
1805        Ok(())
1806    }
1807
1808    /// Updates an agent's heartbeat timestamp.
1809    ///
1810    /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1811    /// any other fields. The agent must have an existing activity registered.
1812    ///
1813    /// # Errors
1814    ///
1815    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1816    #[instrument(skip(self))]
1817    pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1818        let mut activity = self
1819            .storage
1820            .get_activity(agent_id, collective_id)?
1821            .ok_or_else(|| {
1822                PulseDBError::from(NotFoundError::activity(format!(
1823                    "{} in {}",
1824                    agent_id, collective_id
1825                )))
1826            })?;
1827
1828        activity.last_heartbeat = Timestamp::now();
1829        self.storage.save_activity(&activity)?;
1830
1831        info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1832        Ok(())
1833    }
1834
1835    /// Ends an agent's activity in a collective.
1836    ///
1837    /// Removes the activity record. After calling this, the agent will no
1838    /// longer appear in `get_active_agents()` results.
1839    ///
1840    /// # Errors
1841    ///
1842    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1843    #[instrument(skip(self))]
1844    pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1845        let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1846
1847        if !deleted {
1848            return Err(PulseDBError::from(NotFoundError::activity(format!(
1849                "{} in {}",
1850                agent_id, collective_id
1851            ))));
1852        }
1853
1854        info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1855        Ok(())
1856    }
1857
1858    /// Returns all active (non-stale) agents in a collective.
1859    ///
1860    /// Fetches all activities, filters out those whose `last_heartbeat` is
1861    /// older than `config.activity.stale_threshold`, and returns the rest
1862    /// sorted by `last_heartbeat` descending (most recently active first).
1863    ///
1864    /// # Errors
1865    ///
1866    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1867    #[instrument(skip(self))]
1868    pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1869        // Verify collective exists
1870        self.storage
1871            .get_collective(collective_id)?
1872            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1873
1874        let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1875
1876        // Filter stale activities
1877        let now = Timestamp::now();
1878        let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1879        let cutoff = now.as_millis() - threshold_ms;
1880
1881        let mut active: Vec<Activity> = all_activities
1882            .into_iter()
1883            .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1884            .collect();
1885
1886        // Sort by last_heartbeat descending (most recently active first)
1887        active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1888
1889        Ok(active)
1890    }
1891
1892    // =========================================================================
1893    // Context Candidates (E2-S04)
1894    // =========================================================================
1895
1896    /// Retrieves unified context candidates from all retrieval primitives.
1897    ///
1898    /// This is the primary API for context assembly. It orchestrates:
1899    /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1900    /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1901    /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1902    /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1903    /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1904    ///
1905    /// # Arguments
1906    ///
1907    /// * `request` - Configuration for which primitives to query and limits
1908    ///
1909    /// # Errors
1910    ///
1911    /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1912    /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1913    ///   the collective's embedding dimension
1914    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1915    ///
1916    /// # Performance
1917    ///
1918    /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1919    /// all other sub-calls are < 10ms each.
1920    ///
1921    /// # Example
1922    ///
1923    /// ```rust
1924    /// # fn main() -> pulsedb::Result<()> {
1925    /// # let dir = tempfile::tempdir().unwrap();
1926    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1927    /// # let collective_id = db.create_collective("example")?;
1928    /// # let query_vec = vec![0.1f32; 384];
1929    /// use pulsedb::{ContextRequest, SearchFilter};
1930    ///
1931    /// let candidates = db.get_context_candidates(ContextRequest {
1932    ///     collective_id,
1933    ///     query_embedding: query_vec,
1934    ///     max_similar: 10,
1935    ///     max_recent: 5,
1936    ///     include_insights: true,
1937    ///     include_relations: true,
1938    ///     include_active_agents: true,
1939    ///     filter: SearchFilter {
1940    ///         domains: Some(vec!["rust".to_string()]),
1941    ///         ..SearchFilter::default()
1942    ///     },
1943    ///     ..ContextRequest::default()
1944    /// })?;
1945    /// # Ok(())
1946    /// # }
1947    /// ```
    #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
    pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
        // ── Validate limits ──────────────────────────────────────
        // Both limits are checked up front so no sub-query runs on bad input.
        if request.max_similar == 0 || request.max_similar > 1000 {
            return Err(ValidationError::invalid_field(
                "max_similar",
                "must be between 1 and 1000",
            )
            .into());
        }
        if request.max_recent == 0 || request.max_recent > 1000 {
            return Err(
                ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
            );
        }

        // ── Verify collective exists and check dimension ─────────
        let collective = self
            .storage
            .get_collective(request.collective_id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;

        let expected_dim = collective.embedding_dimension as usize;
        if request.query_embedding.len() != expected_dim {
            return Err(ValidationError::dimension_mismatch(
                expected_dim,
                request.query_embedding.len(),
            )
            .into());
        }

        // ── 1. Similar experiences (HNSW vector search) ──────────
        // Filter is cloned here because step 2 consumes `request.filter`.
        let similar_experiences = self.search_similar_filtered(
            request.collective_id,
            &request.query_embedding,
            request.max_similar,
            request.filter.clone(),
        )?;

        // ── 2. Recent experiences (timestamp index scan) ─────────
        let recent_experiences = self.get_recent_experiences_filtered(
            request.collective_id,
            request.max_recent,
            request.filter,
        )?;

        // ── 3. Insights (HNSW vector search on insight index) ────
        // Insights reuse `max_similar` as their limit; the similarity scores
        // are discarded because ContextCandidates carries bare insights.
        let insights = if request.include_insights {
            self.get_insights(
                request.collective_id,
                &request.query_embedding,
                request.max_similar,
            )?
            .into_iter()
            .map(|(insight, _score)| insight)
            .collect()
        } else {
            vec![]
        };

        // ── 4. Relations (graph traversal from result experiences) ─
        let relations = if request.include_relations {
            use std::collections::HashSet;

            // `Both`-direction traversal from multiple seeds can surface the
            // same relation more than once; `seen` dedups by relation ID.
            let mut seen = HashSet::new();
            let mut all_relations = Vec::new();

            // Collect unique experience IDs from both result sets
            let exp_ids: Vec<_> = similar_experiences
                .iter()
                .map(|r| r.experience.id)
                .chain(recent_experiences.iter().map(|e| e.id))
                .collect();

            for exp_id in exp_ids {
                let related =
                    self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;

                for (_experience, relation) in related {
                    if seen.insert(relation.id) {
                        all_relations.push(relation);
                    }
                }
            }

            all_relations
        } else {
            vec![]
        };

        // ── 5. Active agents (staleness-filtered activity records) ─
        let active_agents = if request.include_active_agents {
            self.get_active_agents(request.collective_id)?
        } else {
            vec![]
        };

        Ok(ContextCandidates {
            similar_experiences,
            recent_experiences,
            insights,
            relations,
            active_agents,
        })
    }
2053
2054    // =========================================================================
2055    // Watch System (E4-S01)
2056    // =========================================================================
2057
2058    /// Subscribes to all experience changes in a collective.
2059    ///
2060    /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2061    /// create, update, archive, and delete operation. The stream ends when
2062    /// dropped or when the `PulseDB` instance is closed.
2063    ///
2064    /// Multiple subscribers per collective are supported. Each gets an
2065    /// independent copy of every event.
2066    ///
2067    /// # Example
2068    ///
2069    /// ```rust,no_run
2070    /// # #[tokio::main]
2071    /// # async fn main() -> pulsedb::Result<()> {
2072    /// # let dir = tempfile::tempdir().unwrap();
2073    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2074    /// # let collective_id = db.create_collective("example")?;
2075    /// use futures::StreamExt;
2076    ///
2077    /// let mut stream = db.watch_experiences(collective_id)?;
2078    /// while let Some(event) = stream.next().await {
2079    ///     println!("{:?}: {}", event.event_type, event.experience_id);
2080    /// }
2081    /// # Ok(())
2082    /// # }
2083    /// ```
2084    pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2085        self.watch.subscribe(collective_id, None)
2086    }
2087
2088    /// Subscribes to filtered experience changes in a collective.
2089    ///
2090    /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2091    /// events that match the filter criteria. Filters are applied on the
2092    /// sender side before channel delivery.
2093    ///
2094    /// # Example
2095    ///
2096    /// ```rust
2097    /// # fn main() -> pulsedb::Result<()> {
2098    /// # let dir = tempfile::tempdir().unwrap();
2099    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2100    /// # let collective_id = db.create_collective("example")?;
2101    /// use pulsedb::WatchFilter;
2102    ///
2103    /// let filter = WatchFilter {
2104    ///     domains: Some(vec!["security".to_string()]),
2105    ///     min_importance: Some(0.7),
2106    ///     ..Default::default()
2107    /// };
2108    /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2109    /// # Ok(())
2110    /// # }
2111    /// ```
2112    pub fn watch_experiences_filtered(
2113        &self,
2114        collective_id: CollectiveId,
2115        filter: WatchFilter,
2116    ) -> Result<WatchStream> {
2117        self.watch.subscribe(collective_id, Some(filter))
2118    }
2119
2120    // =========================================================================
2121    // Cross-Process Watch (E4-S02)
2122    // =========================================================================
2123
2124    /// Returns the current WAL sequence number.
2125    ///
2126    /// Use this to establish a baseline before starting to poll for changes.
2127    /// Returns 0 if no experience writes have occurred yet.
2128    ///
2129    /// # Example
2130    ///
2131    /// ```rust
2132    /// # fn main() -> pulsedb::Result<()> {
2133    /// # let dir = tempfile::tempdir().unwrap();
2134    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2135    /// let seq = db.get_current_sequence()?;
2136    /// // ... later ...
2137    /// let (events, new_seq) = db.poll_changes(seq)?;
2138    /// # Ok(())
2139    /// # }
2140    /// ```
2141    pub fn get_current_sequence(&self) -> Result<u64> {
2142        self.storage.get_wal_sequence()
2143    }
2144
2145    /// Polls for experience changes since the given sequence number.
2146    ///
2147    /// Returns a tuple of `(events, new_sequence)`:
2148    /// - `events`: New [`WatchEvent`]s in sequence order
2149    /// - `new_sequence`: Pass this value back on the next call
2150    ///
2151    /// Returns an empty vec and the same sequence if no changes exist.
2152    ///
2153    /// # Arguments
2154    ///
2155    /// * `since_seq` - The last sequence number you received (0 for first call)
2156    ///
2157    /// # Performance
2158    ///
2159    /// Target: < 10ms per call. Internally performs a range scan on the
2160    /// watch_events table, O(k) where k is the number of new events.
2161    ///
2162    /// # Example
2163    ///
2164    /// ```rust,no_run
2165    /// # fn main() -> pulsedb::Result<()> {
2166    /// # let dir = tempfile::tempdir().unwrap();
2167    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2168    /// use std::time::Duration;
2169    ///
2170    /// let mut seq = 0u64;
2171    /// loop {
2172    ///     let (events, new_seq) = db.poll_changes(seq)?;
2173    ///     seq = new_seq;
2174    ///     for event in events {
2175    ///         println!("{:?}: {}", event.event_type, event.experience_id);
2176    ///     }
2177    ///     std::thread::sleep(Duration::from_millis(100));
2178    /// }
2179    /// # }
2180    /// ```
2181    pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2182        let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2183        let events = records.into_iter().map(WatchEvent::from).collect();
2184        Ok((events, new_seq))
2185    }
2186
2187    /// Polls for changes with a custom batch size limit.
2188    ///
2189    /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2190    /// `limit` events per call. Use this for backpressure control.
2191    pub fn poll_changes_batch(
2192        &self,
2193        since_seq: u64,
2194        limit: usize,
2195    ) -> Result<(Vec<WatchEvent>, u64)> {
2196        let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2197        let events = records.into_iter().map(WatchEvent::from).collect();
2198        Ok((events, new_seq))
2199    }
2200}
2201
2202// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2203// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    #[test]
    fn test_open_creates_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        // Opening a fresh path must create the file on disk.
        assert!(db_path.exists());
        // Default config uses 384-dimensional embeddings.
        assert_eq!(db.embedding_dimension(), 384);

        db.close().unwrap();
    }

    #[test]
    fn test_open_existing_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // First open creates the database; release it immediately.
        PulseDB::open(&db_path, Config::default())
            .unwrap()
            .close()
            .unwrap();

        // A second open must load the same metadata.
        let reopened = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    #[test]
    fn test_config_validation() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // A zero cache size is rejected by config validation.
        let bad_config = Config {
            cache_size_mb: 0,
            ..Default::default()
        };

        assert!(PulseDB::open(&db_path, bad_config).is_err());
    }

    #[test]
    fn test_dimension_mismatch() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        // Create the database with 384-dimensional embeddings.
        let initial = Config {
            embedding_dimension: EmbeddingDimension::D384,
            ..Default::default()
        };
        PulseDB::open(&db_path, initial).unwrap().close().unwrap();

        // Reopening with a different dimension must fail.
        let mismatched = Config {
            embedding_dimension: EmbeddingDimension::D768,
            ..Default::default()
        };
        assert!(PulseDB::open(&db_path, mismatched).is_err());
    }

    #[test]
    fn test_metadata_access() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        // Metadata exposes the configured embedding dimension.
        assert_eq!(
            db.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );

        db.close().unwrap();
    }

    #[test]
    fn test_pulsedb_is_send_sync() {
        // Compile-time proof that PulseDB can cross thread boundaries.
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<PulseDB>();
    }
}
2299}