// pulsedb/db.rs

1//! PulseDB main struct and lifecycle operations.
2//!
3//! The [`PulseDB`] struct is the primary interface for interacting with
4//! the database. It provides methods for:
5//!
6//! - Opening and closing the database
7//! - Managing collectives (isolation units)
8//! - Recording and querying experiences
9//! - Semantic search and context retrieval
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! # fn main() -> pulsedb::Result<()> {
15//! # let dir = tempfile::tempdir().unwrap();
16//! use pulsedb::{PulseDB, Config, NewExperience};
17//!
18//! // Open or create a database
19//! let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
20//!
21//! // Create a collective for your project
22//! let collective = db.create_collective("my-project")?;
23//!
24//! // Record an experience
25//! db.record_experience(NewExperience {
26//!     collective_id: collective,
27//!     content: "Always validate user input".to_string(),
28//!     embedding: Some(vec![0.1f32; 384]),
29//!     ..Default::default()
30//! })?;
31//!
32//! // Close when done
33//! db.close()?;
34//! # Ok(())
35//! # }
36//! ```
37//!
38//! # Thread Safety
39//!
40//! `PulseDB` is `Send + Sync` and can be shared across threads using `Arc`.
41//! The underlying storage uses MVCC for concurrent reads with exclusive
42//! write locking.
43//!
44//! ```rust
45//! # fn main() -> pulsedb::Result<()> {
46//! # let dir = tempfile::tempdir().unwrap();
47//! use std::sync::Arc;
48//! use pulsedb::{PulseDB, Config};
49//!
50//! let db = Arc::new(PulseDB::open(dir.path().join("test.db"), Config::default())?);
51//!
52//! // Clone Arc for use in another thread
53//! let db_clone = Arc::clone(&db);
54//! std::thread::spawn(move || {
55//!     // Safe to use db_clone here
56//! });
57//! # Ok(())
58//! # }
59//! ```
60
61use std::collections::HashMap;
62use std::path::{Path, PathBuf};
63use std::sync::{Arc, RwLock};
64
65#[cfg(feature = "sync")]
66use tracing::debug;
67use tracing::{info, instrument, warn};
68
69use crate::activity::{validate_new_activity, Activity, NewActivity};
70use crate::collective::types::CollectiveStats;
71use crate::collective::{validate_collective_name, Collective};
72use crate::config::{Config, EmbeddingProvider};
73use crate::embedding::{create_embedding_service, EmbeddingService};
74use crate::error::{NotFoundError, PulseDBError, Result, ValidationError};
75use crate::experience::{
76    validate_experience_update, validate_new_experience, Experience, ExperienceUpdate,
77    NewExperience,
78};
79use crate::insight::{validate_new_insight, DerivedInsight, NewDerivedInsight};
80#[cfg(feature = "sync")]
81use crate::relation::ExperienceRelation;
82use crate::search::{ContextCandidates, ContextRequest, SearchFilter, SearchResult};
83use crate::storage::{open_storage, DatabaseMetadata, StorageEngine};
84#[cfg(feature = "sync")]
85use crate::types::RelationId;
86use crate::types::{CollectiveId, ExperienceId, InsightId, Timestamp};
87use crate::vector::HnswIndex;
88use crate::watch::{WatchEvent, WatchEventType, WatchFilter, WatchService, WatchStream};
89
/// The main PulseDB database handle.
///
/// This is the primary interface for all database operations. Create an
/// instance with [`PulseDB::open()`] and close it with [`PulseDB::close()`].
///
/// # Ownership
///
/// `PulseDB` owns its storage and embedding service. When you call `close()`,
/// the database is consumed and cannot be used afterward. This ensures
/// resources are properly released.
pub struct PulseDB {
    /// Storage engine (redb or mock for testing).
    storage: Box<dyn StorageEngine>,

    /// Embedding service (external or ONNX).
    embedding: Box<dyn EmbeddingService>,

    /// Configuration used to open this database.
    ///
    /// Immutable after `open()`; some settings (e.g. embedding dimension)
    /// are locked to the on-disk database at creation time.
    config: Config,

    /// Per-collective HNSW vector indexes for experience semantic search.
    ///
    /// Outer RwLock protects the HashMap (add/remove collectives).
    /// Each HnswIndex has its own internal RwLock for concurrent search+insert.
    vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Per-collective HNSW vector indexes for insight semantic search.
    ///
    /// Separate from `vectors` to prevent ID collisions between experiences
    /// and insights. Uses InsightId→ExperienceId byte conversion for the
    /// HNSW API (safe because indexes are isolated per collective).
    insight_vectors: RwLock<HashMap<CollectiveId, HnswIndex>>,

