// pulsedb/db.rs
//! PulseDB main struct and lifecycle operations.
//!
//! The [`PulseDB`] struct is the primary interface for interacting with
//! the database. It provides methods for:
//!
//! - Opening and closing the database
//! - Managing collectives (isolation units)
//! - Recording and querying experiences
//! - Semantic search and context retrieval
//!
//! # Quick Start
//!
//! ```rust
//! # fn main() -> pulsedb::Result<()> {
//! # let dir = tempfile::tempdir().unwrap();
//! use pulsedb::{PulseDB, Config, NewExperience};
//!
//! // Open or create a database
//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
//!
//! // Create a collective for your project
//! let collective = db.create_collective("my-project")?;
//!
//! // Record an experience
//! db.record_experience(NewExperience {
//!     collective_id: collective,
//!     content: "Always validate user input".to_string(),
//!     embedding: Some(vec![0.1f32; 384]),
//!     ..Default::default()
//! })?;
//!
//! // Close when done
//! db.close()?;
//! # Ok(())
//! # }
//! ```
//!
//! # Thread Safety
//!
//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
//! The underlying storage uses MVCC for concurrent reads with exclusive
//! write locking.
//!
//! ```rust
//! # fn main() -> pulsedb::Result<()> {
//! # let dir = tempfile::tempdir().unwrap();
//! use std::sync::Arc;
//! use pulsedb::{PulseDB, Config};
//!
//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
//!
//! // Clone Arc for use in another thread
//! let db_clone = Arc::clone(&db);
//! std::thread::spawn(move || {
//!     // Safe to use db_clone here
//! });
//! # Ok(())
//! # }
//! ```

61use std::collections::HashMap;
62use std::path::{Path, PathBuf};
63use std::sync::{Arc, RwLock};
64
65#[cfg(feature = "sync")]
66use tracing::debug;
67use tracing::{info, instrument, warn};
68
69use crate::activity::{validate_new_activity, Activity, NewActivity};
70use crate::collective::types::CollectiveStats;
71use crate::collective::{validate_collective_name, Collective};
72use crate::config::{Config, EmbeddingProvider};
73use crate::embedding::{create_embedding_service, EmbeddingService};
74use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
75use crate::experience::{
76    validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
77    NewExperience,
78};
79use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
80#[cfg(feature = "sync")]
81use crate::relation::ExperienceRelation;
82use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
83use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
84#[cfg(feature = "sync")]
85use crate::types::RelationId;
86use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
87use crate::vector::HnswIndex;
88use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};
89
/// The main PulseDB database handle.
///
/// This is the primary interface for all database operations. Create an
/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
///
/// # Ownership
///
/// `PulseDB` owns its storage and embedding service. When you call `close()`,
/// the database is consumed and cannot be used afterward. This ensures
/// resources are properly released.
pub struct PulseDB {
    /// Storage engine (redb or mock for testing). Boxed trait object so the
    /// backend can be swapped without changing this struct.
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX), selected from the config's
    /// embedding provider at open time.
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}

130impl std::fmt::Debug for PulseDB {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
133        let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
134        f.debug_struct("PulseDB")
135            .field("config", &self.config)
136            .field("embedding_dimension", &self.embedding_dimension())
137            .field("vector_indexes", &vector_count)
138            .field("insight_vector_indexes", &insight_vector_count)
139            .finish_non_exhaustive()
140    }
141}
142
143impl PulseDB {
144    /// Opens or creates a PulseDB database at the specified path.
145    ///
146    /// If the database doesn't exist, it will be created with the given
147    /// configuration. If it exists, the configuration will be validated
148    /// against the stored settings (e.g., embedding dimension must match).
149    ///
150    /// # Arguments
151    ///
152    /// * `path` - Path to the database file (created if it doesn't exist)
153    /// * `config` - Configuration options for the database
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if:
158    /// - Configuration is invalid (see [`Config::validate`])
159    /// - Database file is corrupted
160    /// - Database is locked by another process
161    /// - Schema version doesn't match (needs migration)
162    /// - Embedding dimension doesn't match existing database
163    ///
164    /// # Example
165    ///
166    /// ```rust
167    /// # fn main() -> pulsedb::Result<()> {
168    /// # let dir = tempfile::tempdir().unwrap();
169    /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
170    ///
171    /// // Open with default configuration
172    /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
173    /// # drop(db);
174    ///
175    /// // Open with custom embedding dimension
176    /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
177    ///     embedding_dimension: EmbeddingDimension::D768,
178    ///     ..Default::default()
179    /// })?;
180    /// # Ok(())
181    /// # }
182    /// ```
183    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
184    pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
185        // Validate configuration first
186        config.validate().map_err(PulseDBError::from)?;
187
188        info!("Opening PulseDB");
189
190        // Open storage engine
191        let storage = open_storage(&path, &config)?;
192
193        // Create embedding service
194        let embedding = create_embedding_service(&config)?;
195
196        // Load or rebuild HNSW indexes for all existing collectives
197        let vectors = Self::load_all_indexes(&*storage, &config)?;
198        let insight_vectors = Self::load_all_insight_indexes(&*storage, &config)?;
199
200        info!(
201            dimension = config.embedding_dimension.size(),
202            sync_mode = ?config.sync_mode,
203            collectives = vectors.len(),
204            "PulseDB opened successfully"
205        );
206
207        let watch = Arc::new(WatchService::new(
208            config.watch.buffer_size,
209            config.watch.in_process,
210        ));
211
212        Ok(Self {
213            storage,
214            embedding,
215            config,
216            vectors: RwLock::new(vectors),
217            insight_vectors: RwLock::new(insight_vectors),
218            watch,
219        })
220    }
221
222    /// Closes the database, flushing all pending writes.
223    ///
224    /// This method consumes the `PulseDB` instance, ensuring it cannot
225    /// be used after closing. The underlying storage engine flushes all
226    /// buffered data to disk.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the storage backend reports a flush failure.
231    /// Note: the current redb backend flushes durably on drop, so this
232    /// always returns `Ok(())` in practice.
233    ///
234    /// # Example
235    ///
236    /// ```rust
237    /// # fn main() -> pulsedb::Result<()> {
238    /// # let dir = tempfile::tempdir().unwrap();
239    /// use pulsedb::{PulseDB, Config};
240    ///
241    /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
242    /// // ... use the database ...
243    /// db.close()?;  // db is consumed here
244    /// // db.something() // Compile error: db was moved
245    /// # Ok(())
246    /// # }
247    /// ```
248    #[instrument(skip(self))]
249    pub fn close(self) -> Result<()> {
250        info!("Closing PulseDB");
251
252        // Persist HNSW indexes BEFORE closing storage.
253        // If HNSW save fails, storage is still open for potential recovery.
254        // On next open(), stale/missing HNSW files trigger a rebuild from redb.
255        if let Some(hnsw_dir) = self.hnsw_dir() {
256            // Experience HNSW indexes
257            let vectors = self
258                .vectors
259                .read()
260                .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
261            for (collective_id, index) in vectors.iter() {
262                if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
263                    warn!(
264                        collective = %collective_id,
265                        error = %e,
266                        "Failed to save HNSW index (will rebuild on next open)"
267                    );
268                }
269            }
270            drop(vectors);
271
272            // Insight HNSW indexes (separate files with _insights suffix)
273            let insight_vectors = self
274                .insight_vectors
275                .read()
276                .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
277            for (collective_id, index) in insight_vectors.iter() {
278                let name = format!("{}_insights", collective_id);
279                if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
280                    warn!(
281                        collective = %collective_id,
282                        error = %e,
283                        "Failed to save insight HNSW index (will rebuild on next open)"
284                    );
285                }
286            }
287        }
288
289        // Close storage (flushes pending writes)
290        self.storage.close()?;
291
292        info!("PulseDB closed successfully");
293        Ok(())
294    }
295
    /// Returns a reference to the database configuration.
    ///
    /// This is the configuration that was used to open the database.
    /// Note that some settings (like embedding dimension) are locked
    /// on database creation and cannot be changed.
    #[inline]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Returns the database metadata.
    ///
    /// Metadata includes schema version, embedding dimension, and timestamps
    /// for when the database was created and last opened. Delegates to the
    /// storage engine, which owns the persisted metadata record.
    #[inline]
    pub fn metadata(&self) -> &DatabaseMetadata {
        self.storage.metadata()
    }

    /// Returns the embedding dimension configured for this database.
    ///
    /// All embeddings stored in this database must have exactly this
    /// many dimensions.
    #[inline]
    pub fn embedding_dimension(&self) -> usize {
        self.config.embedding_dimension.size()
    }

324    // =========================================================================
325    // Internal Accessors (for use by feature modules)
326    // =========================================================================
327
328    /// Returns a reference to the storage engine.
329    ///
330    /// This is for internal use by other PulseDB modules.
331    #[inline]
332    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
333    pub(crate) fn storage(&self) -> &dyn StorageEngine {
334        self.storage.as_ref()
335    }
336
    /// Returns a reference to the embedding service.
    ///
    /// This is for internal use by other PulseDB modules.
    #[inline]
    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
    pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
        self.embedding.as_ref()
    }