    /// Watch service for real-time experience change notifications.
    ///
    /// Arc-wrapped because [`WatchStream`] holds a weak reference for
    /// cleanup on drop.
    watch: Arc<WatchService>,
}
129
130impl std::fmt::Debug for PulseDB {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        let vector_count = self.vectors.read().map(|v| v.len()).unwrap_or(0);
133        let insight_vector_count = self.insight_vectors.read().map(|v| v.len()).unwrap_or(0);
134        f.debug_struct("PulseDB")
135            .field("config", &self.config)
136            .field("embedding_dimension", &self.embedding_dimension())
137            .field("vector_indexes", &vector_count)
138            .field("insight_vector_indexes", &insight_vector_count)
139            .finish_non_exhaustive()
140    }
141}
142
143impl PulseDB {
    /// Opens or creates a PulseDB database at the specified path.
    ///
    /// If the database doesn't exist, it will be created with the given
    /// configuration. If it exists, the configuration will be validated
    /// against the stored settings (e.g., embedding dimension must match).
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (created if it doesn't exist)
    /// * `config` - Configuration options for the database
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Configuration is invalid (see [`Config::validate`])
    /// - Database file is corrupted
    /// - Database is locked by another process
    /// - Schema version doesn't match (needs migration)
    /// - Embedding dimension doesn't match existing database
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{PulseDB, Config, EmbeddingDimension};
    ///
    /// // Open with default configuration
    /// let db = PulseDB::open(dir.path().join("default.db"), Config::default())?;
    /// # drop(db);
    ///
    /// // Open with custom embedding dimension
    /// let db = PulseDB::open(dir.path().join("custom.db"), Config {
    ///     embedding_dimension: EmbeddingDimension::D768,
    ///     ..Default::default()
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: Config) -> Result<Self> {
        // Validate configuration first, before touching the filesystem.
        config.validate().map_err(PulseDBError::from)?;

        info!("Opening PulseDB");

        // Open storage engine (creates the database file when absent).
        let storage = open_storage(&path, &config)?;

        // Create embedding service (external or ONNX, per config).
        let embedding = create_embedding_service(&config)?;

        // Load or rebuild HNSW indexes for all existing collectives.
        // Must happen after storage opens: embeddings are read back from redb.
        let vectors = Self::load_all_indexes(&*storage, &config)?;
        let insight_vectors = Self::load_all_insight_indexes(&*storage, &config)?;

        info!(
            dimension = config.embedding_dimension.size(),
            sync_mode = ?config.sync_mode,
            collectives = vectors.len(),
            "PulseDB opened successfully"
        );

        let watch = Arc::new(WatchService::new(
            config.watch.buffer_size,
            config.watch.in_process,
        ));

        Ok(Self {
            storage,
            embedding,
            config,
            vectors: RwLock::new(vectors),
            insight_vectors: RwLock::new(insight_vectors),
            watch,
        })
    }
221
    /// Closes the database, flushing all pending writes.
    ///
    /// This method consumes the `PulseDB` instance, ensuring it cannot
    /// be used after closing. The underlying storage engine flushes all
    /// buffered data to disk.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage backend reports a flush failure.
    /// Note: the current redb backend flushes durably on drop, so this
    /// always returns `Ok(())` in practice.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{PulseDB, Config};
    ///
    /// let db = PulseDB::open(dir.path().join("test.db"), Config::default())?;
    /// // ... use the database ...
    /// db.close()?;  // db is consumed here
    /// // db.something() // Compile error: db was moved
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn close(self) -> Result<()> {
        info!("Closing PulseDB");

        // Persist HNSW indexes BEFORE closing storage.
        // If HNSW save fails, storage is still open for potential recovery.
        // On next open(), stale/missing HNSW files trigger a rebuild from redb.
        if let Some(hnsw_dir) = self.hnsw_dir() {
            // Experience HNSW indexes
            let vectors = self
                .vectors
                .read()
                .map_err(|_| PulseDBError::vector("Vectors lock poisoned during close"))?;
            for (collective_id, index) in vectors.iter() {
                // Save failures are non-fatal: logged here and recovered by
                // a rebuild on the next open().
                if let Err(e) = index.save_to_dir(&hnsw_dir, &collective_id.to_string()) {
                    warn!(
                        collective = %collective_id,
                        error = %e,
                        "Failed to save HNSW index (will rebuild on next open)"
                    );
                }
            }
            // Release the experience-index guard before locking the insight map.
            drop(vectors);

            // Insight HNSW indexes (separate files with _insights suffix)
            let insight_vectors = self
                .insight_vectors
                .read()
                .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned during close"))?;
            for (collective_id, index) in insight_vectors.iter() {
                let name = format!("{}_insights", collective_id);
                if let Err(e) = index.save_to_dir(&hnsw_dir, &name) {
                    warn!(
                        collective = %collective_id,
                        error = %e,
                        "Failed to save insight HNSW index (will rebuild on next open)"
                    );
                }
            }
        }

        // Close storage (flushes pending writes)
        self.storage.close()?;

        info!("PulseDB closed successfully");
        Ok(())
    }
295
296    /// Returns a reference to the database configuration.
297    ///
298    /// This is the configuration that was used to open the database.
299    /// Note that some settings (like embedding dimension) are locked
300    /// on database creation and cannot be changed.
301    #[inline]
302    pub fn config(&self) -> &Config {
303        &self.config
304    }
305
306    /// Returns the database metadata.
307    ///
308    /// Metadata includes schema version, embedding dimension, and timestamps
309    /// for when the database was created and last opened.
310    #[inline]
311    pub fn metadata(&self) -> &DatabaseMetadata {
312        self.storage.metadata()
313    }
314
315    /// Returns the embedding dimension configured for this database.
316    ///
317    /// All embeddings stored in this database must have exactly this
318    /// many dimensions.
319    #[inline]
320    pub fn embedding_dimension(&self) -> usize {
321        self.config.embedding_dimension.size()
322    }
323
324    // =========================================================================
325    // Internal Accessors (for use by feature modules)
326    // =========================================================================
327
328    /// Returns a reference to the storage engine.
329    ///
330    /// This is for internal use by other PulseDB modules.
331    #[inline]
332    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
333    pub(crate) fn storage(&self) -> &dyn StorageEngine {
334        self.storage.as_ref()
335    }
336
337    /// Returns a reference to the embedding service.
338    ///
339    /// This is for internal use by other PulseDB modules.
340    #[inline]
341    #[allow(dead_code)] // Will be used by search (Phase 2) and other modules
342    pub(crate) fn embedding(&self) -> &dyn EmbeddingService {
343        self.embedding.as_ref()
344    }
345
346    // =========================================================================
347    // HNSW Index Lifecycle
348    // =========================================================================
349
350    /// Returns the directory for HNSW index files.
351    ///
352    /// Derives `{db_path}.hnsw/` from the storage path. Returns `None` if
353    /// the storage has no file path (e.g., in-memory tests).
354    fn hnsw_dir(&self) -> Option<PathBuf> {
355        self.storage.path().map(|p| {
356            let mut hnsw_path = p.as_os_str().to_owned();
357            hnsw_path.push(".hnsw");
358            PathBuf::from(hnsw_path)
359        })
360    }
361
    /// Loads or rebuilds HNSW indexes for all existing collectives.
    ///
    /// For each collective in storage:
    /// 1. Try loading metadata from `.hnsw.meta` file
    /// 2. Rebuild the graph from redb embeddings (always, since we can't
    ///    load the graph due to hnsw_rs lifetime constraints)
    /// 3. Restore deleted set from metadata if available
    ///
    /// # Errors
    ///
    /// Propagates storage errors (listing collectives/experiences, reading
    /// embeddings) and HNSW rebuild failures. A missing or unreadable
    /// metadata file is not an error — the deleted set is simply left empty.
    fn load_all_indexes(
        storage: &dyn StorageEngine,
        config: &Config,
    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
        let collectives = storage.list_collectives()?;
        let mut vectors = HashMap::with_capacity(collectives.len());

        // Same `{db_path}.hnsw` derivation as `hnsw_dir()`; None for
        // path-less (in-memory) storage, in which case metadata is skipped.
        let hnsw_dir = storage.path().map(|p| {
            let mut hnsw_path = p.as_os_str().to_owned();
            hnsw_path.push(".hnsw");
            PathBuf::from(hnsw_path)
        });

        for collective in &collectives {
            let dimension = collective.embedding_dimension as usize;

            // List all experience IDs in this collective
            let exp_ids = storage.list_experience_ids_in_collective(collective.id)?;

            // Load embeddings from redb (source of truth). Experiences with
            // no stored embedding are skipped silently.
            let mut embeddings = Vec::with_capacity(exp_ids.len());
            for exp_id in &exp_ids {
                if let Some(embedding) = storage.get_embedding(*exp_id)? {
                    embeddings.push((*exp_id, embedding));
                }
            }

            // Try loading metadata (for deleted set and ID mappings).
            // `.ok()` + `.flatten()`: a corrupt or absent metadata file is
            // treated the same as having no metadata.
            let metadata = hnsw_dir
                .as_ref()
                .and_then(|dir| HnswIndex::load_metadata(dir, &collective.id.to_string()).ok())
                .flatten();

            // Rebuild the HNSW graph from embeddings
            let index = if embeddings.is_empty() {
                HnswIndex::new(dimension, &config.hnsw)
            } else {
                let start = std::time::Instant::now();
                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
                info!(
                    collective = %collective.id,
                    vectors = idx.active_count(),
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "Rebuilt HNSW index from redb embeddings"
                );
                idx
            };

            // Restore deleted set from metadata if available (after the
            // rebuild, so the tombstones apply to the fresh graph).
            if let Some(meta) = metadata {
                index.restore_deleted_set(&meta.deleted)?;
            }

            vectors.insert(collective.id, index);
        }

        Ok(vectors)
    }
427
    /// Loads or rebuilds insight HNSW indexes for all existing collectives.
    ///
    /// For each collective, loads all insights from storage and rebuilds
    /// the HNSW graph from their inline embeddings. Uses InsightId→ExperienceId
    /// byte conversion for the HNSW API.
    ///
    /// # Errors
    ///
    /// Propagates storage errors and HNSW rebuild failures. Missing or
    /// unreadable metadata files are not errors — the deleted set is left
    /// empty in that case.
    fn load_all_insight_indexes(
        storage: &dyn StorageEngine,
        config: &Config,
    ) -> Result<HashMap<CollectiveId, HnswIndex>> {
        let collectives = storage.list_collectives()?;
        let mut insight_vectors = HashMap::with_capacity(collectives.len());

        // Same `{db_path}.hnsw` derivation as `hnsw_dir()`; None for
        // path-less (in-memory) storage.
        let hnsw_dir = storage.path().map(|p| {
            let mut hnsw_path = p.as_os_str().to_owned();
            hnsw_path.push(".hnsw");
            PathBuf::from(hnsw_path)
        });

        for collective in &collectives {
            let dimension = collective.embedding_dimension as usize;

            // List all insight IDs in this collective
            let insight_ids = storage.list_insight_ids_in_collective(collective.id)?;

            // Load insights and extract embeddings (converting InsightId → ExperienceId)
            let mut embeddings = Vec::with_capacity(insight_ids.len());
            for insight_id in &insight_ids {
                if let Some(insight) = storage.get_insight(*insight_id)? {
                    let exp_id = ExperienceId::from_bytes(*insight_id.as_bytes());
                    embeddings.push((exp_id, insight.embedding));
                }
            }

            // Try loading metadata (for deleted set). Insight index files use
            // the `{collective_id}_insights` naming scheme.
            let name = format!("{}_insights", collective.id);
            let metadata = hnsw_dir
                .as_ref()
                .and_then(|dir| HnswIndex::load_metadata(dir, &name).ok())
                .flatten();

            // Rebuild HNSW graph from embeddings
            let index = if embeddings.is_empty() {
                HnswIndex::new(dimension, &config.hnsw)
            } else {
                let start = std::time::Instant::now();
                let idx = HnswIndex::rebuild_from_embeddings(dimension, &config.hnsw, embeddings)?;
                info!(
                    collective = %collective.id,
                    insights = idx.active_count(),
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "Rebuilt insight HNSW index from stored insights"
                );
                idx
            };

            // Restore deleted set from metadata if available (after the
            // rebuild, so the tombstones apply to the fresh graph).
            if let Some(meta) = metadata {
                index.restore_deleted_set(&meta.deleted)?;
            }

            insight_vectors.insert(collective.id, index);
        }

        Ok(insight_vectors)
    }
493
494    /// Executes a closure with the HNSW index for a collective.
495    ///
496    /// This is the primary accessor for vector search operations (used by
497    /// `search_similar()`). The closure runs while the outer RwLock guard
498    /// is held (read lock), so the HnswIndex reference stays valid.
499    /// Returns `None` if no index exists for the collective.
500    #[doc(hidden)]
501    pub fn with_vector_index<F, R>(&self, collective_id: CollectiveId, f: F) -> Result<Option<R>>
502    where
503        F: FnOnce(&HnswIndex) -> Result<R>,
504    {
505        let vectors = self
506            .vectors
507            .read()
508            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
509        match vectors.get(&collective_id) {
510            Some(index) => Ok(Some(f(index)?)),
511            None => Ok(None),
512        }
513    }
514
515    // =========================================================================
516    // Test Helpers
517    // =========================================================================
518
519    /// Returns a reference to the storage engine for integration testing.
520    ///
521    /// This method is intentionally hidden from documentation. It provides
522    /// test-only access to the storage layer for verifying ACID guarantees
523    /// and crash recovery. Production code should use the public PulseDB API.
524    #[doc(hidden)]
525    #[inline]
526    pub fn storage_for_test(&self) -> &dyn StorageEngine {
527        self.storage.as_ref()
528    }
529
530    /// Returns true if this database is in read-only mode.
531    pub fn is_read_only(&self) -> bool {
532        self.config.read_only
533    }
534
535    /// Checks if the database is read-only and returns an error if so.
536    #[inline]
537    fn check_writable(&self) -> Result<()> {
538        if self.config.read_only {
539            return Err(PulseDBError::ReadOnly);
540        }
541        Ok(())
542    }
543
544    // =========================================================================
545    // Collective Management (E1-S02)
546    // =========================================================================
547
548    /// Creates a new collective with the given name.
549    ///
550    /// The collective's embedding dimension is locked to the database's
551    /// configured dimension at creation time.
552    ///
553    /// # Arguments
554    ///
555    /// * `name` - Human-readable name (1-255 characters, not whitespace-only)
556    ///
557    /// # Errors
558    ///
559    /// Returns a validation error if the name is empty, whitespace-only,
560    /// or exceeds 255 characters.
561    ///
562    /// # Example
563    ///
564    /// ```rust
565    /// # fn main() -> pulsedb::Result<()> {
566    /// # let dir = tempfile::tempdir().unwrap();
567    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
568    /// let id = db.create_collective("my-project")?;
569    /// # Ok(())
570    /// # }
571    /// ```
572    #[instrument(skip(self))]
573    pub fn create_collective(&self, name: &str) -> Result<CollectiveId> {
574        self.check_writable()?;
575        validate_collective_name(name)?;
576
577        let dimension = self.config.embedding_dimension.size() as u16;
578        let collective = Collective::new(name, dimension);
579        let id = collective.id;
580
581        // Persist to redb first (source of truth)
582        self.storage.save_collective(&collective)?;
583
584        // Create empty HNSW indexes for this collective
585        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
586        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
587        self.vectors
588            .write()
589            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
590            .insert(id, exp_index);
591        self.insight_vectors
592            .write()
593            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
594            .insert(id, insight_index);
595
596        info!(id = %id, name = %name, "Collective created");
597        Ok(id)
598    }
599
600    /// Creates a new collective with an owner for multi-tenancy.
601    ///
602    /// Same as [`create_collective`](Self::create_collective) but assigns
603    /// an owner ID, enabling filtering with
604    /// [`list_collectives_by_owner`](Self::list_collectives_by_owner).
605    ///
606    /// # Arguments
607    ///
608    /// * `name` - Human-readable name (1-255 characters)
609    /// * `owner_id` - Owner identifier (must not be empty)
610    ///
611    /// # Errors
612    ///
613    /// Returns a validation error if the name or owner_id is invalid.
614    #[instrument(skip(self))]
615    pub fn create_collective_with_owner(&self, name: &str, owner_id: &str) -> Result<CollectiveId> {
616        self.check_writable()?;
617        validate_collective_name(name)?;
618
619        if owner_id.is_empty() {
620            return Err(PulseDBError::from(
621                crate::error::ValidationError::required_field("owner_id"),
622            ));
623        }
624
625        let dimension = self.config.embedding_dimension.size() as u16;
626        let collective = Collective::with_owner(name, owner_id, dimension);
627        let id = collective.id;
628
629        // Persist to redb first (source of truth)
630        self.storage.save_collective(&collective)?;
631
632        // Create empty HNSW indexes for this collective
633        let exp_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
634        let insight_index = HnswIndex::new(dimension as usize, &self.config.hnsw);
635        self.vectors
636            .write()
637            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
638            .insert(id, exp_index);
639        self.insight_vectors
640            .write()
641            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
642            .insert(id, insight_index);
643
644        info!(id = %id, name = %name, owner = %owner_id, "Collective created with owner");
645        Ok(id)
646    }
647
648    /// Returns a collective by ID, or `None` if not found.
649    ///
650    /// # Example
651    ///
652    /// ```rust
653    /// # fn main() -> pulsedb::Result<()> {
654    /// # let dir = tempfile::tempdir().unwrap();
655    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
656    /// # let id = db.create_collective("example")?;
657    /// if let Some(collective) = db.get_collective(id)? {
658    ///     println!("Found: {}", collective.name);
659    /// }
660    /// # Ok(())
661    /// # }
662    /// ```
663    #[instrument(skip(self))]
664    pub fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
665        self.storage.get_collective(id)
666    }
667
668    /// Lists all collectives in the database.
669    ///
670    /// Returns an empty vector if no collectives exist.
671    pub fn list_collectives(&self) -> Result<Vec<Collective>> {
672        self.storage.list_collectives()
673    }
674
675    /// Lists collectives filtered by owner ID.
676    ///
677    /// Returns only collectives whose `owner_id` matches the given value.
678    /// Returns an empty vector if no matching collectives exist.
679    pub fn list_collectives_by_owner(&self, owner_id: &str) -> Result<Vec<Collective>> {
680        let all = self.storage.list_collectives()?;
681        Ok(all
682            .into_iter()
683            .filter(|c| c.owner_id.as_deref() == Some(owner_id))
684            .collect())
685    }
686
    /// Returns statistics for a collective.
    ///
    /// Currently only `experience_count` is populated; see the inline notes
    /// on the remaining fields.
    ///
    /// # Errors
    ///
    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
    #[instrument(skip(self))]
    pub fn get_collective_stats(&self, id: CollectiveId) -> Result<CollectiveStats> {
        // Verify collective exists
        self.storage
            .get_collective(id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;

        let experience_count = self.storage.count_experiences_in_collective(id)?;

        Ok(CollectiveStats {
            experience_count,
            // NOTE(review): the fields below are placeholders — storage size
            // and experience timestamps are not computed here. TODO confirm
            // whether a later phase is meant to fill these in.
            storage_bytes: 0,
            oldest_experience: None,
            newest_experience: None,
        })
    }
708
    /// Deletes a collective and all its associated data.
    ///
    /// Performs cascade deletion: removes all experiences, insights, and
    /// activities belonging to the collective before removing the collective
    /// record itself, then drops the in-memory HNSW indexes and their files.
    ///
    /// NOTE(review): each cascade step below is a separate storage call;
    /// whether they share a single transaction depends on the storage
    /// engine — confirm before relying on atomicity of the cascade.
    ///
    /// # Errors
    ///
    /// Returns [`NotFoundError::Collective`] if the collective doesn't exist.
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
    /// # let collective_id = db.create_collective("to-delete")?;
    /// db.delete_collective(collective_id)?;
    /// assert!(db.get_collective(collective_id)?.is_none());
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(self))]
    pub fn delete_collective(&self, id: CollectiveId) -> Result<()> {
        self.check_writable()?;
        // Verify collective exists
        self.storage
            .get_collective(id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(id)))?;

        // Cascade: delete all experiences for this collective
        let deleted_count = self.storage.delete_experiences_by_collective(id)?;
        if deleted_count > 0 {
            info!(count = deleted_count, "Cascade-deleted experiences");
        }

        // Cascade: delete all insights for this collective
        let deleted_insights = self.storage.delete_insights_by_collective(id)?;
        if deleted_insights > 0 {
            info!(count = deleted_insights, "Cascade-deleted insights");
        }

        // Cascade: delete all activities for this collective
        let deleted_activities = self.storage.delete_activities_by_collective(id)?;
        if deleted_activities > 0 {
            info!(count = deleted_activities, "Cascade-deleted activities");
        }

        // Delete the collective record from storage
        self.storage.delete_collective(id)?;

        // Remove HNSW indexes from memory (after storage deletion, so the
        // source of truth is updated first).
        self.vectors
            .write()
            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
            .remove(&id);
        self.insight_vectors
            .write()
            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
            .remove(&id);

        // Remove HNSW files from disk (non-fatal if fails)
        if let Some(hnsw_dir) = self.hnsw_dir() {
            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &id.to_string()) {
                warn!(
                    collective = %id,
                    error = %e,
                    "Failed to remove experience HNSW files (non-fatal)"
                );
            }
            // Insight index files use the `{collective_id}_insights` suffix.
            let insight_name = format!("{}_insights", id);
            if let Err(e) = HnswIndex::remove_files(&hnsw_dir, &insight_name) {
                warn!(
                    collective = %id,
                    error = %e,
                    "Failed to remove insight HNSW files (non-fatal)"
                );
            }
        }

        info!(id = %id, "Collective deleted");
        Ok(())
    }
791
792    // =========================================================================
793    // Experience CRUD (E1-S03)
794    // =========================================================================
795
796    /// Records a new experience in the database.
797    ///
798    /// This is the primary method for storing agent-learned knowledge. The method:
799    /// 1. Validates the input (content, scores, tags, embedding)
800    /// 2. Verifies the collective exists
801    /// 3. Resolves the embedding (generates if Builtin, requires if External)
802    /// 4. Stores the experience atomically across 4 tables
803    ///
804    /// # Arguments
805    ///
806    /// * `exp` - The experience to record (see [`NewExperience`])
807    ///
808    /// # Errors
809    ///
810    /// - [`ValidationError`](crate::ValidationError) if input is invalid
811    /// - [`NotFoundError::Collective`] if the collective doesn't exist
812    /// - [`PulseDBError::Embedding`] if embedding generation fails (Builtin mode)
813    #[instrument(skip(self, exp), fields(collective_id = %exp.collective_id))]
814    pub fn record_experience(&self, exp: NewExperience) -> Result<ExperienceId> {
815        self.check_writable()?;
816        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);
817
818        // Verify collective exists and get its dimension
819        let collective = self
820            .storage
821            .get_collective(exp.collective_id)?
822            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(exp.collective_id)))?;
823
824        // Validate input
825        validate_new_experience(&exp, collective.embedding_dimension, is_external)?;
826
827        // Resolve embedding
828        let embedding = match exp.embedding {
829            Some(emb) => emb,
830            None => {
831                // Builtin mode: generate embedding from content
832                self.embedding.embed(&exp.content)?
833            }
834        };
835
836        // Clone embedding for HNSW insertion (~1.5KB for 384d, negligible vs I/O)
837        let embedding_for_hnsw = embedding.clone();
838        let collective_id = exp.collective_id;
839
840        // Construct the full experience record
841        let experience = Experience {
842            id: ExperienceId::new(),
843            collective_id,
844            content: exp.content,
845            embedding,
846            experience_type: exp.experience_type,
847            importance: exp.importance,
848            confidence: exp.confidence,
849            applications: 0,
850            domain: exp.domain,
851            related_files: exp.related_files,
852            source_agent: exp.source_agent,
853            source_task: exp.source_task,
854            timestamp: Timestamp::now(),
855            archived: false,
856        };
857
858        let id = experience.id;
859
860        // Write to redb FIRST (source of truth). If crash happens after
861        // this but before HNSW insert, rebuild on next open will include it.
862        self.storage.save_experience(&experience)?;
863
864        // Insert into HNSW index (derived structure)
865        let vectors = self
866            .vectors
867            .read()
868            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
869        if let Some(index) = vectors.get(&collective_id) {
870            index.insert_experience(id, &embedding_for_hnsw)?;
871        }
872
873        // Emit watch event after both storage and HNSW succeed
874        self.watch.emit(
875            WatchEvent {
876                experience_id: id,
877                collective_id,
878                event_type: WatchEventType::Created,
879                timestamp: experience.timestamp,
880                experience: Some(experience.clone()),
881            },
882            &experience,
883        )?;
884
885        info!(id = %id, "Experience recorded");
886        Ok(id)
887    }
888
889    /// Retrieves an experience by ID, including its embedding.
890    ///
891    /// Returns `None` if no experience with the given ID exists.
892    #[instrument(skip(self))]
893    pub fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
894        self.storage.get_experience(id)
895    }
896
897    /// Updates mutable fields of an experience.
898    ///
899    /// Only fields set to `Some(...)` in the update are changed.
900    /// Content and embedding are immutable — create a new experience instead.
901    ///
902    /// # Errors
903    ///
904    /// - [`ValidationError`](crate::ValidationError) if updated values are invalid
905    /// - [`NotFoundError::Experience`] if the experience doesn't exist
906    #[instrument(skip(self, update))]
907    pub fn update_experience(&self, id: ExperienceId, update: ExperienceUpdate) -> Result<()> {
908        self.check_writable()?;
909        validate_experience_update(&update)?;
910
911        let updated = self.storage.update_experience(id, &update)?;
912        if !updated {
913            return Err(PulseDBError::from(NotFoundError::experience(id)));
914        }
915
916        // Emit watch event (fetch experience for collective_id + filter matching)
917        if self.watch.has_subscribers() {
918            if let Ok(Some(exp)) = self.storage.get_experience(id) {
919                let event_type = if update.archived == Some(true) {
920                    WatchEventType::Archived
921                } else {
922                    WatchEventType::Updated
923                };
924                self.watch.emit(
925                    WatchEvent {
926                        experience_id: id,
927                        collective_id: exp.collective_id,
928                        event_type,
929                        timestamp: Timestamp::now(),
930                        experience: Some(exp.clone()),
931                    },
932                    &exp,
933                )?;
934            }
935        }
936
937        info!(id = %id, "Experience updated");
938        Ok(())
939    }
940
941    /// Archives an experience (soft-delete).
942    ///
943    /// Archived experiences remain in storage but are excluded from search
944    /// results. Use [`unarchive_experience`](Self::unarchive_experience) to restore.
945    ///
946    /// # Errors
947    ///
948    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
949    #[instrument(skip(self))]
950    pub fn archive_experience(&self, id: ExperienceId) -> Result<()> {
951        self.check_writable()?;
952        self.update_experience(
953            id,
954            ExperienceUpdate {
955                archived: Some(true),
956                ..Default::default()
957            },
958        )
959    }
960
961    /// Restores an archived experience.
962    ///
963    /// The experience will once again appear in search results.
964    ///
965    /// # Errors
966    ///
967    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
968    #[instrument(skip(self))]
969    pub fn unarchive_experience(&self, id: ExperienceId) -> Result<()> {
970        self.check_writable()?;
971        self.update_experience(
972            id,
973            ExperienceUpdate {
974                archived: Some(false),
975                ..Default::default()
976            },
977        )
978    }
979
980    /// Permanently deletes an experience and its embedding.
981    ///
982    /// This removes the experience from all tables and indices.
983    /// Unlike archiving, this is irreversible.
984    ///
985    /// # Errors
986    ///
987    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
988    #[instrument(skip(self))]
989    pub fn delete_experience(&self, id: ExperienceId) -> Result<()> {
990        self.check_writable()?;
991        // Read experience first to get collective_id for HNSW lookup.
992        // This adds one extra read, but delete is not a hot path.
993        let experience = self
994            .storage
995            .get_experience(id)?
996            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
997
998        // Cascade-delete any relations involving this experience.
999        // Done before experience deletion so we can still look up relation data.
1000        let rel_count = self.storage.delete_relations_for_experience(id)?;
1001        if rel_count > 0 {
1002            info!(
1003                count = rel_count,
1004                "Cascade-deleted relations for experience"
1005            );
1006        }
1007
1008        // Delete from redb FIRST (source of truth). If crash happens after
1009        // this but before HNSW soft-delete, on reopen the experience won't be
1010        // loaded from redb, so it's automatically excluded from the rebuilt index.
1011        self.storage.delete_experience(id)?;
1012
1013        // Soft-delete from HNSW index (mark as deleted, not removed from graph).
1014        // This takes effect immediately for the current session's searches.
1015        let vectors = self
1016            .vectors
1017            .read()
1018            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
1019        if let Some(index) = vectors.get(&experience.collective_id) {
1020            index.delete_experience(id)?;
1021        }
1022
1023        // Emit watch event after storage + HNSW deletion
1024        self.watch.emit(
1025            WatchEvent {
1026                experience_id: id,
1027                collective_id: experience.collective_id,
1028                event_type: WatchEventType::Deleted,
1029                timestamp: Timestamp::now(),
1030                experience: None, // Deleted — no data to include
1031            },
1032            &experience,
1033        )?;
1034
1035        info!(id = %id, "Experience deleted");
1036        Ok(())
1037    }
1038
1039    /// Reinforces an experience by incrementing its application count.
1040    ///
1041    /// Each call atomically increments the `applications` counter by 1.
1042    /// Returns the new application count.
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns [`NotFoundError::Experience`] if the experience doesn't exist.
1047    #[instrument(skip(self))]
1048    pub fn reinforce_experience(&self, id: ExperienceId) -> Result<u32> {
1049        self.check_writable()?;
1050        let new_count = self
1051            .storage
1052            .reinforce_experience(id)?
1053            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(id)))?;
1054
1055        // Emit watch event (fetch experience for collective_id + filter matching)
1056        if self.watch.has_subscribers() {
1057            if let Ok(Some(exp)) = self.storage.get_experience(id) {
1058                self.watch.emit(
1059                    WatchEvent {
1060                        experience_id: id,
1061                        collective_id: exp.collective_id,
1062                        event_type: WatchEventType::Updated,
1063                        timestamp: Timestamp::now(),
1064                        experience: Some(exp.clone()),
1065                    },
1066                    &exp,
1067                )?;
1068            }
1069        }
1070
1071        info!(id = %id, applications = new_count, "Experience reinforced");
1072        Ok(new_count)
1073    }
1074
1075    // =========================================================================
1076    // Recent Experiences
1077    // =========================================================================
1078
1079    // =========================================================================
1080    // Paginated List Operations (PulseVision)
1081    // =========================================================================
1082
1083    /// Lists experiences in a collective with pagination.
1084    ///
1085    /// Returns full `Experience` records (including embeddings) ordered by
1086    /// timestamp. Use `offset` and `limit` for pagination.
1087    ///
1088    /// Designed for visualization tools (PulseVision) that need to enumerate
1089    /// the entire embedding space of a collective.
1090    #[instrument(skip(self))]
1091    pub fn list_experiences(
1092        &self,
1093        collective_id: CollectiveId,
1094        limit: usize,
1095        offset: usize,
1096    ) -> Result<Vec<Experience>> {
1097        let ids = self
1098            .storage
1099            .list_experience_ids_paginated(collective_id, limit, offset)?;
1100        let mut experiences = Vec::with_capacity(ids.len());
1101        for id in ids {
1102            if let Some(exp) = self.storage.get_experience(id)? {
1103                experiences.push(exp);
1104            }
1105        }
1106        Ok(experiences)
1107    }
1108
1109    /// Lists relations in a collective with pagination.
1110    #[instrument(skip(self))]
1111    pub fn list_relations(
1112        &self,
1113        collective_id: CollectiveId,
1114        limit: usize,
1115        offset: usize,
1116    ) -> Result<Vec<crate::relation::ExperienceRelation>> {
1117        self.storage
1118            .list_relations_in_collective(collective_id, limit, offset)
1119    }
1120
1121    /// Lists insights in a collective with pagination.
1122    ///
1123    /// Returns full `DerivedInsight` records including embeddings.
1124    #[instrument(skip(self))]
1125    pub fn list_insights(
1126        &self,
1127        collective_id: CollectiveId,
1128        limit: usize,
1129        offset: usize,
1130    ) -> Result<Vec<DerivedInsight>> {
1131        let ids = self
1132            .storage
1133            .list_insight_ids_paginated(collective_id, limit, offset)?;
1134        let mut insights = Vec::with_capacity(ids.len());
1135        for id in ids {
1136            if let Some(insight) = self.storage.get_insight(id)? {
1137                insights.push(insight);
1138            }
1139        }
1140        Ok(insights)
1141    }
1142
1143    /// Retrieves the most recent experiences in a collective.
1144    ///
1145    /// Returns full experiences ordered by timestamp (newest first).
1146    #[instrument(skip(self))]
1147    pub fn get_recent_experiences(
1148        &self,
1149        collective_id: CollectiveId,
1150        limit: usize,
1151    ) -> Result<Vec<Experience>> {
1152        self.get_recent_experiences_filtered(collective_id, limit, SearchFilter::default())
1153    }
1154
1155    /// Retrieves the most recent experiences in a collective with filtering.
1156    ///
1157    /// Like [`get_recent_experiences()`](Self::get_recent_experiences), but
1158    /// applies additional filters on domain, experience type, importance,
1159    /// confidence, and timestamp.
1160    ///
1161    /// Over-fetches from storage (2x `limit`) to account for entries removed
1162    /// by post-filtering, then truncates to the requested `limit`.
1163    ///
1164    /// # Arguments
1165    ///
1166    /// * `collective_id` - The collective to query
1167    /// * `limit` - Maximum number of experiences to return (1-1000)
1168    /// * `filter` - Filter criteria to apply
1169    ///
1170    /// # Errors
1171    ///
1172    /// - [`ValidationError::InvalidField`] if `limit` is 0 or > 1000
1173    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1174    #[instrument(skip(self, filter))]
1175    pub fn get_recent_experiences_filtered(
1176        &self,
1177        collective_id: CollectiveId,
1178        limit: usize,
1179        filter: SearchFilter,
1180    ) -> Result<Vec<Experience>> {
1181        // Validate limit
1182        if limit == 0 || limit > 1000 {
1183            return Err(
1184                ValidationError::invalid_field("limit", "must be between 1 and 1000").into(),
1185            );
1186        }
1187
1188        // Verify collective exists
1189        self.storage
1190            .get_collective(collective_id)?
1191            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1192
1193        // Over-fetch IDs to account for post-filtering losses
1194        let over_fetch = limit.saturating_mul(2).min(2000);
1195        let recent_ids = self
1196            .storage
1197            .get_recent_experience_ids(collective_id, over_fetch)?;
1198
1199        // Load full experiences and apply filter
1200        let mut results = Vec::with_capacity(limit);
1201        for (exp_id, _timestamp) in recent_ids {
1202            if results.len() >= limit {
1203                break;
1204            }
1205
1206            if let Some(experience) = self.storage.get_experience(exp_id)? {
1207                if filter.matches(&experience) {
1208                    results.push(experience);
1209                }
1210            }
1211        }
1212
1213        Ok(results)
1214    }
1215
1216    // =========================================================================
1217    // Similarity Search (E2-S02)
1218    // =========================================================================
1219
1220    /// Searches for experiences semantically similar to the query embedding.
1221    ///
1222    /// Uses the HNSW vector index for approximate nearest neighbor search,
1223    /// then fetches full experience records from storage. Archived experiences
1224    /// are excluded by default.
1225    ///
1226    /// Results are sorted by similarity descending (most similar first).
1227    /// Similarity is computed as `1.0 - cosine_distance`.
1228    ///
1229    /// # Arguments
1230    ///
1231    /// * `collective_id` - The collective to search within
1232    /// * `query` - Query embedding vector (must match collective's dimension)
1233    /// * `k` - Maximum number of results to return (1-1000)
1234    ///
1235    /// # Errors
1236    ///
1237    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1238    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1239    ///   the collective's embedding dimension
1240    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1241    ///
1242    /// # Example
1243    ///
1244    /// ```rust
1245    /// # fn main() -> pulsedb::Result<()> {
1246    /// # let dir = tempfile::tempdir().unwrap();
1247    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1248    /// # let collective_id = db.create_collective("example")?;
1249    /// let query = vec![0.1f32; 384]; // Your query embedding
1250    /// let results = db.search_similar(collective_id, &query, 10)?;
1251    /// for result in &results {
1252    ///     println!(
1253    ///         "[{:.3}] {}",
1254    ///         result.similarity, result.experience.content
1255    ///     );
1256    /// }
1257    /// # Ok(())
1258    /// # }
1259    /// ```
1260    #[instrument(skip(self, query))]
1261    pub fn search_similar(
1262        &self,
1263        collective_id: CollectiveId,
1264        query: &[f32],
1265        k: usize,
1266    ) -> Result<Vec<SearchResult>> {
1267        self.search_similar_filtered(collective_id, query, k, SearchFilter::default())
1268    }
1269
1270    /// Searches for semantically similar experiences with additional filtering.
1271    ///
1272    /// Like [`search_similar()`](Self::search_similar), but applies additional
1273    /// filters on domain, experience type, importance, confidence, and timestamp.
1274    ///
1275    /// Over-fetches from the HNSW index (2x `k`) to account for entries removed
1276    /// by post-filtering, then truncates to the requested `k`.
1277    ///
1278    /// # Arguments
1279    ///
1280    /// * `collective_id` - The collective to search within
1281    /// * `query` - Query embedding vector (must match collective's dimension)
1282    /// * `k` - Maximum number of results to return (1-1000)
1283    /// * `filter` - Filter criteria to apply after vector search
1284    ///
1285    /// # Errors
1286    ///
1287    /// - [`ValidationError::InvalidField`] if `k` is 0 or > 1000
1288    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1289    ///   the collective's embedding dimension
1290    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1291    ///
1292    /// # Example
1293    ///
1294    /// ```rust
1295    /// # fn main() -> pulsedb::Result<()> {
1296    /// # let dir = tempfile::tempdir().unwrap();
1297    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1298    /// # let collective_id = db.create_collective("example")?;
1299    /// # let query_embedding = vec![0.1f32; 384];
1300    /// use pulsedb::SearchFilter;
1301    ///
1302    /// let filter = SearchFilter {
1303    ///     domains: Some(vec!["rust".to_string()]),
1304    ///     min_importance: Some(0.5),
1305    ///     ..SearchFilter::default()
1306    /// };
1307    /// let results = db.search_similar_filtered(
1308    ///     collective_id,
1309    ///     &query_embedding,
1310    ///     10,
1311    ///     filter,
1312    /// )?;
1313    /// # Ok(())
1314    /// # }
1315    /// ```
1316    #[instrument(skip(self, query, filter))]
1317    pub fn search_similar_filtered(
1318        &self,
1319        collective_id: CollectiveId,
1320        query: &[f32],
1321        k: usize,
1322        filter: SearchFilter,
1323    ) -> Result<Vec<SearchResult>> {
1324        // Validate k
1325        if k == 0 || k > 1000 {
1326            return Err(ValidationError::invalid_field("k", "must be between 1 and 1000").into());
1327        }
1328
1329        // Verify collective exists and check embedding dimension
1330        let collective = self
1331            .storage
1332            .get_collective(collective_id)?
1333            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1334
1335        let expected_dim = collective.embedding_dimension as usize;
1336        if query.len() != expected_dim {
1337            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1338        }
1339
1340        // Over-fetch from HNSW to compensate for post-filtering losses
1341        let over_fetch = k.saturating_mul(2).min(2000);
1342        let ef_search = self.config.hnsw.ef_search;
1343
1344        // Search HNSW index — returns (ExperienceId, cosine_distance) sorted
1345        // by distance ascending (closest first)
1346        let candidates = self
1347            .with_vector_index(collective_id, |index| {
1348                index.search_experiences(query, over_fetch, ef_search)
1349            })?
1350            .unwrap_or_default();
1351
1352        // Fetch full experiences, apply filter, convert distance → similarity
1353        let mut results = Vec::with_capacity(k);
1354        for (exp_id, distance) in candidates {
1355            if results.len() >= k {
1356                break;
1357            }
1358
1359            if let Some(experience) = self.storage.get_experience(exp_id)? {
1360                if filter.matches(&experience) {
1361                    results.push(SearchResult {
1362                        experience,
1363                        similarity: 1.0 - distance,
1364                    });
1365                }
1366            }
1367        }
1368
1369        Ok(results)
1370    }
1371
1372    // =========================================================================
1373    // Experience Relations (E3-S01)
1374    // =========================================================================
1375
1376    /// Stores a new relation between two experiences.
1377    ///
1378    /// Relations are typed, directed edges connecting a source experience to a
1379    /// target experience. Both experiences must exist and belong to the same
1380    /// collective. Duplicate relations (same source, target, and type) are
1381    /// rejected.
1382    ///
1383    /// # Arguments
1384    ///
1385    /// * `relation` - The relation to create (source, target, type, strength)
1386    ///
1387    /// # Errors
1388    ///
1389    /// Returns an error if:
1390    /// - Source or target experience doesn't exist ([`NotFoundError::Experience`])
1391    /// - Experiences belong to different collectives ([`ValidationError::InvalidField`])
1392    /// - A relation with the same (source, target, type) already exists
1393    /// - Self-relation attempted (source == target)
1394    /// - Strength is out of range `[0.0, 1.0]`
1395    #[instrument(skip(self, relation))]
1396    pub fn store_relation(
1397        &self,
1398        relation: crate::relation::NewExperienceRelation,
1399    ) -> Result<crate::types::RelationId> {
1400        self.check_writable()?;
1401        use crate::relation::{validate_new_relation, ExperienceRelation};
1402        use crate::types::RelationId;
1403
1404        // Validate input fields (self-relation, strength bounds, metadata size)
1405        validate_new_relation(&relation)?;
1406
1407        // Load source and target experiences to verify existence
1408        let source = self
1409            .storage
1410            .get_experience(relation.source_id)?
1411            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.source_id)))?;
1412        let target = self
1413            .storage
1414            .get_experience(relation.target_id)?
1415            .ok_or_else(|| PulseDBError::from(NotFoundError::experience(relation.target_id)))?;
1416
1417        // Verify same collective
1418        if source.collective_id != target.collective_id {
1419            return Err(PulseDBError::from(ValidationError::invalid_field(
1420                "target_id",
1421                "source and target experiences must belong to the same collective",
1422            )));
1423        }
1424
1425        // Check for duplicate (same source, target, type)
1426        if self.storage.relation_exists(
1427            relation.source_id,
1428            relation.target_id,
1429            relation.relation_type,
1430        )? {
1431            return Err(PulseDBError::from(ValidationError::invalid_field(
1432                "relation_type",
1433                "a relation with this source, target, and type already exists",
1434            )));
1435        }
1436
1437        // Construct the full relation
1438        let id = RelationId::new();
1439        let full_relation = ExperienceRelation {
1440            id,
1441            source_id: relation.source_id,
1442            target_id: relation.target_id,
1443            relation_type: relation.relation_type,
1444            strength: relation.strength,
1445            metadata: relation.metadata,
1446            created_at: Timestamp::now(),
1447        };
1448
1449        self.storage.save_relation(&full_relation)?;
1450
1451        info!(
1452            id = %id,
1453            source = %relation.source_id,
1454            target = %relation.target_id,
1455            relation_type = ?full_relation.relation_type,
1456            "Relation stored"
1457        );
1458        Ok(id)
1459    }
1460
1461    /// Retrieves experiences related to the given experience.
1462    ///
1463    /// Returns pairs of `(Experience, ExperienceRelation)` based on the
1464    /// requested direction:
1465    /// - `Outgoing`: experiences that this experience points TO (as source)
1466    /// - `Incoming`: experiences that point TO this experience (as target)
1467    /// - `Both`: union of outgoing and incoming
1468    ///
1469    /// To filter by relation type, use
1470    /// [`get_related_experiences_filtered`](Self::get_related_experiences_filtered).
1471    ///
1472    /// Silently skips relations where the related experience no longer exists
1473    /// (orphan tolerance).
1474    ///
1475    /// # Errors
1476    ///
1477    /// Returns a storage error if the read transaction fails.
1478    #[instrument(skip(self))]
1479    pub fn get_related_experiences(
1480        &self,
1481        experience_id: ExperienceId,
1482        direction: crate::relation::RelationDirection,
1483    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1484        self.get_related_experiences_filtered(experience_id, direction, None)
1485    }
1486
1487    /// Retrieves experiences related to the given experience, with optional
1488    /// type filtering.
1489    ///
1490    /// Like [`get_related_experiences()`](Self::get_related_experiences), but
1491    /// accepts an optional [`RelationType`](crate::RelationType) filter.
1492    /// When `Some(rt)`, only relations matching that type are returned.
1493    ///
1494    /// # Arguments
1495    ///
1496    /// * `experience_id` - The experience to query relations for
1497    /// * `direction` - Which direction(s) to traverse
1498    /// * `relation_type` - If `Some`, only return relations of this type
1499    ///
1500    /// # Example
1501    ///
1502    /// ```rust
1503    /// # fn main() -> pulsedb::Result<()> {
1504    /// # let dir = tempfile::tempdir().unwrap();
1505    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1506    /// # let cid = db.create_collective("example")?;
1507    /// # let exp_a = db.record_experience(pulsedb::NewExperience {
1508    /// #     collective_id: cid,
1509    /// #     content: "a".into(),
1510    /// #     embedding: Some(vec![0.1f32; 384]),
1511    /// #     ..Default::default()
1512    /// # })?;
1513    /// use pulsedb::{RelationType, RelationDirection};
1514    ///
1515    /// // Only "Supports" relations outgoing from exp_a
1516    /// let supports = db.get_related_experiences_filtered(
1517    ///     exp_a,
1518    ///     RelationDirection::Outgoing,
1519    ///     Some(RelationType::Supports),
1520    /// )?;
1521    /// # Ok(())
1522    /// # }
1523    /// ```
1524    #[instrument(skip(self))]
1525    pub fn get_related_experiences_filtered(
1526        &self,
1527        experience_id: ExperienceId,
1528        direction: crate::relation::RelationDirection,
1529        relation_type: Option<crate::relation::RelationType>,
1530    ) -> Result<Vec<(Experience, crate::relation::ExperienceRelation)>> {
1531        use crate::relation::RelationDirection;
1532
1533        let mut results = Vec::new();
1534
1535        // Outgoing: this experience is the source → fetch target experiences
1536        if matches!(
1537            direction,
1538            RelationDirection::Outgoing | RelationDirection::Both
1539        ) {
1540            let rel_ids = self.storage.get_relation_ids_by_source(experience_id)?;
1541            for rel_id in rel_ids {
1542                if let Some(relation) = self.storage.get_relation(rel_id)? {
1543                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1544                        continue;
1545                    }
1546                    if let Some(experience) = self.storage.get_experience(relation.target_id)? {
1547                        results.push((experience, relation));
1548                    }
1549                }
1550            }
1551        }
1552
1553        // Incoming: this experience is the target → fetch source experiences
1554        if matches!(
1555            direction,
1556            RelationDirection::Incoming | RelationDirection::Both
1557        ) {
1558            let rel_ids = self.storage.get_relation_ids_by_target(experience_id)?;
1559            for rel_id in rel_ids {
1560                if let Some(relation) = self.storage.get_relation(rel_id)? {
1561                    if relation_type.is_some_and(|rt| rt != relation.relation_type) {
1562                        continue;
1563                    }
1564                    if let Some(experience) = self.storage.get_experience(relation.source_id)? {
1565                        results.push((experience, relation));
1566                    }
1567                }
1568            }
1569        }
1570
1571        Ok(results)
1572    }
1573
1574    /// Retrieves a relation by ID.
1575    ///
1576    /// Returns `None` if no relation with the given ID exists.
1577    pub fn get_relation(
1578        &self,
1579        id: crate::types::RelationId,
1580    ) -> Result<Option<crate::relation::ExperienceRelation>> {
1581        self.storage.get_relation(id)
1582    }
1583
1584    /// Deletes a relation by ID.
1585    ///
1586    /// # Errors
1587    ///
1588    /// Returns [`NotFoundError::Relation`] if no relation with the given ID exists.
1589    #[instrument(skip(self))]
1590    pub fn delete_relation(&self, id: crate::types::RelationId) -> Result<()> {
1591        self.check_writable()?;
1592        let deleted = self.storage.delete_relation(id)?;
1593        if !deleted {
1594            return Err(PulseDBError::from(NotFoundError::relation(id)));
1595        }
1596        info!(id = %id, "Relation deleted");
1597        Ok(())
1598    }
1599
1600    // =========================================================================
1601    // Derived Insights (E3-S02)
1602    // =========================================================================
1603
    /// Stores a new derived insight.
    ///
    /// Creates a synthesized knowledge record from multiple source experiences.
    /// The method:
    /// 1. Validates the input (content, confidence, sources)
    /// 2. Verifies the collective exists
    /// 3. Verifies all source experiences exist and belong to the same collective
    /// 4. Resolves the embedding (generates if Builtin, requires if External)
    /// 5. Stores the insight with inline embedding
    /// 6. Inserts into the insight HNSW index
    ///
    /// # Arguments
    ///
    /// * `insight` - The insight to store (see [`NewDerivedInsight`])
    ///
    /// # Errors
    ///
    /// - an error from `check_writable` if the database is not writable
    /// - [`ValidationError`](crate::ValidationError) if input is invalid
    /// - [`NotFoundError::Collective`] if the collective doesn't exist
    /// - [`NotFoundError::Experience`] if any source experience doesn't exist
    /// - [`ValidationError::InvalidField`] if source experiences belong to
    ///   different collectives
    /// - [`ValidationError::DimensionMismatch`] if embedding dimension is wrong
    #[instrument(skip(self, insight), fields(collective_id = %insight.collective_id))]
    pub fn store_insight(&self, insight: NewDerivedInsight) -> Result<InsightId> {
        self.check_writable()?;
        // Captured up-front so the embedding-resolution match below stays simple.
        let is_external = matches!(self.config.embedding_provider, EmbeddingProvider::External);

        // Validate input fields (content, confidence, sources).
        validate_new_insight(&insight)?;

        // Verify collective exists — its record carries the expected embedding
        // dimension used for validation below.
        let collective = self
            .storage
            .get_collective(insight.collective_id)?
            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(insight.collective_id)))?;

        // Verify all source experiences exist and belong to this collective.
        // Cross-collective provenance is rejected as an invalid-field error.
        for source_id in &insight.source_experience_ids {
            let source_exp = self
                .storage
                .get_experience(*source_id)?
                .ok_or_else(|| PulseDBError::from(NotFoundError::experience(*source_id)))?;
            if source_exp.collective_id != insight.collective_id {
                return Err(PulseDBError::from(ValidationError::invalid_field(
                    "source_experience_ids",
                    format!(
                        "experience {} belongs to collective {}, not {}",
                        source_id, source_exp.collective_id, insight.collective_id
                    ),
                )));
            }
        }

        // Resolve embedding: a caller-supplied vector is dimension-checked;
        // otherwise one is generated from the content (Builtin provider only).
        let embedding = match insight.embedding {
            Some(ref emb) => {
                // Validate dimension against the collective's configured size.
                let expected_dim = collective.embedding_dimension as usize;
                if emb.len() != expected_dim {
                    return Err(ValidationError::dimension_mismatch(expected_dim, emb.len()).into());
                }
                emb.clone()
            }
            None => {
                if is_external {
                    return Err(PulseDBError::embedding(
                        "embedding is required when using External embedding provider",
                    ));
                }
                // NOTE(review): the generated vector's dimension is not
                // re-validated here — presumably the builtin provider always
                // matches the collective's dimension; confirm.
                self.embedding.embed(&insight.content)?
            }
        };

        // One copy is moved into the stored record; the clone feeds the HNSW
        // insert after the record has been persisted.
        let embedding_for_hnsw = embedding.clone();
        let now = Timestamp::now();
        let id = InsightId::new();

        // Construct the full insight record (created_at == updated_at on create).
        let derived_insight = DerivedInsight {
            id,
            collective_id: insight.collective_id,
            content: insight.content,
            embedding,
            source_experience_ids: insight.source_experience_ids,
            insight_type: insight.insight_type,
            confidence: insight.confidence,
            domain: insight.domain,
            created_at: now,
            updated_at: now,
        };

        // Write to redb FIRST (source of truth). If the HNSW insert below
        // fails, the record still exists in storage.
        self.storage.save_insight(&derived_insight)?;

        // Insert into insight HNSW index (using InsightId→ExperienceId byte
        // conversion — the index is keyed by ExperienceId).
        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
        let insight_vectors = self
            .insight_vectors
            .read()
            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
        // NOTE(review): if no index exists for this collective the insert is
        // silently skipped, leaving the insight stored but unsearchable —
        // presumably indexes are created with the collective; confirm.
        if let Some(index) = insight_vectors.get(&insight.collective_id) {
            index.insert_experience(exp_id, &embedding_for_hnsw)?;
        }

        info!(id = %id, "Insight stored");
        Ok(id)
    }