346    // =========================================================================
347    // HNSW Index Lifecycle
348    // =========================================================================
349
350    /// Returns the directory for HNSW index files.
351    ///
352    /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
353    /// the storage has no file path (e.g., in-memory tests).
354    fn hnsw_dir(&self) -> Option<PathBuf> {
355        self.storage.path().map(|p| {
356            let mut hnsw_path = p.as_os_str().to_owned();
357            hnsw_path.push(".hnsw");
358            PathBuf::from(hnsw_path)
359        })
360    }
361
    /// Loads or rebuilds HNSW indexes for all existing collectives.
    ///
    /// For each collective in storage:
    /// 1. Try loading metadata from `.hnsw.meta` file
    /// 2. Rebuild the graph from redb embeddings (always, since we can't
    ///    load the graph due to hnsw_rs lifetime constraints)
    /// 3. Restore deleted set from metadata if available
    fn load_all_indexes(
        storage: &dyn StorageEngine,
        config: &Config,
    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
        let collectives = storage.list_collectives()?;
        let mut vectors = HashMap::with_capacity(collectives.len());

        // Same `{db_path}.hnsw` derivation as `hnsw_dir()`; duplicated here
        // because this is an associated fn with no `self`.
        let hnsw_dir = storage.path().map(|p| {
            let mut hnsw_path = p.as_os_str().to_owned();
            hnsw_path.push(".hnsw");
            PathBuf::from(hnsw_path)
        });

        for collective in &collectives {
            let dimension = collective.embedding_dimension as usize;

            // List all experience IDs in this collective
            let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;

            // Load embeddings from redb (source of truth). Experiences with
            // no stored embedding are simply skipped.
            let mut embeddings = Vec::with_capacity(exp_ids.len());
            for exp_id in &exp_ids {
                if let Some(embedding) = storage.get_embedding(*exp_id)? {
                    embeddings.push((*exp_id, embedding));
                }
            }

            // Try loading metadata (for deleted set and ID mappings).
            // Load failures are treated as "no metadata" (`.ok()`), which is
            // safe: the graph is rebuilt from redb regardless.
            let metadata = hnsw_dir
                .as_ref()
                .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
                .flatten();

            // Rebuild the HNSW graph from embeddings
            let index = if embeddings.is_empty() {
                HnswIndex::new(dimension, &config.hnsw)
            } else {
                let start = std::time::Instant::now();
                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
                info!(
                    collective = %collective.id,
                    vectors = idx.active_count(),
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "Rebuilt HNSW index from redb embeddings"
                );
                idx
            };

            // Restore deleted set from metadata if available
            if let Some(meta) = metadata {
                index.restore_deleted_set(&meta.deleted)?;
            }

            vectors.insert(collective.id, index);
        }

        Ok(vectors)
    }

428    /// Loads or rebuilds insight HNSW indexes for all existing collectives.
429    ///
430    /// For each collective, loads all insights from storage and rebuilds
431    /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
432    /// byte conversion for the HNSW API.
433    fn load_all_insight_indexes(
434        storage: &dyn StorageEngine,
435        config: &Config,
436    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
437        let collectives = storage.list_collectives()?;
438        let mut insight_vectors = HashMap::with_capacity(collectives.len());
439
440        let hnsw_dir = storage.path().map(|p| {
441            let mut hnsw_path = p.as_os_str().to_owned();
442            hnsw_path.push(".hnsw");
443            PathBuf::from(hnsw_path)
444        });
445
446        for collective in &collectives {
447            let dimension = collective.embedding_dimension as usize;
448
449            // List all insight IDs in this collective
450            let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;
451
452            // Load insights and extract embeddings (converting InsightId → ExperienceId)
453            let mut embeddings = Vec::with_capacity(insight_ids.len());
454            for insight_id in &insight_ids {
455                if let Some(insight) = storage.get_insight(*insight_id)? {
456                    let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
457                    embeddings.push((exp_id, insight.embedding));
458                }
459            }
460
461            // Try loading metadata (for deleted set)
462            let name = format!("{}_insights", collective.id);
463            let metadata = hnsw_dir
464                .as_ref()
465                .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
466                .flatten();
467
468            // Rebuild HNSW graph from embeddings
469            let index = if embeddings.is_empty() {
470                HnswIndex::new(dimension, &config.hnsw)
471            } else {
472                let start = std::time::Instant::now();
473                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
474                info!(
475                    collective = %collective.id,
476                    insights = idx.active_count(),
477                    elapsed_ms = start.elapsed().as_millis() as u64,
478                    "Rebuilt insight HNSW index from stored insights"
479                );
480                idx
481            };
482
483            // Restore deleted set from metadata if available
484            if let Some(meta) = metadata {
485                index.restore_deleted_set(&meta.deleted)?;
486            }
487
488            insight_vectors.insert(collective.id, index);
489        }
490
491        Ok(insight_vectors)
492    }
493
494    /// Executes a closure with the HNSW index for a collective.
495    ///
496    /// This is the primary accessor for vector search operations (used by
497    /// `search_similar()`). The closure runs while the outer RwLock guard
498    /// is held (read lock), so the HnswIndex reference stays valid.
499    /// Returns `None` if no index exists for the collective.
500    #[doc(hidden)]
501    pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
502    where
503        F: FnOnce(&HnswIndex) -> Result<R>,
504    {
505        let vectors = self
506            .vectors
507            .read()
508            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
509        match vectors.get(&collective_id) {
510            Some(index) => Ok(Some(f(index)?)),
511            None => Ok(None),
512        }
513    }
514
515    // =========================================================================
516    // Test Helpers
517    // =========================================================================
518
519    /// Returns a reference to the storage engine for integration testing.
520    ///
521    /// This method is intentionally hidden from documentation. It provides
522    /// test-only access to the storage layer for verifying ACID guarantees
523    /// and crash recovery. Production code should use the public PulseDB API.
524    #[doc(hidden)]
525    #[inline]
526    pub fn storage_for_test(&self) -> &dyn StorageEngine {
527        self.storage.as_ref()
528    }
529
530    // =========================================================================
531    // Collective Management (E1-S02)
532    // =========================================================================
533
534    /// Creates a new collective with the given name.
535    ///
536    /// The collective's embedding dimension is locked to the database's
537    /// configured dimension at creation time.
538    ///
539    /// # Arguments
540    ///
541    /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
542    ///
543    /// # Errors
544    ///
545    /// Returns a validation error if the name is empty, whitespace-only,
546    /// or exceeds 255 characters.
547    ///
548    /// # Example
549    ///
550    /// ```rust
551    /// # fn main() -> pulsedb::Result<()> {
552    /// # let dir = tempfile::tempdir().unwrap();
553    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
554    /// let id = db.create_collective("my-project")?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    #[instrument(skip(self))]
559    pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
560        validate_collective_name(name)?;
561
562        let dimension = self.config.embedding_dimension.size() as u16;
563        let collective = Collective::new(name, dimension);
564        let id = collective.id;
565
566        // Persist to redb first (source of truth)
567        self.storage.save_collective(&collective)?;
568
569        // Create empty HNSW indexes for this collective
570        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
571        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
572        self.vectors
573            .write()
574            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
575            .insert(id, exp_index);
576        self.insight_vectors
577            .write()
578            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
579            .insert(id, insight_index);
580
581        info!(id = %id, name = %name, "Collective created");
582        Ok(id)
583    }
584
585    /// Creates a new collective with an owner for multi-tenancy.
586    ///
587    /// Same as [`create_collective`](Self::create_collective) but assigns
588    /// an owner ID, enabling filtering with
589    /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
590    ///
591    /// # Arguments
592    ///
593    /// * `name` - Human-readable name (1-255 characters)
594    /// * `owner_id` - Owner identifier (must not be empty)
595    ///
596    /// # Errors
597    ///
598    /// Returns a validation error if the name or owner_id is invalid.
599    #[instrument(skip(self))]
600    pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
601        validate_collective_name(name)?;
602
603        if owner_id.is_empty() {
604            return Err(PulseDBError::from(
605                crate::error::ValidationError::required_field("owner_id"),
606            ));
607        }
608
609        let dimension = self.config.embedding_dimension.size() as u16;
610        let collective = Collective::with_owner(name, owner_id, dimension);
611        let id = collective.id;
612
613        // Persist to redb first (source of truth)
614        self.storage.save_collective(&collective)?;
615
616        // Create empty HNSW indexes for this collective
617        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
618        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
619        self.vectors
620            .write()
621            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
622            .insert(id, exp_index);
623        self.insight_vectors
624            .write()
625            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
626            .insert(id, insight_index);
627
628        info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
629        Ok(id)
630    }
631
    /// Returns a collective by ID, or `None` if not found.
    ///
    /// Thin passthrough to the storage engine; no caching is involved.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let id = db.create_collective("example")?;
    /// if let Some(collective) = db.get_collective(id)? {
    ///     println!("Found: {}", collective.name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
        self.storage.get_collective(id)
    }

    /// Lists all collectives in the database.
    ///
    /// Returns an empty vector if no collectives exist. Thin passthrough to
    /// the storage engine.
    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
        self.storage.list_collectives()
    }

659    /// Lists collectives filtered by owner ID.
660    ///
661    /// Returns only collectives whose `owner_id` matches the given value.
662    /// Returns an empty vector if no matching collectives exist.
663    pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
664        let all = self.storage.list_collectives()?;
665        Ok(all
666            .into_iter()
667            .filter(|c| c.owner_id.as_deref() == Some(owner_id))
668            .collect())
669    }
670
    /// Returns statistics for a collective.
    ///
    /// # Errors
    ///
    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
    #[instrument(skip(self))]
    pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
        // Verify collective exists
        self.storage
            .get_collective(id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;

        let experience_count = self.storage.count_experiences_in_collective(id)?;

        // NOTE(review): only `experience_count` is populated today;
        // `storage_bytes` and the oldest/newest timestamps are placeholders
        // pending storage support — confirm before relying on them.
        Ok(CollectiveStats {
            experience_count,
            storage_bytes: 0,
            oldest_experience: None,
            newest_experience: None,
        })
    }

693    /// Deletes a collective and all its associated data.
694    ///
695    /// Performs cascade deletion: removes all experiences belonging to the
696    /// collective before removing the collective record itself.
697    ///
698    /// # Errors
699    ///
700    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
701    ///
702    /// # Example
703    ///
704    /// ```rust
705    /// # fn main() -> pulsedb::Result<()> {
706    /// # let dir = tempfile::tempdir().unwrap();
707    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
708    /// # let collective_id = db.create_collective("to-delete")?;
709    /// db.delete_collective(collective_id)?;
710    /// assert!(db.get_collective(collective_id)?.is_none());
711    /// # Ok(())
712    /// # }
713    /// ```
714    #[instrument(skip(self))]
715    pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
716        // Verify collective exists
717        self.storage
718            .get_collective(id)?
719            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;
720
721        // Cascade: delete all experiences for this collective
722        let deleted_count = self.storage.delete_experiences_by_collective(id)?;
723        if deleted_count > 0 {
724            info!(count = deleted_count, "Cascade-deleted experiences");
725        }
726
727        // Cascade: delete all insights for this collective
728        let deleted_insights = self.storage.delete_insights_by_collective(id)?;
729        if deleted_insights > 0 {
730            info!(count = deleted_insights, "Cascade-deleted insights");
731        }
732
733        // Cascade: delete all activities for this collective
734        let deleted_activities = self.storage.delete_activities_by_collective(id)?;
735        if deleted_activities > 0 {
736            info!(count = deleted_activities, "Cascade-deleted activities");
737        }
738
739        // Delete the collective record from storage
740        self.storage.delete_collective(id)?;
741
742        // Remove HNSW indexes from memory
743        self.vectors
744            .write()
745            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
746            .remove(&id);
747        self.insight_vectors
748            .write()
749            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
750            .remove(&id);
751
752        // Remove HNSW files from disk (non-fatal if fails)
753        if let Some(hnsw_dir) = self.hnsw_dir() {
754            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
755                warn!(
756                    collective = %id,
757                    error = %e,
758                    "Failed to remove experience HNSW files (non-fatal)"
759                );
760            }
761            let insight_name = format!("{}_insights", id);
762            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
763                warn!(
764                    collective = %id,
765                    error = %e,
766                    "Failed to remove insight HNSW files (non-fatal)"
767                );
768            }
769        }
770
771        info!(id = %id, "Collective deleted");
772        Ok(())
773    }
774
775    // =========================================================================
776    // Experience CRUD (E1-S03)
777    // =========================================================================
778
779    /// Records a new experience in the database.
780    ///
781    /// This is the primary method for storing agent-learned knowledge. The method:
782    /// 1. Validates the input (content, scores, tags, embedding)
783    /// 2. Verifies the collective exists
784    /// 3. Resolves the embedding (generates if Builtin, requires if External)
785    /// 4. Stores the experience atomically across 4 tables
786    ///
787    /// # Arguments
788    ///
789    /// * `exp` - The experience to record (see [`NewExperience`])
790    ///
791    /// # Errors
792    ///
793    /// - [`ValidationError`](crate::ValidationError) if input is invalid
794    /// - [`NotFoundError::Collective`] if the collective doesn't exist
795    /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
796    #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
797    pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
798        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
799
800        // Verify collective exists and get its dimension
801        let collective = self
802            .storage
803            .get_collective(exp.collective_id)?
804            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;
805
806        // Validate input
807        validate_new_experience(&exp, collective.embedding_dimension, is_external)?;
808
809        // Resolve embedding
810        let embedding = match exp.embedding {
811            Some(emb) => emb,
812            None => {
813                // Builtin mode: generate embedding from content
814                self.embedding.embed(&exp.content)?
815            }
816        };
817
818        // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
819        let embedding_for_hnsw = embedding.clone();
820        let collective_id = exp.collective_id;
821
822        // Construct the full experience record
823        let experience = Experience {
824            id: ExperienceId::new(),
825            collective_id,
826            content: exp.content,
827            embedding,
828            experience_type: exp.experience_type,
829            importance: exp.importance,
830            confidence: exp.confidence,
831            applications: 0,
832            domain: exp.domain,
833            related_files: exp.related_files,
834            source_agent: exp.source_agent,
835            source_task: exp.source_task,
836            timestamp: Timestamp::now(),
837            archived: false,
838        };
839
840        let id = experience.id;
841
842        // Write to redb FIRST (source of truth). If crash happens after
843        // this but before HNSW insert, rebuild on next open will include it.
844        self.storage.save_experience(&experience)?;
845
846        // Insert into HNSW index (derived structure)
847        let vectors = self
848            .vectors
849            .read()
850            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
851        if let Some(index) = vectors.get(&collective_id) {
852            index.insert_experience(id, &embedding_for_hnsw)?;
853        }
854
855        // Emit watch event after both storage and HNSW succeed
856        self.watch.emit(
857            WatchEvent {
858                experience_id: id,
859                collective_id,
860                event_type: WatchEventType::Created,
861                timestamp: experience.timestamp,
862            },
863            &experience,
864        )?;
865
866        info!(id = %id, "Experience recorded");
867        Ok(id)
868    }
869
870    /// Retrieves an experience by ID, including its embedding.
871    ///
872    /// Returns `None` if no experience with the given ID exists.
873    #[instrument(skip(self))]
874    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
875        self.storage.get_experience(id)
876    }
877
878    /// Updates mutable fields of an experience.
879    ///
880    /// Only fields set to `Some(...)` in the update are changed.
881    /// Content and embedding are immutable — create a new experience instead.
882    ///
883    /// # Errors
884    ///
885    /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
886    /// - [`NotFoundError::Experience`] if the experience doesn't exist
887    #[instrument(skip(self, update))]
888    pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
889        validate_experience_update(&update)?;
890
891        let updated = self.storage.update_experience(id, &update)?;
892        if !updated {
893            return Err(PulseDBError::from(NotFoundError::experience(id)));
894        }
895
896        // Emit watch event (fetch experience for collective_id + filter matching)
897        if self.watch.has_subscribers() {
898            if let Ok(Some(exp)) = self.storage.get_experience(id) {
899                let event_type = if update.archived == Some(true) {
900                    WatchEventType::Archived
901                } else {
902                    WatchEventType::Updated
903                };
904                self.watch.emit(
905                    WatchEvent {
906                        experience_id: id,
907                        collective_id: exp.collective_id,
908                        event_type,
909                        timestamp: Timestamp::now(),
910                    },
911                    &exp,
912                )?;
913            }
914        }
915
916        info!(id = %id, "Experience updated");
917        Ok(())
918    }
919
920    /// Archives an experience (soft-delete).
921    ///
922    /// Archived experiences remain in storage but are excluded from search
923    /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
924    ///
925    /// # Errors
926    ///
927    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
928    #[instrument(skip(self))]
929    pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
930        self.update_experience(
931            id,
932            ExperienceUpdate {
933                archived: Some(true),
934                ..Default::default()
935            },
936        )
937    }
938
939    /// Restores an archived experience.
940    ///
941    /// The experience will once again appear in search results.
942    ///
943    /// # Errors
944    ///
945    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
946    #[instrument(skip(self))]
947    pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
948        self.update_experience(
949            id,
950            ExperienceUpdate {
951                archived: Some(false),
952                ..Default::default()
953            },
954        )
955    }
956
957    /// Permanently deletes an experience and its embedding.
958    ///
959    /// This removes the experience from all tables and indices.
960    /// Unlike archiving, this is irreversible.
961    ///
962    /// # Errors
963    ///
964    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
965    #[instrument(skip(self))]
966    pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
967        // Read experience first to get collective_id for HNSW lookup.
968        // This adds one extra read, but delete is not a hot path.
969        let experience = self
970            .storage
971            .get_experience(id)?
972            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
973
974        // Cascade-delete any relations involving this experience.
975        // Done before experience deletion so we can still look up relation data.
976        let rel_count = self.storage.delete_relations_for_experience(id)?;
977        if rel_count > 0 {
978            info!(
979                count = rel_count,
980                "Cascade-deleted relations for experience"
981            );
982        }
983
984        // Delete from redb FIRST (source of truth). If crash happens after
985        // this but before HNSW soft-delete, on reopen the experience won't be
986        // loaded from redb, so it's automatically excluded from the rebuilt index.
987        self.storage.delete_experience(id)?;
988
989        // Soft-delete from HNSW index (mark as deleted, not removed from graph).
990        // This takes effect immediately for the current session's searches.
991        let vectors = self
992            .vectors
993            .read()
994            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
995        if let Some(index) = vectors.get(&experience.collective_id) {
996            index.delete_experience(id)?;
997        }
998
999        // Emit watch event after storage + HNSW deletion
1000        self.watch.emit(
1001            WatchEvent {
1002                experience_id: id,
1003                collective_id: experience.collective_id,
1004                event_type: WatchEventType::Deleted,
1005                timestamp: Timestamp::now(),
1006            },
1007            &experience,
1008        )?;
1009
1010        info!(id = %id, "Experience deleted");
1011        Ok(())
1012    }
1013
1014    /// Reinforces an experience by incrementing its application count.
1015    ///
1016    /// Each call atomically increments the `applications` counter by 1.
1017    /// Returns the new application count.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1022    #[instrument(skip(self))]
1023    pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1024        let new_count = self
1025            .storage
1026            .reinforce_experience(id)?
1027            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1028
1029        // Emit watch event (fetch experience for collective_id + filter matching)
1030        if self.watch.has_subscribers() {
1031            if let Ok(Some(exp)) = self.storage.get_experience(id) {
1032                self.watch.emit(
1033                    WatchEvent {
1034                        experience_id: id,
1035                        collective_id: exp.collective_id,
1036                        event_type: WatchEventType::Updated,
1037                        timestamp: Timestamp::now(),
1038                    },
1039                    &exp,
1040                )?;
1041            }
1042        }
1043
1044        info!(id = %id, applications = new_count, "Experience reinforced");
1045        Ok(new_count)
1046    }
1047
1048    // =========================================================================
1049    // Recent Experiences
1050    // =========================================================================
1051
1052    /// Retrieves the most recent experiences in a collective, newest first.
1053    ///
1054    /// Returns up to `limit` non-archived experiences ordered by timestamp
1055    /// descending (newest first). Uses the by-collective timestamp index
1056    /// for efficient retrieval.
1057    ///
1058    /// # Arguments
1059    ///
1060    /// * `collective_id` - The collective to query
1061    /// * `limit` - Maximum number of experiences to return (1-1000)
1062    ///
1063    /// # Errors
1064    ///
1065    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1066    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1067    ///
1068    /// # Example
1069    ///
1070    /// ```rust
1071    /// # fn main() -> pulsedb::Result<()> {
1072    /// # let dir = tempfile::tempdir().unwrap();
1073    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1074    /// # let collective_id = db.create_collective("example")?;
1075    /// let recent = db.get_recent_experiences(collective_id, 50)?;
1076    /// for exp in &recent {
1077    ///     println!("{}: {}", exp.timestamp, exp.content);
1078    /// }
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    #[instrument(skip(self))]
1083    pub fn get_recent_experiences(
1084        &self,
1085        collective_id: CollectiveId,
1086        limit: usize,
1087    ) -> Result<Vec<Experience>> {
1088        self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
1089    }
1090
1091    /// Retrieves the most recent experiences in a collective with filtering.
1092    ///
1093    /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1094    /// applies additional filters on domain, experience type, importance,
1095    /// confidence, and timestamp.
1096    ///
1097    /// Over-fetches from storage (2x `limit`) to account for entries removed
1098    /// by post-filtering, then truncates to the requested `limit`.
1099    ///
1100    /// # Arguments
1101    ///
1102    /// * `collective_id` - The collective to query
1103    /// * `limit` - Maximum number of experiences to return (1-1000)
1104    /// * `filter` - Filter criteria to apply
1105    ///
1106    /// # Errors
1107    ///
1108    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1109    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1110    #[instrument(skip(self, filter))]
1111    pub fn get_recent_experiences_filtered(
1112        &self,
1113        collective_id: CollectiveId,
1114        limit: usize,
1115        filter: SearchFilter,
1116    ) -> Result<Vec<Experience>> {
1117        // Validate limit
1118        if limit == 0 || limit > 1000 {
1119            return Err(
1120                ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1121            );
1122        }
1123
1124        // Verify collective exists
1125        self.storage
1126            .get_collective(collective_id)?
1127            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1128
1129        // Over-fetch IDs to account for post-filtering losses
1130        let over_fetch = limit.saturating_mul(2).min(2000);
1131        let recent_ids = self
1132            .storage
1133            .get_recent_experience_ids(collective_id, over_fetch)?;
1134
1135        // Load full experiences and apply filter
1136        let mut results = Vec::with_capacity(limit);
1137        for (exp_id, _timestamp) in recent_ids {
1138            if results.len() >= limit {
1139                break;
1140            }
1141
1142            if let Some(experience) = self.storage.get_experience(exp_id)? {
1143                if filter.matches(&experience) {
1144                    results.push(experience);
1145                }
1146            }
1147        }
1148
1149        Ok(results)
1150    }
1151
1152    // =========================================================================
1153    // Similarity Search (E2-S02)
1154    // =========================================================================
1155
1156    /// Searches for experiences semantically similar to the query embedding.
1157    ///
1158    /// Uses the HNSW vector index for approximate nearest neighbor search,
1159    /// then fetches full experience records from storage. Archived experiences
1160    /// are excluded by default.
1161    ///
1162    /// Results are sorted by similarity descending (most similar first).
1163    /// Similarity is computed as `1.0 - cosine_distance`.
1164    ///
1165    /// # Arguments
1166    ///
1167    /// * `collective_id` - The collective to search within
1168    /// * `query` - Query embedding vector (must match collective's dimension)
1169    /// * `k` - Maximum number of results to return (1-1000)
1170    ///
1171    /// # Errors
1172    ///
1173    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1174    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1175    ///   the collective's embedding dimension
1176    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1177    ///
1178    /// # Example
1179    ///
1180    /// ```rust
1181    /// # fn main() -> pulsedb::Result<()> {
1182    /// # let dir = tempfile::tempdir().unwrap();
1183    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1184    /// # let collective_id = db.create_collective("example")?;
1185    /// let query = vec![0.1f32; 384]; // Your query embedding
1186    /// let results = db.search_similar(collective_id, &query, 10)?;
1187    /// for result in &results {
1188    ///     println!(
1189    ///         "[{:.3}] {}",
1190    ///         result.similarity, result.experience.content
1191    ///     );
1192    /// }
1193    /// # Ok(())
1194    /// # }
1195    /// ```
1196    #[instrument(skip(self, query))]
1197    pub fn search_similar(
1198        &self,
1199        collective_id: CollectiveId,
1200        query: &[f32],
1201        k: usize,
1202    ) -> Result<Vec<SearchResult>> {
1203        self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
1204    }
1205
1206    /// Searches for semantically similar experiences with additional filtering.
1207    ///
1208    /// Like [`search_similar()`](Self::search_similar), but applies additional
1209    /// filters on domain, experience type, importance, confidence, and timestamp.
1210    ///
1211    /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1212    /// by post-filtering, then truncates to the requested `k`.
1213    ///
1214    /// # Arguments
1215    ///
1216    /// * `collective_id` - The collective to search within
1217    /// * `query` - Query embedding vector (must match collective's dimension)
1218    /// * `k` - Maximum number of results to return (1-1000)
1219    /// * `filter` - Filter criteria to apply after vector search
1220    ///
1221    /// # Errors
1222    ///
1223    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1224    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1225    ///   the collective's embedding dimension
1226    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1227    ///
1228    /// # Example
1229    ///
1230    /// ```rust
1231    /// # fn main() -> pulsedb::Result<()> {
1232    /// # let dir = tempfile::tempdir().unwrap();
1233    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1234    /// # let collective_id = db.create_collective("example")?;
1235    /// # let query_embedding = vec![0.1f32; 384];
1236    /// use pulsedb::SearchFilter;
1237    ///
1238    /// let filter = SearchFilter {
1239    ///     domains: Some(vec!["rust".to_string()]),
1240    ///     min_importance: Some(0.5),
1241    ///     ..SearchFilter::default()
1242    /// };
1243    /// let results = db.search_similar_filtered(
1244    ///     collective_id,
1245    ///     &query_embedding,
1246    ///     10,
1247    ///     filter,
1248    /// )?;
1249    /// # Ok(())
1250    /// # }
1251    /// ```
1252    #[instrument(skip(self, query, filter))]
1253    pub fn search_similar_filtered(
1254        &self,
1255        collective_id: CollectiveId,
1256        query: &[f32],
1257        k: usize,
1258        filter: SearchFilter,
1259    ) -> Result<Vec<SearchResult>> {
1260        // Validate k
1261        if k == 0 || k > 1000 {
1262            return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1263        }
1264
1265        // Verify collective exists and check embedding dimension
1266        let collective = self
1267            .storage
1268            .get_collective(collective_id)?
1269            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1270
1271        let expected_dim = collective.embedding_dimension as usize;
1272        if query.len() != expected_dim {
1273            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1274        }
1275
1276        // Over-fetch from HNSW to compensate for post-filtering losses
1277        let over_fetch = k.saturating_mul(2).min(2000);
1278        let ef_search = self.config.hnsw.ef_search;
1279
1280        // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1281        // by distance ascending (closest first)
1282        let candidates = self
1283            .with_vector_index(collective_id, |index| {
1284                index.search_experiences(query, over_fetch, ef_search)
1285            })?
1286            .unwrap_or_default();
1287
1288        // Fetch full experiences, apply filter, convert distance → similarity
1289        let mut results = Vec::with_capacity(k);
1290        for (exp_id, distance) in candidates {
1291            if results.len() >= k {
1292                break;
1293            }
1294
1295            if let Some(experience) = self.storage.get_experience(exp_id)? {
1296                if filter.matches(&experience) {
1297                    results.push(SearchResult {
1298                        experience,
1299                        similarity: 1.0 - distance,
1300                    });
1301                }
1302            }
1303        }
1304
1305        Ok(results)
1306    }
1307
1308    // =========================================================================
1309    // Experience Relations (E3-S01)
1310    // =========================================================================
1311
1312    /// Stores a new relation between two experiences.
1313    ///
1314    /// Relations are typed, directed edges connecting a source experience to a
1315    /// target experience. Both experiences must exist and belong to the same
1316    /// collective. Duplicate relations (same source, target, and type) are
1317    /// rejected.
1318    ///
1319    /// # Arguments
1320    ///
1321    /// * `relation` - The relation to create (source, target, type, strength)
1322    ///
1323    /// # Errors
1324    ///
1325    /// Returns an error if:
1326    /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1327    /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1328    /// - A relation with the same (source, target, type) already exists
1329    /// - Self-relation attempted (source == target)
1330    /// - Strength is out of range `[0.0, 1.0]`
1331    #[instrument(skip(self, relation))]
1332    pub fn store_relation(
1333        &self,
1334        relation: crate::relation::NewExperienceRelation,
1335    ) -> Result<crate::types::RelationId> {
1336        use crate::relation::{validate_new_relation, ExperienceRelation};
1337        use crate::types::RelationId;
1338
1339        // Validate input fields (self-relation, strength bounds, metadata size)
1340        validate_new_relation(&relation)?;
1341
1342        // Load source and target experiences to verify existence
1343        let source = self
1344            .storage
1345            .get_experience(relation.source_id)?
1346            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1347        let target = self
1348            .storage
1349            .get_experience(relation.target_id)?
1350            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1351
1352        // Verify same collective
1353        if source.collective_id != target.collective_id {
1354            return Err(PulseDBError::from(ValidationError::invalid_field(
1355                "target_id",
1356                "source and target experiences must belong to the same collective",
1357            )));
1358        }
1359
1360        // Check for duplicate (same source, target, type)
1361        if self.storage.relation_exists(
1362            relation.source_id,
1363            relation.target_id,
1364            relation.relation_type,
1365        )? {
1366            return Err(PulseDBError::from(ValidationError::invalid_field(
1367                "relation_type",
1368                "a relation with this source, target, and type already exists",
1369            )));
1370        }
1371
1372        // Construct the full relation
1373        let id = RelationId::new();
1374        let full_relation = ExperienceRelation {
1375            id,
1376            source_id: relation.source_id,
1377            target_id: relation.target_id,
1378            relation_type: relation.relation_type,
1379            strength: relation.strength,
1380            metadata: relation.metadata,
1381            created_at: Timestamp::now(),
1382        };
1383
1384        self.storage.save_relation(&full_relation)?;
1385
1386        info!(
1387            id = %id,
1388            source = %relation.source_id,
1389            target = %relation.target_id,
1390            relation_type = ?full_relation.relation_type,
1391            "Relation stored"
1392        );
1393        Ok(id)
1394    }
1395
1396    /// Retrieves experiences related to the given experience.
1397    ///
1398    /// Returns pairs of `(Experience, ExperienceRelation)` based on the
1399    /// requested direction:
1400    /// - `Outgoing`: experiences that this experience points TO (as source)
1401    /// - `Incoming`: experiences that point TO this experience (as target)
1402    /// - `Both`: union of outgoing and incoming
1403    ///
1404    /// To filter by relation type, use
1405    /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered).
1406    ///
1407    /// Silently skips relations where the related experience no longer exists
1408    /// (orphan tolerance).
1409    ///
1410    /// # Errors
1411    ///
1412    /// Returns a storage error if the read transaction fails.
1413    #[instrument(skip(self))]
1414    pub fn get_related_experiences(
1415        &self,
1416        experience_id: ExperienceId,
1417        direction: crate::relation::RelationDirection,
1418    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1419        self.get_related_experiences_filtered(experience_id, direction, None)
1420    }
1421
1422    /// Retrieves experiences related to the given experience, with optional
1423    /// type filtering.
1424    ///
1425    /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1426    /// accepts an optional [`RelationType`](crate::RelationType) filter.
1427    /// When `Some(rt)`, only relations matching that type are returned.
1428    ///
1429    /// # Arguments
1430    ///
1431    /// * `experience_id` - The experience to query relations for
1432    /// * `direction` - Which direction(s) to traverse
1433    /// * `relation_type` - If `Some`, only return relations of this type
1434    ///
1435    /// # Example
1436    ///
1437    /// ```rust
1438    /// # fn main() -> pulsedb::Result<()> {
1439    /// # let dir = tempfile::tempdir().unwrap();
1440    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1441    /// # let cid = db.create_collective("example")?;
1442    /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1443    /// #     collective_id: cid,
1444    /// #     content: "a".into(),
1445    /// #     embedding: Some(vec![0.1f32; 384]),
1446    /// #     ..Default::default()
1447    /// # })?;
1448    /// use pulsedb::{RelationType, RelationDirection};
1449    ///
1450    /// // Only "Supports" relations outgoing from exp_a
1451    /// let supports = db.get_related_experiences_filtered(
1452    ///     exp_a,
1453    ///     RelationDirection::Outgoing,
1454    ///     Some(RelationType::Supports),
1455    /// )?;
1456    /// # Ok(())
1457    /// # }
1458    /// ```
1459    #[instrument(skip(self))]
1460    pub fn get_related_experiences_filtered(
1461        &self,
1462        experience_id: ExperienceId,
1463        direction: crate::relation::RelationDirection,
1464        relation_type: Option<crate::relation::RelationType>,
1465    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1466        use crate::relation::RelationDirection;
1467
1468        let mut results = Vec::new();
1469
1470        // Outgoing: this experience is the source → fetch target experiences
1471        if matches!(
1472            direction,
1473            RelationDirection::Outgoing | RelationDirection::Both
1474        ) {
1475            let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1476            for rel_id in rel_ids {
1477                if let Some(relation) = self.storage.get_relation(rel_id)? {
1478                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1479                        continue;
1480                    }
1481                    if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1482                        results.push((experience, relation));
1483                    }
1484                }
1485            }
1486        }
1487
1488        // Incoming: this experience is the target → fetch source experiences
1489        if matches!(
1490            direction,
1491            RelationDirection::Incoming | RelationDirection::Both
1492        ) {
1493            let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1494            for rel_id in rel_ids {
1495                if let Some(relation) = self.storage.get_relation(rel_id)? {
1496                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1497                        continue;
1498                    }
1499                    if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1500                        results.push((experience, relation));
1501                    }
1502                }
1503            }
1504        }
1505
1506        Ok(results)
1507    }
1508
1509    /// Retrieves a relation by ID.
1510    ///
1511    /// Returns `None` if no relation with the given ID exists.
1512    pub fn get_relation(
1513        &self,
1514        id: crate::types::RelationId,
1515    ) -> Result<Option<crate::relation::ExperienceRelation>> {
1516        self.storage.get_relation(id)
1517    }
1518
1519    /// Deletes a relation by ID.
1520    ///
1521    /// # Errors
1522    ///
1523    /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1524    #[instrument(skip(self))]
1525    pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1526        let deleted = self.storage.delete_relation(id)?;
1527        if !deleted {
1528            return Err(PulseDBError::from(NotFoundError::relation(id)));
1529        }
1530        info!(id = %id, "Relation deleted");
1531        Ok(())
1532    }
1533
1534    // =========================================================================
1535    // Derived Insights (E3-S02)
1536    // =========================================================================
1537
1538    /// Stores a new derived insight.
1539    ///
1540    /// Creates a synthesized knowledge record from multiple source experiences.
1541    /// The method:
1542    /// 1. Validates the input (content, confidence, sources)
1543    /// 2. Verifies the collective exists
1544    /// 3. Verifies all source experiences exist and belong to the same collective
1545    /// 4. Resolves the embedding (generates if Builtin, requires if External)
1546    /// 5. Stores the insight with inline embedding
1547    /// 6. Inserts into the insight HNSW index
1548    ///
1549    /// # Arguments
1550    ///
1551    /// * `insight` - The insight to store (see [`NewDerivedInsight`])
1552    ///
1553    /// # Errors
1554    ///
1555    /// - [`ValidationError`](crate::ValidationError) if input is invalid
1556    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1557    /// - [`NotFoundError::Experience`] if any source experience doesn't exist
1558    /// - [`ValidationError::InvalidField`] if source experiences belong to
1559    ///   different collectives
1560    /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
1561    #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
1562    pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
1563        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
1564
1565        // Validate input fields
1566        validate_new_insight(&insight)?;
1567
1568        // Verify collective exists
1569        let collective = self
1570            .storage
1571            .get_collective(insight.collective_id)?
1572            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;
1573
1574        // Verify all source experiences exist and belong to this collective
1575        for source_id in &insight.source_experience_ids {
1576            let source_exp = self
1577                .storage
1578                .get_experience(*source_id)?
1579                .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
1580            if source_exp.collective_id != insight.collective_id {
1581                return Err(PulseDBError::from(ValidationError::invalid_field(
1582                    "source_experience_ids",
1583                    format!(
1584                        "experience {} belongs to collective {}, not {}",
1585                        source_id, source_exp.collective_id, insight.collective_id
1586                    ),
1587                )));
1588            }
1589        }
1590
1591        // Resolve embedding
1592        let embedding = match insight.embedding {
1593            Some(ref emb) => {
1594                // Validate dimension
1595                let expected_dim = collective.embedding_dimension as usize;
1596                if emb.len() != expected_dim {
1597                    return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
1598                }
1599                emb.clone()
1600            }
1601            None => {
1602                if is_external {
1603                    return Err(PulseDBError::embedding(
1604                        "embedding is required when using External embedding provider",
1605                    ));
1606                }
1607                self.embedding.embed(&insight.content)?
1608            }
1609        };
1610
1611        let embedding_for_hnsw = embedding.clone();
1612        let now = Timestamp::now();
1613        let id = InsightId::new();
1614
1615        // Construct the full insight record
1616        let derived_insight = DerivedInsight {
1617            id,
1618            collective_id: insight.collective_id,
1619            content: insight.content,
1620            embedding,
1621            source_experience_ids: insight.source_experience_ids,
1622            insight_type: insight.insight_type,
1623            confidence: insight.confidence,
1624            domain: insight.domain,
1625            created_at: now,
1626            updated_at: now,
1627        };
1628
1629        // Write to redb FIRST (source of truth)
1630        self.storage.save_insight(&derived_insight)?;
1631
1632        // Insert into insight HNSW index (using InsightId→ExperienceId byte conversion)
1633        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1634        let insight_vectors = self
1635            .insight_vectors
1636            .read()
1637            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1638        if let Some(index) = insight_vectors.get(&insight.collective_id) {
1639            index.insert_experience(exp_id, &embedding_for_hnsw)?;
1640        }
1641
1642        info!(id = %id, "Insight stored");
1643        Ok(id)
1644    }
1645
1646    /// Retrieves a derived insight by ID.
1647    ///
1648    /// Returns `None` if no insight with the given ID exists.
1649    #[instrument(skip(self))]
1650    pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1651        self.storage.get_insight(id)
1652    }
1653
1654    /// Searches for insights semantically similar to the query embedding.
1655    ///
1656    /// Uses the insight-specific HNSW index for approximate nearest neighbor
1657    /// search, then fetches full insight records from storage.
1658    ///
1659    /// # Arguments
1660    ///
1661    /// * `collective_id` - The collective to search within
1662    /// * `query` - Query embedding vector (must match collective's dimension)
1663    /// * `k` - Maximum number of results to return
1664    ///
1665    /// # Errors
1666    ///
1667    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1668    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1669    #[instrument(skip(self, query))]
1670    pub fn get_insights(
1671        &self,
1672        collective_id: CollectiveId,
1673        query: &[f32],
1674        k: usize,
1675    ) -> Result<Vec<(DerivedInsight, f32)>> {
1676        // Verify collective exists and check embedding dimension
1677        let collective = self
1678            .storage
1679            .get_collective(collective_id)?
1680            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1681
1682        let expected_dim = collective.embedding_dimension as usize;
1683        if query.len() != expected_dim {
1684            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1685        }
1686
1687        let ef_search = self.config.hnsw.ef_search;
1688
1689        // Search insight HNSW — returns (ExperienceId, distance) pairs
1690        let insight_vectors = self
1691            .insight_vectors
1692            .read()
1693            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1694
1695        let candidates = match insight_vectors.get(&collective_id) {
1696            Some(index) => index.search_experiences(query, k, ef_search)?,
1697            None => return Ok(vec![]),
1698        };
1699        drop(insight_vectors);
1700
1701        // Convert ExperienceId back to InsightId and fetch records
1702        let mut results = Vec::with_capacity(candidates.len());
1703        for (exp_id, distance) in candidates {
1704            let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1705            if let Some(insight) = self.storage.get_insight(insight_id)? {
1706                // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1707                results.push((insight, 1.0 - distance));
1708            }
1709        }
1710
1711        Ok(results)
1712    }
1713
1714    /// Deletes a derived insight by ID.
1715    ///
1716    /// Removes the insight from storage and soft-deletes it from the HNSW index.
1717    ///
1718    /// # Errors
1719    ///
1720    /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1721    #[instrument(skip(self))]
1722    pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1723        // Read insight first to get collective_id for HNSW lookup
1724        let insight = self
1725            .storage
1726            .get_insight(id)?
1727            .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1728
1729        // Delete from redb FIRST (source of truth)
1730        self.storage.delete_insight(id)?;
1731
1732        // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1733        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1734        let insight_vectors = self
1735            .insight_vectors
1736            .read()
1737            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1738        if let Some(index) = insight_vectors.get(&insight.collective_id) {
1739            index.delete_experience(exp_id)?;
1740        }
1741
1742        info!(id = %id, "Insight deleted");
1743        Ok(())
1744    }
1745
1746    // =========================================================================
1747    // Activity Tracking (E3-S03)
1748    // =========================================================================
1749
1750    /// Registers an agent's presence in a collective.
1751    ///
1752    /// Creates a new activity record or replaces an existing one for the
1753    /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1754    /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1755    ///
1756    /// # Arguments
1757    ///
1758    /// * `activity` - The activity registration (see [`NewActivity`])
1759    ///
1760    /// # Errors
1761    ///
1762    /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1763    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1764    ///
1765    /// # Example
1766    ///
1767    /// ```rust
1768    /// # fn main() -> pulsedb::Result<()> {
1769    /// # let dir = tempfile::tempdir().unwrap();
1770    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1771    /// # let collective_id = db.create_collective("example")?;
1772    /// use pulsedb::NewActivity;
1773    ///
1774    /// db.register_activity(NewActivity {
1775    ///     agent_id: "claude-opus".to_string(),
1776    ///     collective_id,
1777    ///     current_task: Some("Reviewing pull request".to_string()),
1778    ///     context_summary: None,
1779    /// })?;
1780    /// # Ok(())
1781    /// # }
1782    /// ```
1783    #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1784    pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1785        // Validate input
1786        validate_new_activity(&activity)?;
1787
1788        // Verify collective exists
1789        self.storage
1790            .get_collective(activity.collective_id)?
1791            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1792
1793        // Build stored activity with timestamps
1794        let now = Timestamp::now();
1795        let stored = Activity {
1796            agent_id: activity.agent_id,
1797            collective_id: activity.collective_id,
1798            current_task: activity.current_task,
1799            context_summary: activity.context_summary,
1800            started_at: now,
1801            last_heartbeat: now,
1802        };
1803
1804        self.storage.save_activity(&stored)?;
1805
1806        info!(
1807            agent_id = %stored.agent_id,
1808            collective_id = %stored.collective_id,
1809            "Activity registered"
1810        );
1811        Ok(())
1812    }
1813
1814    /// Updates an agent's heartbeat timestamp.
1815    ///
1816    /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1817    /// any other fields. The agent must have an existing activity registered.
1818    ///
1819    /// # Errors
1820    ///
1821    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1822    #[instrument(skip(self))]
1823    pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1824        let mut activity = self
1825            .storage
1826            .get_activity(agent_id, collective_id)?
1827            .ok_or_else(|| {
1828                PulseDBError::from(NotFoundError::activity(format!(
1829                    "{} in {}",
1830                    agent_id, collective_id
1831                )))
1832            })?;
1833
1834        activity.last_heartbeat = Timestamp::now();
1835        self.storage.save_activity(&activity)?;
1836
1837        info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1838        Ok(())
1839    }
1840
1841    /// Ends an agent's activity in a collective.
1842    ///
1843    /// Removes the activity record. After calling this, the agent will no
1844    /// longer appear in `get_active_agents()` results.
1845    ///
1846    /// # Errors
1847    ///
1848    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1849    #[instrument(skip(self))]
1850    pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1851        let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1852
1853        if !deleted {
1854            return Err(PulseDBError::from(NotFoundError::activity(format!(
1855                "{} in {}",
1856                agent_id, collective_id
1857            ))));
1858        }
1859
1860        info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1861        Ok(())
1862    }
1863
1864    /// Returns all active (non-stale) agents in a collective.
1865    ///
1866    /// Fetches all activities, filters out those whose `last_heartbeat` is
1867    /// older than `config.activity.stale_threshold`, and returns the rest
1868    /// sorted by `last_heartbeat` descending (most recently active first).
1869    ///
1870    /// # Errors
1871    ///
1872    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1873    #[instrument(skip(self))]
1874    pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1875        // Verify collective exists
1876        self.storage
1877            .get_collective(collective_id)?
1878            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1879
1880        let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1881
1882        // Filter stale activities
1883        let now = Timestamp::now();
1884        let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1885        let cutoff = now.as_millis() - threshold_ms;
1886
1887        let mut active: Vec<Activity> = all_activities
1888            .into_iter()
1889            .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1890            .collect();
1891
1892        // Sort by last_heartbeat descending (most recently active first)
1893        active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1894
1895        Ok(active)
1896    }
1897
1898    // =========================================================================
1899    // Context Candidates (E2-S04)
1900    // =========================================================================
1901
1902    /// Retrieves unified context candidates from all retrieval primitives.
1903    ///
1904    /// This is the primary API for context assembly. It orchestrates:
1905    /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1906    /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1907    /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1908    /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1909    /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1910    ///
1911    /// # Arguments
1912    ///
1913    /// * `request` - Configuration for which primitives to query and limits
1914    ///
1915    /// # Errors
1916    ///
1917    /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1918    /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1919    ///   the collective's embedding dimension
1920    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1921    ///
1922    /// # Performance
1923    ///
1924    /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1925    /// all other sub-calls are < 10ms each.
1926    ///
1927    /// # Example
1928    ///
1929    /// ```rust
1930    /// # fn main() -> pulsedb::Result<()> {
1931    /// # let dir = tempfile::tempdir().unwrap();
1932    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1933    /// # let collective_id = db.create_collective("example")?;
1934    /// # let query_vec = vec![0.1f32; 384];
1935    /// use pulsedb::{ContextRequest, SearchFilter};
1936    ///
1937    /// let candidates = db.get_context_candidates(ContextRequest {
1938    ///     collective_id,
1939    ///     query_embedding: query_vec,
1940    ///     max_similar: 10,
1941    ///     max_recent: 5,
1942    ///     include_insights: true,
1943    ///     include_relations: true,
1944    ///     include_active_agents: true,
1945    ///     filter: SearchFilter {
1946    ///         domains: Some(vec!["rust".to_string()]),
1947    ///         ..SearchFilter::default()
1948    ///     },
1949    ///     ..ContextRequest::default()
1950    /// })?;
1951    /// # Ok(())
1952    /// # }
1953    /// ```
1954    #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
1955    pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
1956        // ── Validate limits ──────────────────────────────────────
1957        if request.max_similar == 0 || request.max_similar > 1000 {
1958            return Err(ValidationError::invalid_field(
1959                "max_similar",
1960                "must be between 1 and 1000",
1961            )
1962            .into());
1963        }
1964        if request.max_recent == 0 || request.max_recent > 1000 {
1965            return Err(
1966                ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
1967            );
1968        }
1969
1970        // ── Verify collective exists and check dimension ─────────
1971        let collective = self
1972            .storage
1973            .get_collective(request.collective_id)?
1974            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;
1975
1976        let expected_dim = collective.embedding_dimension as usize;
1977        if request.query_embedding.len() != expected_dim {
1978            return Err(ValidationError::dimension_mismatch(
1979                expected_dim,
1980                request.query_embedding.len(),
1981            )
1982            .into());
1983        }
1984
1985        // ── 1. Similar experiences (HNSW vector search) ──────────
1986        let similar_experiences = self.search_similar_filtered(
1987            request.collective_id,
1988            &request.query_embedding,
1989            request.max_similar,
1990            request.filter.clone(),
1991        )?;
1992
1993        // ── 2. Recent experiences (timestamp index scan) ─────────
1994        let recent_experiences = self.get_recent_experiences_filtered(
1995            request.collective_id,
1996            request.max_recent,
1997            request.filter,
1998        )?;
1999
2000        // ── 3. Insights (HNSW vector search on insight index) ────
2001        let insights = if request.include_insights {
2002            self.get_insights(
2003                request.collective_id,
2004                &request.query_embedding,
2005                request.max_similar,
2006            )?
2007            .into_iter()
2008            .map(|(insight, _score)| insight)
2009            .collect()
2010        } else {
2011            vec![]
2012        };
2013
2014        // ── 4. Relations (graph traversal from result experiences) ─
2015        let relations = if request.include_relations {
2016            use std::collections::HashSet;
2017
2018            let mut seen = HashSet::new();
2019            let mut all_relations = Vec::new();
2020
2021            // Collect unique experience IDs from both result sets
2022            let exp_ids: Vec<_> = similar_experiences
2023                .iter()
2024                .map(|r| r.experience.id)
2025                .chain(recent_experiences.iter().map(|e| e.id))
2026                .collect();
2027
2028            for exp_id in exp_ids {
2029                let related =
2030                    self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;
2031
2032                for (_experience, relation) in related {
2033                    if seen.insert(relation.id) {
2034                        all_relations.push(relation);
2035                    }
2036                }
2037            }
2038
2039            all_relations
2040        } else {
2041            vec![]
2042        };
2043
2044        // ── 5. Active agents (staleness-filtered activity records) ─
2045        let active_agents = if request.include_active_agents {
2046            self.get_active_agents(request.collective_id)?
2047        } else {
2048            vec![]
2049        };
2050
2051        Ok(ContextCandidates {
2052            similar_experiences,
2053            recent_experiences,
2054            insights,
2055            relations,
2056            active_agents,
2057        })
2058    }
2059
2060    // =========================================================================
2061    // Watch System (E4-S01)
2062    // =========================================================================
2063
2064    /// Subscribes to all experience changes in a collective.
2065    ///
2066    /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2067    /// create, update, archive, and delete operation. The stream ends when
2068    /// dropped or when the `PulseDB` instance is closed.
2069    ///
2070    /// Multiple subscribers per collective are supported. Each gets an
2071    /// independent copy of every event.
2072    ///
2073    /// # Example
2074    ///
2075    /// ```rust,no_run
2076    /// # #[tokio::main]
2077    /// # async fn main() -> pulsedb::Result<()> {
2078    /// # let dir = tempfile::tempdir().unwrap();
2079    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2080    /// # let collective_id = db.create_collective("example")?;
2081    /// use futures::StreamExt;
2082    ///
2083    /// let mut stream = db.watch_experiences(collective_id)?;
2084    /// while let Some(event) = stream.next().await {
2085    ///     println!("{:?}: {}", event.event_type, event.experience_id);
2086    /// }
2087    /// # Ok(())
2088    /// # }
2089    /// ```
2090    pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2091        self.watch.subscribe(collective_id, None)
2092    }
2093
2094    /// Subscribes to filtered experience changes in a collective.
2095    ///
2096    /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2097    /// events that match the filter criteria. Filters are applied on the
2098    /// sender side before channel delivery.
2099    ///
2100    /// # Example
2101    ///
2102    /// ```rust
2103    /// # fn main() -> pulsedb::Result<()> {
2104    /// # let dir = tempfile::tempdir().unwrap();
2105    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2106    /// # let collective_id = db.create_collective("example")?;
2107    /// use pulsedb::WatchFilter;
2108    ///
2109    /// let filter = WatchFilter {
2110    ///     domains: Some(vec!["security".to_string()]),
2111    ///     min_importance: Some(0.7),
2112    ///     ..Default::default()
2113    /// };
2114    /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2115    /// # Ok(())
2116    /// # }
2117    /// ```
2118    pub fn watch_experiences_filtered(
2119        &self,
2120        collective_id: CollectiveId,
2121        filter: WatchFilter,
2122    ) -> Result<WatchStream> {
2123        self.watch.subscribe(collective_id, Some(filter))
2124    }
2125
2126    // =========================================================================
2127    // Cross-Process Watch (E4-S02)
2128    // =========================================================================
2129
2130    /// Returns the current WAL sequence number.
2131    ///
2132    /// Use this to establish a baseline before starting to poll for changes.
2133    /// Returns 0 if no experience writes have occurred yet.
2134    ///
2135    /// # Example
2136    ///
2137    /// ```rust
2138    /// # fn main() -> pulsedb::Result<()> {
2139    /// # let dir = tempfile::tempdir().unwrap();
2140    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2141    /// let seq = db.get_current_sequence()?;
2142    /// // ... later ...
2143    /// let (events, new_seq) = db.poll_changes(seq)?;
2144    /// # Ok(())
2145    /// # }
2146    /// ```
2147    pub fn get_current_sequence(&self) -> Result<u64> {
2148        self.storage.get_wal_sequence()
2149    }
2150
2151    /// Polls for experience changes since the given sequence number.
2152    ///
2153    /// Returns a tuple of `(events, new_sequence)`:
2154    /// - `events`: New [`WatchEvent`]s in sequence order
2155    /// - `new_sequence`: Pass this value back on the next call
2156    ///
2157    /// Returns an empty vec and the same sequence if no changes exist.
2158    ///
2159    /// # Arguments
2160    ///
2161    /// * `since_seq` - The last sequence number you received (0 for first call)
2162    ///
2163    /// # Performance
2164    ///
2165    /// Target: < 10ms per call. Internally performs a range scan on the
2166    /// watch_events table, O(k) where k is the number of new events.
2167    ///
2168    /// # Example
2169    ///
2170    /// ```rust,no_run
2171    /// # fn main() -> pulsedb::Result<()> {
2172    /// # let dir = tempfile::tempdir().unwrap();
2173    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2174    /// use std::time::Duration;
2175    ///
2176    /// let mut seq = 0u64;
2177    /// loop {
2178    ///     let (events, new_seq) = db.poll_changes(seq)?;
2179    ///     seq = new_seq;
2180    ///     for event in events {
2181    ///         println!("{:?}: {}", event.event_type, event.experience_id);
2182    ///     }
2183    ///     std::thread::sleep(Duration::from_millis(100));
2184    /// }
2185    /// # }
2186    /// ```
2187    pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2188        use crate::storage::schema::EntityTypeTag;
2189        let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2190        let events = records
2191            .into_iter()
2192            .filter(|r| r.entity_type == EntityTypeTag::Experience)
2193            .map(WatchEvent::from)
2194            .collect();
2195        Ok((events, new_seq))
2196    }
2197
2198    /// Polls for changes with a custom batch size limit.
2199    ///
2200    /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2201    /// `limit` events per call. Use this for backpressure control.
2202    pub fn poll_changes_batch(
2203        &self,
2204        since_seq: u64,
2205        limit: usize,
2206    ) -> Result<(Vec<WatchEvent>, u64)> {
2207        use crate::storage::schema::EntityTypeTag;
2208        let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2209        let events = records
2210            .into_iter()
2211            .filter(|r| r.entity_type == EntityTypeTag::Experience)
2212            .map(WatchEvent::from)
2213            .collect();
2214        Ok((events, new_seq))
2215    }
2216
2217    // =========================================================================
2218    // Sync WAL Compaction (feature: sync)
2219    // =========================================================================
2220
2221    /// Compacts the WAL by removing events that all peers have already synced.
2222    ///
2223    /// Finds the minimum cursor across all known peers and deletes WAL events
2224    /// up to that sequence. If no peers exist, no compaction occurs (events
2225    /// may be needed when a peer connects later).
2226    ///
2227    /// Call this periodically (e.g., daily) to reclaim disk space.
2228    /// Returns the number of WAL events deleted.
2229    ///
2230    /// # Example
2231    ///
2232    /// ```rust,no_run
2233    /// # fn main() -> pulsedb::Result<()> {
2234    /// # let dir = tempfile::tempdir().unwrap();
2235    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2236    /// let deleted = db.compact_wal()?;
2237    /// println!("Compacted {} WAL events", deleted);
2238    /// # Ok(())
2239    /// # }
2240    /// ```
2241    #[cfg(feature = "sync")]
2242    pub fn compact_wal(&self) -> Result<u64> {
2243        let cursors = self
2244            .storage
2245            .list_sync_cursors()
2246            .map_err(|e| PulseDBError::internal(format!("Failed to list sync cursors: {}", e)))?;
2247
2248        if cursors.is_empty() {
2249            // No peers — don't compact (events may be needed later)
2250            return Ok(0);
2251        }
2252
2253        let min_seq = cursors.iter().map(|c| c.last_sequence).min().unwrap_or(0);
2254
2255        if min_seq == 0 {
2256            return Ok(0);
2257        }
2258
2259        let deleted = self.storage.compact_wal_events(min_seq)?;
2260        info!(deleted, min_seq, "WAL compacted");
2261        Ok(deleted)
2262    }
2263
2264    // =========================================================================
2265    // Sync Apply Methods (feature: sync)
2266    // =========================================================================
2267    //
2268    // These methods apply remote changes received via sync. They bypass
2269    // validation and embedding generation (data was validated on the source).
2270    // WAL recording is suppressed by the SyncApplyGuard (entered by the caller).
2271    // Watch emit is skipped (no in-process notifications for sync changes).
2272    //
2273    // These are pub(crate) and will be called by the sync applier in Phase 3.
2274
2275    /// Applies a synced experience from a remote peer.
2276    ///
2277    /// Writes the full experience to storage and inserts into HNSW.
2278    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2279    #[cfg(feature = "sync")]
2280    #[allow(dead_code)] // Called by sync applier (Phase 3)
2281    pub fn apply_synced_experience(&self, experience: Experience) -> Result<()> {
2282        let collective_id = experience.collective_id;
2283        let id = experience.id;
2284        let embedding = experience.embedding.clone();
2285
2286        self.storage.save_experience(&experience)?;
2287
2288        // Insert into HNSW index
2289        let vectors = self
2290            .vectors
2291            .read()
2292            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2293        if let Some(index) = vectors.get(&collective_id) {
2294            index.insert_experience(id, &embedding)?;
2295        }
2296
2297        debug!(id = %id, "Synced experience applied");
2298        Ok(())
2299    }
2300
2301    /// Applies a synced experience update from a remote peer.
2302    ///
2303    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2304    #[cfg(feature = "sync")]
2305    #[allow(dead_code)] // Called by sync applier (Phase 3)
2306    pub fn apply_synced_experience_update(
2307        &self,
2308        id: ExperienceId,
2309        update: ExperienceUpdate,
2310    ) -> Result<()> {
2311        self.storage.update_experience(id, &update)?;
2312        debug!(id = %id, "Synced experience update applied");
2313        Ok(())
2314    }
2315
2316    /// Applies a synced experience deletion from a remote peer.
2317    ///
2318    /// Removes from storage and soft-deletes from HNSW.
2319    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2320    #[cfg(feature = "sync")]
2321    #[allow(dead_code)] // Called by sync applier (Phase 3)
2322    pub fn apply_synced_experience_delete(&self, id: ExperienceId) -> Result<()> {
2323        // Get collective_id for HNSW lookup before deleting
2324        if let Some(exp) = self.storage.get_experience(id)? {
2325            let collective_id = exp.collective_id;
2326
2327            // Cascade delete relations
2328            self.storage.delete_relations_for_experience(id)?;
2329
2330            self.storage.delete_experience(id)?;
2331
2332            // Soft-delete from HNSW
2333            let vectors = self
2334                .vectors
2335                .read()
2336                .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2337            if let Some(index) = vectors.get(&collective_id) {
2338                index.delete_experience(id)?;
2339            }
2340        }
2341
2342        debug!(id = %id, "Synced experience delete applied");
2343        Ok(())
2344    }
2345
2346    /// Applies a synced relation from a remote peer.
2347    ///
2348    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2349    #[cfg(feature = "sync")]
2350    #[allow(dead_code)] // Called by sync applier (Phase 3)
2351    pub fn apply_synced_relation(&self, relation: ExperienceRelation) -> Result<()> {
2352        let id = relation.id;
2353        self.storage.save_relation(&relation)?;
2354        debug!(id = %id, "Synced relation applied");
2355        Ok(())
2356    }
2357
2358    /// Applies a synced relation deletion from a remote peer.
2359    ///
2360    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2361    #[cfg(feature = "sync")]
2362    #[allow(dead_code)] // Called by sync applier (Phase 3)
2363    pub fn apply_synced_relation_delete(&self, id: RelationId) -> Result<()> {
2364        self.storage.delete_relation(id)?;
2365        debug!(id = %id, "Synced relation delete applied");
2366        Ok(())
2367    }
2368
2369    /// Applies a synced insight from a remote peer.
2370    ///
2371    /// Writes to storage and inserts into insight HNSW index.
2372    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2373    #[cfg(feature = "sync")]
2374    #[allow(dead_code)] // Called by sync applier (Phase 3)
2375    pub fn apply_synced_insight(&self, insight: DerivedInsight) -> Result<()> {
2376        let id = insight.id;
2377        let collective_id = insight.collective_id;
2378        let embedding = insight.embedding.clone();
2379
2380        self.storage.save_insight(&insight)?;
2381
2382        // Insert into insight HNSW (using InsightId→ExperienceId byte conversion)
2383        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2384        let insight_vectors = self
2385            .insight_vectors
2386            .read()
2387            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2388        if let Some(index) = insight_vectors.get(&collective_id) {
2389            index.insert_experience(exp_id, &embedding)?;
2390        }
2391
2392        debug!(id = %id, "Synced insight applied");
2393        Ok(())
2394    }
2395
2396    /// Applies a synced insight deletion from a remote peer.
2397    ///
2398    /// Removes from storage and soft-deletes from insight HNSW.
2399    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2400    #[cfg(feature = "sync")]
2401    #[allow(dead_code)] // Called by sync applier (Phase 3)
2402    pub fn apply_synced_insight_delete(&self, id: InsightId) -> Result<()> {
2403        if let Some(insight) = self.storage.get_insight(id)? {
2404            self.storage.delete_insight(id)?;
2405
2406            // Soft-delete from insight HNSW
2407            let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2408            let insight_vectors = self
2409                .insight_vectors
2410                .read()
2411                .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2412            if let Some(index) = insight_vectors.get(&insight.collective_id) {
2413                index.delete_experience(exp_id)?;
2414            }
2415        }
2416
2417        debug!(id = %id, "Synced insight delete applied");
2418        Ok(())
2419    }
2420
2421    /// Applies a synced collective from a remote peer.
2422    ///
2423    /// Writes to storage and creates HNSW indexes for the collective.
2424    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2425    #[cfg(feature = "sync")]
2426    #[allow(dead_code)] // Called by sync applier (Phase 3)
2427    pub fn apply_synced_collective(&self, collective: Collective) -> Result<()> {
2428        let id = collective.id;
2429        let dimension = collective.embedding_dimension as usize;
2430
2431        self.storage.save_collective(&collective)?;
2432
2433        // Create HNSW indexes (same as create_collective)
2434        let exp_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2435        let insight_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2436        self.vectors
2437            .write()
2438            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
2439            .insert(id, exp_index);
2440        self.insight_vectors
2441            .write()
2442            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
2443            .insert(id, insight_index);
2444
2445        debug!(id = %id, "Synced collective applied");
2446        Ok(())
2447    }
2448}
2449
2450// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2451// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    /// Returns a fresh temp dir plus a database path inside it. The `TempDir`
    /// must stay alive for the duration of the test or the path vanishes.
    fn scratch_db() -> (tempfile::TempDir, PathBuf) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        (dir, path)
    }

    #[test]
    fn test_open_creates_database() {
        let (_dir, path) = scratch_db();

        let db = PulseDB::open(&path, Config::default()).unwrap();

        assert!(path.exists());
        assert_eq!(db.embedding_dimension(), 384);

        db.close().unwrap();
    }

    #[test]
    fn test_open_existing_database() {
        let (_dir, path) = scratch_db();

        // First open creates the file, then release it.
        PulseDB::open(&path, Config::default()).unwrap().close().unwrap();

        // Second open must succeed against the existing file.
        let reopened = PulseDB::open(&path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    #[test]
    fn test_config_validation() {
        let (_dir, path) = scratch_db();

        let bad = Config {
            cache_size_mb: 0, // Invalid
            ..Default::default()
        };

        assert!(PulseDB::open(&path, bad).is_err());
    }

    #[test]
    fn test_dimension_mismatch() {
        let (_dir, path) = scratch_db();

        // Create the database with a 384-dim embedding space.
        let cfg_384 = Config {
            embedding_dimension: EmbeddingDimension::D384,
            ..Default::default()
        };
        PulseDB::open(&path, cfg_384).unwrap().close().unwrap();

        // Reopening with 768 dims must be rejected.
        let cfg_768 = Config {
            embedding_dimension: EmbeddingDimension::D768,
            ..Default::default()
        };
        assert!(PulseDB::open(&path, cfg_768).is_err());
    }

    #[test]
    fn test_metadata_access() {
        let (_dir, path) = scratch_db();

        let db = PulseDB::open(&path, Config::default()).unwrap();

        assert_eq!(
            db.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );

        db.close().unwrap();
    }

    #[test]
    fn test_pulsedb_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PulseDB>();
    }
}