1712
1713    /// Retrieves a derived insight by ID.
1714    ///
1715    /// Returns `None` if no insight with the given ID exists.
1716    #[instrument(skip(self))]
1717    pub fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1718        self.storage.get_insight(id)
1719    }
1720
1721    /// Searches for insights semantically similar to the query embedding.
1722    ///
1723    /// Uses the insight-specific HNSW index for approximate nearest neighbor
1724    /// search, then fetches full insight records from storage.
1725    ///
1726    /// # Arguments
1727    ///
1728    /// * `collective_id` - The collective to search within
1729    /// * `query` - Query embedding vector (must match collective's dimension)
1730    /// * `k` - Maximum number of results to return
1731    ///
1732    /// # Errors
1733    ///
1734    /// - [`ValidationError::DimensionMismatch`] if `query.len()` doesn't match
1735    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1736    #[instrument(skip(self, query))]
1737    pub fn get_insights(
1738        &self,
1739        collective_id: CollectiveId,
1740        query: &[f32],
1741        k: usize,
1742    ) -> Result<Vec<(DerivedInsight, f32)>> {
1743        // Verify collective exists and check embedding dimension
1744        let collective = self
1745            .storage
1746            .get_collective(collective_id)?
1747            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1748
1749        let expected_dim = collective.embedding_dimension as usize;
1750        if query.len() != expected_dim {
1751            return Err(ValidationError::dimension_mismatch(expected_dim, query.len()).into());
1752        }
1753
1754        let ef_search = self.config.hnsw.ef_search;
1755
1756        // Search insight HNSW — returns (ExperienceId, distance) pairs
1757        let insight_vectors = self
1758            .insight_vectors
1759            .read()
1760            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1761
1762        let candidates = match insight_vectors.get(&collective_id) {
1763            Some(index) => index.search_experiences(query, k, ef_search)?,
1764            None => return Ok(vec![]),
1765        };
1766        drop(insight_vectors);
1767
1768        // Convert ExperienceId back to InsightId and fetch records
1769        let mut results = Vec::with_capacity(candidates.len());
1770        for (exp_id, distance) in candidates {
1771            let insight_id = InsightId::from_bytes(*exp_id.as_bytes());
1772            if let Some(insight) = self.storage.get_insight(insight_id)? {
1773                // Convert HNSW distance to similarity (1.0 - distance), matching search_similar pattern
1774                results.push((insight, 1.0 - distance));
1775            }
1776        }
1777
1778        Ok(results)
1779    }
1780
1781    /// Deletes a derived insight by ID.
1782    ///
1783    /// Removes the insight from storage and soft-deletes it from the HNSW index.
1784    ///
1785    /// # Errors
1786    ///
1787    /// Returns [`NotFoundError::Insight`] if no insight with the given ID exists.
1788    #[instrument(skip(self))]
1789    pub fn delete_insight(&self, id: InsightId) -> Result<()> {
1790        self.check_writable()?;
1791        // Read insight first to get collective_id for HNSW lookup
1792        let insight = self
1793            .storage
1794            .get_insight(id)?
1795            .ok_or_else(|| PulseDBError::from(NotFoundError::insight(id)))?;
1796
1797        // Delete from redb FIRST (source of truth)
1798        self.storage.delete_insight(id)?;
1799
1800        // Soft-delete from insight HNSW (using InsightId→ExperienceId byte conversion)
1801        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
1802        let insight_vectors = self
1803            .insight_vectors
1804            .read()
1805            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
1806        if let Some(index) = insight_vectors.get(&insight.collective_id) {
1807            index.delete_experience(exp_id)?;
1808        }
1809
1810        info!(id = %id, "Insight deleted");
1811        Ok(())
1812    }
1813
1814    // =========================================================================
1815    // Activity Tracking (E3-S03)
1816    // =========================================================================
1817
1818    /// Registers an agent's presence in a collective.
1819    ///
1820    /// Creates a new activity record or replaces an existing one for the
1821    /// same `(collective_id, agent_id)` pair (upsert semantics). Both
1822    /// `started_at` and `last_heartbeat` are set to `Timestamp::now()`.
1823    ///
1824    /// # Arguments
1825    ///
1826    /// * `activity` - The activity registration (see [`NewActivity`])
1827    ///
1828    /// # Errors
1829    ///
1830    /// - [`ValidationError`] if agent_id is empty or fields exceed size limits
1831    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1832    ///
1833    /// # Example
1834    ///
1835    /// ```rust
1836    /// # fn main() -> pulsedb::Result<()> {
1837    /// # let dir = tempfile::tempdir().unwrap();
1838    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
1839    /// # let collective_id = db.create_collective("example")?;
1840    /// use pulsedb::NewActivity;
1841    ///
1842    /// db.register_activity(NewActivity {
1843    ///     agent_id: "claude-opus".to_string(),
1844    ///     collective_id,
1845    ///     current_task: Some("Reviewing pull request".to_string()),
1846    ///     context_summary: None,
1847    /// })?;
1848    /// # Ok(())
1849    /// # }
1850    /// ```
1851    #[instrument(skip(self, activity), fields(agent_id = %activity.agent_id, collective_id = %activity.collective_id))]
1852    pub fn register_activity(&self, activity: NewActivity) -> Result<()> {
1853        // Validate input
1854        validate_new_activity(&activity)?;
1855
1856        // Verify collective exists
1857        self.storage
1858            .get_collective(activity.collective_id)?
1859            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(activity.collective_id)))?;
1860
1861        // Build stored activity with timestamps
1862        let now = Timestamp::now();
1863        let stored = Activity {
1864            agent_id: activity.agent_id,
1865            collective_id: activity.collective_id,
1866            current_task: activity.current_task,
1867            context_summary: activity.context_summary,
1868            started_at: now,
1869            last_heartbeat: now,
1870        };
1871
1872        self.storage.save_activity(&stored)?;
1873
1874        info!(
1875            agent_id = %stored.agent_id,
1876            collective_id = %stored.collective_id,
1877            "Activity registered"
1878        );
1879        Ok(())
1880    }
1881
1882    /// Updates an agent's heartbeat timestamp.
1883    ///
1884    /// Refreshes the `last_heartbeat` to `Timestamp::now()` without changing
1885    /// any other fields. The agent must have an existing activity registered.
1886    ///
1887    /// # Errors
1888    ///
1889    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1890    #[instrument(skip(self))]
1891    pub fn update_heartbeat(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1892        self.check_writable()?;
1893        let mut activity = self
1894            .storage
1895            .get_activity(agent_id, collective_id)?
1896            .ok_or_else(|| {
1897                PulseDBError::from(NotFoundError::activity(format!(
1898                    "{} in {}",
1899                    agent_id, collective_id
1900                )))
1901            })?;
1902
1903        activity.last_heartbeat = Timestamp::now();
1904        self.storage.save_activity(&activity)?;
1905
1906        info!(agent_id = %agent_id, collective_id = %collective_id, "Heartbeat updated");
1907        Ok(())
1908    }
1909
1910    /// Ends an agent's activity in a collective.
1911    ///
1912    /// Removes the activity record. After calling this, the agent will no
1913    /// longer appear in `get_active_agents()` results.
1914    ///
1915    /// # Errors
1916    ///
1917    /// - [`NotFoundError::Activity`] if no activity exists for the agent/collective pair
1918    #[instrument(skip(self))]
1919    pub fn end_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<()> {
1920        let deleted = self.storage.delete_activity(agent_id, collective_id)?;
1921
1922        if !deleted {
1923            return Err(PulseDBError::from(NotFoundError::activity(format!(
1924                "{} in {}",
1925                agent_id, collective_id
1926            ))));
1927        }
1928
1929        info!(agent_id = %agent_id, collective_id = %collective_id, "Activity ended");
1930        Ok(())
1931    }
1932
1933    /// Returns all active (non-stale) agents in a collective.
1934    ///
1935    /// Fetches all activities, filters out those whose `last_heartbeat` is
1936    /// older than `config.activity.stale_threshold`, and returns the rest
1937    /// sorted by `last_heartbeat` descending (most recently active first).
1938    ///
1939    /// # Errors
1940    ///
1941    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1942    #[instrument(skip(self))]
1943    pub fn get_active_agents(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1944        // Verify collective exists
1945        self.storage
1946            .get_collective(collective_id)?
1947            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(collective_id)))?;
1948
1949        let all_activities = self.storage.list_activities_in_collective(collective_id)?;
1950
1951        // Filter stale activities
1952        let now = Timestamp::now();
1953        let threshold_ms = self.config.activity.stale_threshold.as_millis() as i64;
1954        let cutoff = now.as_millis() - threshold_ms;
1955
1956        let mut active: Vec<Activity> = all_activities
1957            .into_iter()
1958            .filter(|a| a.last_heartbeat.as_millis() >= cutoff)
1959            .collect();
1960
1961        // Sort by last_heartbeat descending (most recently active first)
1962        active.sort_by(|a, b| b.last_heartbeat.cmp(&a.last_heartbeat));
1963
1964        Ok(active)
1965    }
1966
1967    // =========================================================================
1968    // Context Candidates (E2-S04)
1969    // =========================================================================
1970
1971    /// Retrieves unified context candidates from all retrieval primitives.
1972    ///
1973    /// This is the primary API for context assembly. It orchestrates:
1974    /// 1. Similarity search ([`search_similar_filtered`](Self::search_similar_filtered))
1975    /// 2. Recent experiences ([`get_recent_experiences_filtered`](Self::get_recent_experiences_filtered))
1976    /// 3. Insight search ([`get_insights`](Self::get_insights)) — if requested
1977    /// 4. Relation collection ([`get_related_experiences`](Self::get_related_experiences)) — if requested
1978    /// 5. Active agents ([`get_active_agents`](Self::get_active_agents)) — if requested
1979    ///
1980    /// # Arguments
1981    ///
1982    /// * `request` - Configuration for which primitives to query and limits
1983    ///
1984    /// # Errors
1985    ///
1986    /// - [`ValidationError::InvalidField`] if `max_similar` or `max_recent` is 0 or > 1000
1987    /// - [`ValidationError::DimensionMismatch`] if `query_embedding.len()` doesn't match
1988    ///   the collective's embedding dimension
1989    /// - [`NotFoundError::Collective`] if the collective doesn't exist
1990    ///
1991    /// # Performance
1992    ///
1993    /// Target: < 100ms at 100K experiences. The similarity search (~50ms) dominates;
1994    /// all other sub-calls are < 10ms each.
1995    ///
1996    /// # Example
1997    ///
1998    /// ```rust
1999    /// # fn main() -> pulsedb::Result<()> {
2000    /// # let dir = tempfile::tempdir().unwrap();
2001    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2002    /// # let collective_id = db.create_collective("example")?;
2003    /// # let query_vec = vec![0.1f32; 384];
2004    /// use pulsedb::{ContextRequest, SearchFilter};
2005    ///
2006    /// let candidates = db.get_context_candidates(ContextRequest {
2007    ///     collective_id,
2008    ///     query_embedding: query_vec,
2009    ///     max_similar: 10,
2010    ///     max_recent: 5,
2011    ///     include_insights: true,
2012    ///     include_relations: true,
2013    ///     include_active_agents: true,
2014    ///     filter: SearchFilter {
2015    ///         domains: Some(vec!["rust".to_string()]),
2016    ///         ..SearchFilter::default()
2017    ///     },
2018    ///     ..ContextRequest::default()
2019    /// })?;
2020    /// # Ok(())
2021    /// # }
2022    /// ```
2023    #[instrument(skip(self, request), fields(collective_id = %request.collective_id))]
2024    pub fn get_context_candidates(&self, request: ContextRequest) -> Result<ContextCandidates> {
2025        // ── Validate limits ──────────────────────────────────────
2026        if request.max_similar == 0 || request.max_similar > 1000 {
2027            return Err(ValidationError::invalid_field(
2028                "max_similar",
2029                "must be between 1 and 1000",
2030            )
2031            .into());
2032        }
2033        if request.max_recent == 0 || request.max_recent > 1000 {
2034            return Err(
2035                ValidationError::invalid_field("max_recent", "must be between 1 and 1000").into(),
2036            );
2037        }
2038
2039        // ── Verify collective exists and check dimension ─────────
2040        let collective = self
2041            .storage
2042            .get_collective(request.collective_id)?
2043            .ok_or_else(|| PulseDBError::from(NotFoundError::collective(request.collective_id)))?;
2044
2045        let expected_dim = collective.embedding_dimension as usize;
2046        if request.query_embedding.len() != expected_dim {
2047            return Err(ValidationError::dimension_mismatch(
2048                expected_dim,
2049                request.query_embedding.len(),
2050            )
2051            .into());
2052        }
2053
2054        // ── 1. Similar experiences (HNSW vector search) ──────────
2055        let similar_experiences = self.search_similar_filtered(
2056            request.collective_id,
2057            &request.query_embedding,
2058            request.max_similar,
2059            request.filter.clone(),
2060        )?;
2061
2062        // ── 2. Recent experiences (timestamp index scan) ─────────
2063        let recent_experiences = self.get_recent_experiences_filtered(
2064            request.collective_id,
2065            request.max_recent,
2066            request.filter,
2067        )?;
2068
2069        // ── 3. Insights (HNSW vector search on insight index) ────
2070        let insights = if request.include_insights {
2071            self.get_insights(
2072                request.collective_id,
2073                &request.query_embedding,
2074                request.max_similar,
2075            )?
2076            .into_iter()
2077            .map(|(insight, _score)| insight)
2078            .collect()
2079        } else {
2080            vec![]
2081        };
2082
2083        // ── 4. Relations (graph traversal from result experiences) ─
2084        let relations = if request.include_relations {
2085            use std::collections::HashSet;
2086
2087            let mut seen = HashSet::new();
2088            let mut all_relations = Vec::new();
2089
2090            // Collect unique experience IDs from both result sets
2091            let exp_ids: Vec<_> = similar_experiences
2092                .iter()
2093                .map(|r| r.experience.id)
2094                .chain(recent_experiences.iter().map(|e| e.id))
2095                .collect();
2096
2097            for exp_id in exp_ids {
2098                let related =
2099                    self.get_related_experiences(exp_id, crate::relation::RelationDirection::Both)?;
2100
2101                for (_experience, relation) in related {
2102                    if seen.insert(relation.id) {
2103                        all_relations.push(relation);
2104                    }
2105                }
2106            }
2107
2108            all_relations
2109        } else {
2110            vec![]
2111        };
2112
2113        // ── 5. Active agents (staleness-filtered activity records) ─
2114        let active_agents = if request.include_active_agents {
2115            self.get_active_agents(request.collective_id)?
2116        } else {
2117            vec![]
2118        };
2119
2120        Ok(ContextCandidates {
2121            similar_experiences,
2122            recent_experiences,
2123            insights,
2124            relations,
2125            active_agents,
2126        })
2127    }
2128
2129    // =========================================================================
2130    // Watch System (E4-S01)
2131    // =========================================================================
2132
2133    /// Subscribes to all experience changes in a collective.
2134    ///
2135    /// Returns a [`WatchStream`] that yields [`WatchEvent`] values for every
2136    /// create, update, archive, and delete operation. The stream ends when
2137    /// dropped or when the `PulseDB` instance is closed.
2138    ///
2139    /// Multiple subscribers per collective are supported. Each gets an
2140    /// independent copy of every event.
2141    ///
2142    /// # Example
2143    ///
2144    /// ```rust,no_run
2145    /// # #[tokio::main]
2146    /// # async fn main() -> pulsedb::Result<()> {
2147    /// # let dir = tempfile::tempdir().unwrap();
2148    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2149    /// # let collective_id = db.create_collective("example")?;
2150    /// use futures::StreamExt;
2151    ///
2152    /// let mut stream = db.watch_experiences(collective_id)?;
2153    /// while let Some(event) = stream.next().await {
2154    ///     println!("{:?}: {}", event.event_type, event.experience_id);
2155    /// }
2156    /// # Ok(())
2157    /// # }
2158    /// ```
2159    pub fn watch_experiences(&self, collective_id: CollectiveId) -> Result<WatchStream> {
2160        self.watch.subscribe(collective_id, None)
2161    }
2162
2163    /// Subscribes to filtered experience changes in a collective.
2164    ///
2165    /// Like [`watch_experiences`](Self::watch_experiences), but only delivers
2166    /// events that match the filter criteria. Filters are applied on the
2167    /// sender side before channel delivery.
2168    ///
2169    /// # Example
2170    ///
2171    /// ```rust
2172    /// # fn main() -> pulsedb::Result<()> {
2173    /// # let dir = tempfile::tempdir().unwrap();
2174    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2175    /// # let collective_id = db.create_collective("example")?;
2176    /// use pulsedb::WatchFilter;
2177    ///
2178    /// let filter = WatchFilter {
2179    ///     domains: Some(vec!["security".to_string()]),
2180    ///     min_importance: Some(0.7),
2181    ///     ..Default::default()
2182    /// };
2183    /// let mut stream = db.watch_experiences_filtered(collective_id, filter)?;
2184    /// # Ok(())
2185    /// # }
2186    /// ```
2187    pub fn watch_experiences_filtered(
2188        &self,
2189        collective_id: CollectiveId,
2190        filter: WatchFilter,
2191    ) -> Result<WatchStream> {
2192        self.watch.subscribe(collective_id, Some(filter))
2193    }
2194
2195    // =========================================================================
2196    // Cross-Process Watch (E4-S02)
2197    // =========================================================================
2198
2199    /// Returns the current WAL sequence number.
2200    ///
2201    /// Use this to establish a baseline before starting to poll for changes.
2202    /// Returns 0 if no experience writes have occurred yet.
2203    ///
2204    /// # Example
2205    ///
2206    /// ```rust
2207    /// # fn main() -> pulsedb::Result<()> {
2208    /// # let dir = tempfile::tempdir().unwrap();
2209    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2210    /// let seq = db.get_current_sequence()?;
2211    /// // ... later ...
2212    /// let (events, new_seq) = db.poll_changes(seq)?;
2213    /// # Ok(())
2214    /// # }
2215    /// ```
2216    pub fn get_current_sequence(&self) -> Result<u64> {
2217        self.storage.get_wal_sequence()
2218    }
2219
2220    /// Polls for experience changes since the given sequence number.
2221    ///
2222    /// Returns a tuple of `(events, new_sequence)`:
2223    /// - `events`: New [`WatchEvent`]s in sequence order
2224    /// - `new_sequence`: Pass this value back on the next call
2225    ///
2226    /// Returns an empty vec and the same sequence if no changes exist.
2227    ///
2228    /// # Arguments
2229    ///
2230    /// * `since_seq` - The last sequence number you received (0 for first call)
2231    ///
2232    /// # Performance
2233    ///
2234    /// Target: < 10ms per call. Internally performs a range scan on the
2235    /// watch_events table, O(k) where k is the number of new events.
2236    ///
2237    /// # Example
2238    ///
2239    /// ```rust,no_run
2240    /// # fn main() -> pulsedb::Result<()> {
2241    /// # let dir = tempfile::tempdir().unwrap();
2242    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2243    /// use std::time::Duration;
2244    ///
2245    /// let mut seq = 0u64;
2246    /// loop {
2247    ///     let (events, new_seq) = db.poll_changes(seq)?;
2248    ///     seq = new_seq;
2249    ///     for event in events {
2250    ///         println!("{:?}: {}", event.event_type, event.experience_id);
2251    ///     }
2252    ///     std::thread::sleep(Duration::from_millis(100));
2253    /// }
2254    /// # }
2255    /// ```
2256    pub fn poll_changes(&self, since_seq: u64) -> Result<(Vec<WatchEvent>, u64)> {
2257        use crate::storage::schema::EntityTypeTag;
2258        let (records, new_seq) = self.storage.poll_watch_events(since_seq, 1000)?;
2259        let events = records
2260            .into_iter()
2261            .filter(|r| r.entity_type == EntityTypeTag::Experience)
2262            .map(WatchEvent::from)
2263            .collect();
2264        Ok((events, new_seq))
2265    }
2266
2267    /// Polls for changes with a custom batch size limit.
2268    ///
2269    /// Same as [`poll_changes`](Self::poll_changes) but returns at most
2270    /// `limit` events per call. Use this for backpressure control.
2271    pub fn poll_changes_batch(
2272        &self,
2273        since_seq: u64,
2274        limit: usize,
2275    ) -> Result<(Vec<WatchEvent>, u64)> {
2276        use crate::storage::schema::EntityTypeTag;
2277        let (records, new_seq) = self.storage.poll_watch_events(since_seq, limit)?;
2278        let events = records
2279            .into_iter()
2280            .filter(|r| r.entity_type == EntityTypeTag::Experience)
2281            .map(WatchEvent::from)
2282            .collect();
2283        Ok((events, new_seq))
2284    }
2285
2286    // =========================================================================
2287    // Sync WAL Compaction (feature: sync)
2288    // =========================================================================
2289
2290    /// Compacts the WAL by removing events that all peers have already synced.
2291    ///
2292    /// Finds the minimum cursor across all known peers and deletes WAL events
2293    /// up to that sequence. If no peers exist, no compaction occurs (events
2294    /// may be needed when a peer connects later).
2295    ///
2296    /// Call this periodically (e.g., daily) to reclaim disk space.
2297    /// Returns the number of WAL events deleted.
2298    ///
2299    /// # Example
2300    ///
2301    /// ```rust,no_run
2302    /// # fn main() -> pulsedb::Result<()> {
2303    /// # let dir = tempfile::tempdir().unwrap();
2304    /// # let db = pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default())?;
2305    /// let deleted = db.compact_wal()?;
2306    /// println!("Compacted {} WAL events", deleted);
2307    /// # Ok(())
2308    /// # }
2309    /// ```
2310    #[cfg(feature = "sync")]
2311    pub fn compact_wal(&self) -> Result<u64> {
2312        let cursors = self
2313            .storage
2314            .list_sync_cursors()
2315            .map_err(|e| PulseDBError::internal(format!("Failed to list sync cursors: {}", e)))?;
2316
2317        if cursors.is_empty() {
2318            // No peers — don't compact (events may be needed later)
2319            return Ok(0);
2320        }
2321
2322        let min_seq = cursors.iter().map(|c| c.last_sequence).min().unwrap_or(0);
2323
2324        if min_seq == 0 {
2325            return Ok(0);
2326        }
2327
2328        let deleted = self.storage.compact_wal_events(min_seq)?;
2329        info!(deleted, min_seq, "WAL compacted");
2330        Ok(deleted)
2331    }
2332
2333    // =========================================================================
2334    // Sync Apply Methods (feature: sync)
2335    // =========================================================================
2336    //
2337    // These methods apply remote changes received via sync. They bypass
2338    // validation and embedding generation (data was validated on the source).
2339    // WAL recording is suppressed by the SyncApplyGuard (entered by the caller).
2340    // Watch emit is skipped (no in-process notifications for sync changes).
2341    //
2342    // These are pub(crate) and will be called by the sync applier in Phase 3.
2343
2344    /// Applies a synced experience from a remote peer.
2345    ///
2346    /// Writes the full experience to storage and inserts into HNSW.
2347    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2348    #[cfg(feature = "sync")]
2349    #[allow(dead_code)] // Called by sync applier (Phase 3)
2350    pub fn apply_synced_experience(&self, experience: Experience) -> Result<()> {
2351        let collective_id = experience.collective_id;
2352        let id = experience.id;
2353        let embedding = experience.embedding.clone();
2354
2355        self.storage.save_experience(&experience)?;
2356
2357        // Insert into HNSW index
2358        let vectors = self
2359            .vectors
2360            .read()
2361            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2362        if let Some(index) = vectors.get(&collective_id) {
2363            index.insert_experience(id, &embedding)?;
2364        }
2365
2366        debug!(id = %id, "Synced experience applied");
2367        Ok(())
2368    }
2369
2370    /// Applies a synced experience update from a remote peer.
2371    ///
2372    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2373    #[cfg(feature = "sync")]
2374    #[allow(dead_code)] // Called by sync applier (Phase 3)
2375    pub fn apply_synced_experience_update(
2376        &self,
2377        id: ExperienceId,
2378        update: ExperienceUpdate,
2379    ) -> Result<()> {
2380        self.storage.update_experience(id, &update)?;
2381        debug!(id = %id, "Synced experience update applied");
2382        Ok(())
2383    }
2384
2385    /// Applies a synced experience deletion from a remote peer.
2386    ///
2387    /// Removes from storage and soft-deletes from HNSW.
2388    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2389    #[cfg(feature = "sync")]
2390    #[allow(dead_code)] // Called by sync applier (Phase 3)
2391    pub fn apply_synced_experience_delete(&self, id: ExperienceId) -> Result<()> {
2392        // Get collective_id for HNSW lookup before deleting
2393        if let Some(exp) = self.storage.get_experience(id)? {
2394            let collective_id = exp.collective_id;
2395
2396            // Cascade delete relations
2397            self.storage.delete_relations_for_experience(id)?;
2398
2399            self.storage.delete_experience(id)?;
2400
2401            // Soft-delete from HNSW
2402            let vectors = self
2403                .vectors
2404                .read()
2405                .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?;
2406            if let Some(index) = vectors.get(&collective_id) {
2407                index.delete_experience(id)?;
2408            }
2409        }
2410
2411        debug!(id = %id, "Synced experience delete applied");
2412        Ok(())
2413    }
2414
2415    /// Applies a synced relation from a remote peer.
2416    ///
2417    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2418    #[cfg(feature = "sync")]
2419    #[allow(dead_code)] // Called by sync applier (Phase 3)
2420    pub fn apply_synced_relation(&self, relation: ExperienceRelation) -> Result<()> {
2421        let id = relation.id;
2422        self.storage.save_relation(&relation)?;
2423        debug!(id = %id, "Synced relation applied");
2424        Ok(())
2425    }
2426
2427    /// Applies a synced relation deletion from a remote peer.
2428    ///
2429    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2430    #[cfg(feature = "sync")]
2431    #[allow(dead_code)] // Called by sync applier (Phase 3)
2432    pub fn apply_synced_relation_delete(&self, id: RelationId) -> Result<()> {
2433        self.storage.delete_relation(id)?;
2434        debug!(id = %id, "Synced relation delete applied");
2435        Ok(())
2436    }
2437
2438    /// Applies a synced insight from a remote peer.
2439    ///
2440    /// Writes to storage and inserts into insight HNSW index.
2441    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2442    #[cfg(feature = "sync")]
2443    #[allow(dead_code)] // Called by sync applier (Phase 3)
2444    pub fn apply_synced_insight(&self, insight: DerivedInsight) -> Result<()> {
2445        let id = insight.id;
2446        let collective_id = insight.collective_id;
2447        let embedding = insight.embedding.clone();
2448
2449        self.storage.save_insight(&insight)?;
2450
2451        // Insert into insight HNSW (using InsightId→ExperienceId byte conversion)
2452        let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2453        let insight_vectors = self
2454            .insight_vectors
2455            .read()
2456            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2457        if let Some(index) = insight_vectors.get(&collective_id) {
2458            index.insert_experience(exp_id, &embedding)?;
2459        }
2460
2461        debug!(id = %id, "Synced insight applied");
2462        Ok(())
2463    }
2464
2465    /// Applies a synced insight deletion from a remote peer.
2466    ///
2467    /// Removes from storage and soft-deletes from insight HNSW.
2468    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2469    #[cfg(feature = "sync")]
2470    #[allow(dead_code)] // Called by sync applier (Phase 3)
2471    pub fn apply_synced_insight_delete(&self, id: InsightId) -> Result<()> {
2472        if let Some(insight) = self.storage.get_insight(id)? {
2473            self.storage.delete_insight(id)?;
2474
2475            // Soft-delete from insight HNSW
2476            let exp_id = ExperienceId::from_bytes(*id.as_bytes());
2477            let insight_vectors = self
2478                .insight_vectors
2479                .read()
2480                .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?;
2481            if let Some(index) = insight_vectors.get(&insight.collective_id) {
2482                index.delete_experience(exp_id)?;
2483            }
2484        }
2485
2486        debug!(id = %id, "Synced insight delete applied");
2487        Ok(())
2488    }
2489
2490    /// Applies a synced collective from a remote peer.
2491    ///
2492    /// Writes to storage and creates HNSW indexes for the collective.
2493    /// Caller must hold `SyncApplyGuard` to suppress WAL recording.
2494    #[cfg(feature = "sync")]
2495    #[allow(dead_code)] // Called by sync applier (Phase 3)
2496    pub fn apply_synced_collective(&self, collective: Collective) -> Result<()> {
2497        let id = collective.id;
2498        let dimension = collective.embedding_dimension as usize;
2499
2500        self.storage.save_collective(&collective)?;
2501
2502        // Create HNSW indexes (same as create_collective)
2503        let exp_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2504        let insight_index = crate::vector::HnswIndex::new(dimension, &self.config.hnsw);
2505        self.vectors
2506            .write()
2507            .map_err(|_| PulseDBError::vector("Vectors lock poisoned"))?
2508            .insert(id, exp_index);
2509        self.insight_vectors
2510            .write()
2511            .map_err(|_| PulseDBError::vector("Insight vectors lock poisoned"))?
2512            .insert(id, insight_index);
2513
2514        debug!(id = %id, "Synced collective applied");
2515        Ok(())
2516    }
2517}
2518
2519// PulseDB is auto Send + Sync: Box<dyn StorageEngine + Send + Sync>,
2520// Box<dyn EmbeddingService + Send + Sync>, and Config are all Send + Sync.
2521
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EmbeddingDimension;
    use tempfile::tempdir;

    #[test]
    fn test_open_creates_database() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        // Opening a fresh path must materialize the file and pick up the
        // default 384-dim embedding configuration.
        assert!(db_path.exists());
        assert_eq!(db.embedding_dimension(), 384);

        db.close().unwrap();
    }

    #[test]
    fn test_open_existing_database() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        // First open creates the database, then release it cleanly.
        PulseDB::open(&db_path, Config::default()).unwrap().close().unwrap();

        // A second open against the same path must load the persisted
        // metadata rather than re-initializing.
        let reopened = PulseDB::open(&db_path, Config::default()).unwrap();
        assert_eq!(reopened.embedding_dimension(), 384);
        reopened.close().unwrap();
    }

    #[test]
    fn test_config_validation() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        // cache_size_mb == 0 is rejected by config validation before any
        // storage is touched.
        let bad_config = Config {
            cache_size_mb: 0, // Invalid
            ..Default::default()
        };

        assert!(PulseDB::open(&db_path, bad_config).is_err());
    }

    #[test]
    fn test_dimension_mismatch() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        // Initialize the database with 384-dim embeddings.
        let first = PulseDB::open(
            &db_path,
            Config {
                embedding_dimension: EmbeddingDimension::D384,
                ..Default::default()
            },
        )
        .unwrap();
        first.close().unwrap();

        // Reopening with a conflicting dimension (768) must fail rather
        // than silently corrupt the vector indexes.
        let reopen = PulseDB::open(
            &db_path,
            Config {
                embedding_dimension: EmbeddingDimension::D768,
                ..Default::default()
            },
        );

        assert!(reopen.is_err());
    }

    #[test]
    fn test_metadata_access() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.db");

        let db = PulseDB::open(&db_path, Config::default()).unwrap();

        // metadata() exposes the persisted embedding dimension.
        assert_eq!(db.metadata().embedding_dimension, EmbeddingDimension::D384);

        db.close().unwrap();
    }

    #[test]
    fn test_pulsedb_is_send_sync() {
        // Compile-time proof that PulseDB can cross thread boundaries.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PulseDB>();
    }
}