pulsedb/storage/redb.rs

//! redb storage engine implementation.
//!
//! This module provides the primary storage backend for PulseDB using
//! [redb](https://docs.rs/redb), a pure Rust embedded key-value store.
//!
//! # Features
//!
//! - ACID transactions with MVCC
//! - Single-writer, multiple-reader concurrency
//! - Automatic crash recovery
//! - Zero external dependencies (pure Rust)
//!
//! # File Layout
//!
//! When you open a database at `./pulse.db`, redb creates:
//! - `./pulse.db` - Main database file
//! - `./pulse.db.lock` - Lock file for writer coordination (may not be visible)
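//!
//! # Example
//!
//! A minimal sketch of opening a database with the default configuration
//! (the path here is illustrative):
//!
//! ```rust,no_run
//! use pulsedb::{Config, storage::RedbStorage};
//!
//! # fn main() -> pulsedb::Result<()> {
//! let storage = RedbStorage::open("./pulse.db", &Config::default())?;
//! # Ok(())
//! # }
//! ```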

use std::path::{Path, PathBuf};

use ::redb::{Database, ReadableTable};
use tracing::{debug, info, instrument, warn};

use crate::activity::Activity;
use crate::collective::Collective;
use crate::experience::{Experience, ExperienceUpdate};
use crate::insight::DerivedInsight;
use crate::relation::{ExperienceRelation, RelationType};
use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};

use super::schema::{
    decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
    DatabaseMetadata, EntityTypeTag, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag,
    ACTIVITIES_TABLE, COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
    EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
    METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
    SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
};
#[cfg(feature = "sync")]
use super::schema::{INSTANCE_ID_KEY, SYNC_CURSORS_TABLE};
use super::StorageEngine;
use crate::config::{Config, EmbeddingDimension};
use crate::error::{PulseDBError, Result, StorageError, ValidationError};

/// Metadata key in the metadata table.
const METADATA_KEY: &str = "db_metadata";

/// redb storage engine wrapper.
///
/// This struct holds the redb database handle and cached metadata.
/// It implements [`StorageEngine`] for use with PulseDB.
///
/// # Thread Safety
///
/// `RedbStorage` is `Send + Sync`. redb handles internal synchronization
/// using MVCC for readers and exclusive locking for writers.
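///
/// A sketch of sharing one handle across threads (illustrative only; it relies
/// on the `Send + Sync` guarantee above):
///
/// ```rust
/// # fn main() -> pulsedb::Result<()> {
/// # let dir = tempfile::tempdir().unwrap();
/// use std::sync::Arc;
/// use pulsedb::{Config, storage::RedbStorage};
///
/// let storage = Arc::new(RedbStorage::open(dir.path().join("test.db"), &Config::default())?);
/// let reader = Arc::clone(&storage);
/// std::thread::spawn(move || {
///     // Reads proceed under MVCC while another thread holds the writer lock.
///     let _dim = reader.embedding_dimension();
/// })
/// .join()
/// .unwrap();
/// # Ok(())
/// # }
/// ```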
#[derive(Debug)]
pub struct RedbStorage {
    /// The redb database handle.
    db: Database,

    /// Cached database metadata.
    metadata: DatabaseMetadata,

    /// Path to the database file.
    path: PathBuf,

    /// Persistent instance ID for sync protocol (only with `sync` feature).
    #[cfg(feature = "sync")]
    instance_id: crate::sync::InstanceId,
}

impl RedbStorage {
    /// Opens or creates a database at the given path.
    ///
    /// If the database doesn't exist, it will be created and initialized
    /// with the configuration settings. If it exists, the configuration
    /// will be validated against the stored metadata.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file
    /// * `config` - Database configuration
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The database file is corrupted
    /// - The database is locked by another process
    /// - Schema version doesn't match
    /// - Embedding dimension doesn't match (for existing databases)
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{Config, storage::RedbStorage};
    ///
    /// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
        let path = path.as_ref();
        let db_exists = path.exists();

        debug!(db_exists = db_exists, "Opening storage engine");

        // Create or open the database
        let db = Self::create_database(path, config)?;

        if db_exists {
            // Validate existing database
            Self::open_existing(db, path.to_path_buf(), config)
        } else {
            // Initialize new database
            Self::initialize_new(db, path.to_path_buf(), config)
        }
    }

    /// Creates the redb database with appropriate settings.
    fn create_database(path: &Path, _config: &Config) -> Result<Database> {
        let builder = Database::builder();

        // Note: redb 2.x doesn't have set_cache_size; it manages memory internally.
        // The cache_size_mb config will be used for future optimizations.

        // Note: redb doesn't expose a typed error variant for lock conflicts,
        // so we detect them via error message string matching. This may need
        // updating if redb changes its error messages in a future version.
        let db = builder.create(path).map_err(|e| {
            if e.to_string().contains("locked") {
                StorageError::DatabaseLocked
            } else {
                StorageError::Redb(e.to_string())
            }
        })?;

        debug!("Database file opened successfully");
        Ok(db)
    }

    /// Initializes a new database with tables and metadata.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Initializing new database");

        let metadata = DatabaseMetadata::new(config.embedding_dimension);

        // Create all tables and write metadata in a single transaction
        let write_txn = db.begin_write().map_err(StorageError::from)?;

        {
            // Create the metadata table and write metadata
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Create other tables (they're created on first access)
            let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
            let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
            let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let _ = write_txn.open_table(RELATIONS_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            let _ = write_txn.open_table(INSIGHTS_TABLE)?;
            let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            // Sync tables and instance ID (behind feature gate)
            #[cfg(feature = "sync")]
            {
                let instance_id = crate::sync::InstanceId::new();
                meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }

        write_txn.commit().map_err(StorageError::from)?;

        // Load instance ID for the struct (behind feature gate)
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id after init"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = SCHEMA_VERSION,
            dimension = config.embedding_dimension.size(),
            "Database initialized"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }

    /// Opens and validates an existing database.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Opening existing database");

        // Read metadata from the database
        let read_txn = db.begin_read().map_err(StorageError::from)?;

        let metadata = {
            let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
                StorageError::corrupted(format!("Cannot open metadata table: {}", e))
            })?;

            let metadata_bytes = meta_table
                .get(METADATA_KEY)
                .map_err(StorageError::from)?
                .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;

            bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
                .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
        };

        drop(read_txn);

        // Validate schema version (allow migration from v1 → v2)
        if metadata.schema_version != SCHEMA_VERSION && metadata.schema_version != 1 {
            warn!(
                expected = SCHEMA_VERSION,
                found = metadata.schema_version,
                "Schema version mismatch"
            );
            return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
                expected: SCHEMA_VERSION,
                found: metadata.schema_version,
            }));
        }
        let needs_v2_migration = metadata.schema_version == 1;

        // Validate embedding dimension
        if metadata.embedding_dimension != config.embedding_dimension {
            warn!(
                expected = config.embedding_dimension.size(),
                found = metadata.embedding_dimension.size(),
                "Embedding dimension mismatch"
            );
            return Err(PulseDBError::Validation(
                ValidationError::DimensionMismatch {
                    expected: config.embedding_dimension.size(),
                    got: metadata.embedding_dimension.size(),
                },
            ));
        }

        // Update last_opened_at timestamp and bump schema version if migrating
        let mut metadata = metadata;
        metadata.touch();
        if needs_v2_migration {
            metadata.schema_version = SCHEMA_VERSION;
        }

        let write_txn = db.begin_write().map_err(StorageError::from)?;
        {
            // Ensure watch_events table exists (migration for pre-E4-S02 databases)
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            // Migrate WAL records from v1 → v2 (add entity_type field)
            if needs_v2_migration {
                Self::migrate_wal_v1_to_v2(&write_txn)?;
                info!("Migrated WAL records from schema v1 to v2");
            }

            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Ensure sync tables and instance ID exist (migration for pre-sync databases)
            #[cfg(feature = "sync")]
            {
                // Generate instance_id if missing (first open with sync feature)
                if meta_table.get(INSTANCE_ID_KEY)?.is_none() {
                    let instance_id = crate::sync::InstanceId::new();
                    meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                    debug!("Generated new instance_id for existing database");
                }
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        // Load instance ID for the struct (behind feature gate)
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = metadata.schema_version,
            dimension = metadata.embedding_dimension.size(),
            "Database opened successfully"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }

    /// Returns a reference to the underlying redb database.
    ///
    /// This is for internal use by other PulseDB modules.
    #[inline]
    #[allow(dead_code)] // Used by Collective CRUD (E1-S02) and Experience CRUD (E1-S03)
    pub(crate) fn database(&self) -> &Database {
        &self.db
    }

    /// Increments the WAL sequence and records a watch event within an existing write transaction.
    ///
    /// This is the core of cross-process change detection. By executing within the caller's
    /// transaction, the sequence increment and event record are atomic with the data mutation:
    /// if the transaction commits, both are durable; if it rolls back, neither is visible.
    ///
    /// # Arguments
    ///
    /// * `write_txn` - The caller's open write transaction
    /// * `entity_id` - The entity that changed (16-byte UUID)
    /// * `collective_id` - The collective it belongs to
    /// * `entity_type` - What kind of entity changed (Experience, Relation, etc.)
    /// * `event_type` - What kind of change occurred (Created, Updated, etc.)
    /// * `timestamp` - When the change occurred
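    ///
    /// # Call Pattern
    ///
    /// A sketch of how callers thread this through a write transaction
    /// (illustrative; it mirrors `save_experience` below):
    ///
    /// ```rust,ignore
    /// let write_txn = self.db.begin_write().map_err(StorageError::from)?;
    /// // ... mutate data tables within the same transaction ...
    /// self.increment_wal_and_record(
    ///     &write_txn,
    ///     experience.id.as_bytes(),
    ///     experience.collective_id,
    ///     EntityTypeTag::Experience,
    ///     WatchEventTypeTag::Created,
    ///     experience.timestamp,
    /// )?;
    /// write_txn.commit().map_err(StorageError::from)?;
    /// ```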
    fn increment_wal_and_record(
        &self,
        write_txn: &::redb::WriteTransaction,
        entity_id: &[u8; 16],
        collective_id: CollectiveId,
        entity_type: EntityTypeTag,
        event_type: WatchEventTypeTag,
        timestamp: Timestamp,
    ) -> Result<u64> {
        // Echo prevention: skip WAL recording when applying sync changes
        #[cfg(feature = "sync")]
        if crate::sync::guard::is_sync_applying() {
            return Ok(0);
        }

        // Read current sequence (0 if key doesn't exist yet)
        let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
        let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
            Some(entry) => {
                let bytes: [u8; 8] = entry
                    .value()
                    .try_into()
                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
                u64::from_be_bytes(bytes)
            }
            None => 0,
        };
        let new_seq = current_seq + 1;

        // Write new sequence number
        let seq_bytes = new_seq.to_be_bytes();
        meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;

        // Record the event (schema v2 with entity_type)
        let record = WatchEventRecord {
            entity_id: *entity_id,
            collective_id: *collective_id.as_bytes(),
            event_type,
            timestamp_ms: timestamp.as_millis(),
            entity_type,
        };
        let record_bytes =
            bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;

        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        events_table.insert(&seq_bytes, record_bytes.as_slice())?;

        Ok(new_seq)
    }

    /// Migrates WAL records from schema v1 to v2.
    ///
    /// V1 records have 4 fields: experience_id, collective_id, event_type, timestamp_ms.
    /// V2 adds entity_type (defaults to Experience for existing records).
    fn migrate_wal_v1_to_v2(write_txn: &::redb::WriteTransaction) -> Result<()> {
        use super::schema::WatchEventRecordV1;

        let events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;

        // Collect all (key, v1_record) pairs
        let mut entries: Vec<([u8; 8], WatchEventRecordV1)> = Vec::new();
        for entry in events_table.iter()? {
            let (key, value) = entry.map_err(StorageError::from)?;
            let seq_bytes: [u8; 8] = *key.value();
            let v1_record: WatchEventRecordV1 = bincode::deserialize(value.value())
                .map_err(|e| StorageError::serialization(format!("v1 WAL record: {}", e)))?;
            entries.push((seq_bytes, v1_record));
        }
        drop(events_table);

        // Rewrite as v2 records
        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        for (seq_bytes, v1) in &entries {
            let v2 = WatchEventRecord {
                entity_id: v1.experience_id,
                collective_id: v1.collective_id,
                event_type: v1.event_type,
                timestamp_ms: v1.timestamp_ms,
                entity_type: EntityTypeTag::Experience,
            };
            let v2_bytes =
                bincode::serialize(&v2).map_err(|e| StorageError::serialization(e.to_string()))?;
            events_table.insert(seq_bytes, v2_bytes.as_slice())?;
        }

        debug!(count = entries.len(), "Migrated WAL records to v2");
        Ok(())
    }

    /// Returns the embedding dimension configured for this database.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
}

impl StorageEngine for RedbStorage {
    // =========================================================================
    // Lifecycle
    // =========================================================================

    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }

    #[instrument(skip(self))]
    fn close(self: Box<Self>) -> Result<()> {
        info!("Closing storage engine");

        // redb flushes all data durably on drop. Since `Database::drop` is
        // infallible, this method currently always returns Ok(()). The Result
        // return type is retained for API forward-compatibility if a future
        // storage backend can report flush errors.
        drop(self.db);

        info!("Storage engine closed");
        Ok(())
    }

    fn path(&self) -> Option<&Path> {
        Some(&self.path)
    }

    // =========================================================================
    // Collective Storage Operations
    // =========================================================================

    fn save_collective(&self, collective: &Collective) -> Result<()> {
        let bytes = bincode::serialize(collective)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
            table.insert(collective.id.as_bytes(), bytes.as_slice())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            collective.id.as_bytes(),
            collective.id,
            EntityTypeTag::Collective,
            WatchEventTypeTag::Created,
            collective.created_at,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %collective.id, name = %collective.name, "Collective saved");
        Ok(())
    }

    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
        let table = read_txn.open_table(COLLECTIVES_TABLE)?;

        match table.get(id.as_bytes())? {
            Some(value) => {
                let collective: Collective = bincode::deserialize(value.value())
                    .map_err(|e| StorageError::serialization(e.to_string()))?;
                Ok(Some(collective))
            }
            None => Ok(None),
        }
    }

    fn list_collectives(&self) -> Result<Vec<Collective>> {
        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
        let table = read_txn.open_table(COLLECTIVES_TABLE)?;

        let mut collectives = Vec::new();
        for result in table.iter()? {
            let (_, value) = result.map_err(StorageError::from)?;
            let collective: Collective = bincode::deserialize(value.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            collectives.push(collective);
        }

        Ok(collectives)
    }

    fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let existed;
        {
            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
            existed = table.remove(id.as_bytes())?.is_some();
        }
        write_txn.commit().map_err(StorageError::from)?;

        if existed {
            debug!(id = %id, "Collective deleted");
        }
        Ok(existed)
    }

    // =========================================================================
    // Experience Index Operations
    // =========================================================================

    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

        let count = table.get(id.as_bytes())?.count() as u64;

        Ok(count)
    }

    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
        // Phase 1: Read — collect experience IDs and relation IDs to delete
        let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

            let mut ids = Vec::new();
            for result in table.get(id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                let entry = value.value();
                // Entry is [timestamp_be: 8 bytes][experience_id: 16 bytes]
                let mut exp_id = [0u8; 16];
                exp_id.copy_from_slice(&entry[8..24]);
                ids.push(exp_id);
            }

            // Collect all relation IDs for these experiences (deduplicated)
            let mut rel_ids = std::collections::HashSet::new();
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for exp_id in &ids {
                for result in source_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
                for result in target_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
            }

            (ids, rel_ids.into_iter().collect())
        };

        let count = exp_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2: Write — delete from all tables in a single transaction
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            // Delete experience records
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            for exp_id in &exp_ids {
                exp_table.remove(exp_id)?;
            }
        }
        {
            // Delete embedding vectors
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            for exp_id in &exp_ids {
                emb_table.remove(exp_id)?;
            }
        }
        {
            // Clear the by-collective index for this collective
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            idx_table.remove_all(id.as_bytes())?;
        }
        {
            // Clear the by-type index for all type variants of this collective
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            for tag in ExperienceTypeTag::all() {
                let key = encode_type_index_key(id.as_bytes(), *tag);
                type_table.remove_all(&key)?;
            }
        }
        {
            // Delete relations and their index entries
            if !relation_ids.is_empty() {
                let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
                let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
                let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

                // Clear relation indexes for all affected experiences
                for exp_id in &exp_ids {
                    source_idx.remove_all(exp_id)?;
                    target_idx.remove_all(exp_id)?;
                }
                // Delete the relation records themselves
                for rel_id in &relation_ids {
                    rel_table.remove(rel_id)?;
                }

                debug!(
                    count = relation_ids.len(),
                    "Cascade-deleted relations for collective"
                );
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
        Ok(count)
    }

    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

        let mut ids = Vec::new();
        for result in table.get(id.as_bytes())? {
            let value = result.map_err(StorageError::from)?;
            let entry = value.value();
            // Entry is [timestamp_be: 8 bytes][experience_id: 16 bytes]
673            let mut exp_bytes = [0u8; 16];
674            exp_bytes.copy_from_slice(&entry[8..24]);
675            ids.push(ExperienceId::from_bytes(exp_bytes));
676        }
677
678        Ok(ids)
679    }
680
681    fn get_recent_experience_ids(
682        &self,
683        collective_id: CollectiveId,
684        limit: usize,
685    ) -> Result<Vec<(ExperienceId, Timestamp)>> {
686        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
687        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
688
689        // Collect all (ExperienceId, Timestamp) pairs for this collective.
690        // Multimap values are sorted ascending by [timestamp_be][exp_id],
691        // so we collect all and then take from the end for newest-first.
692        let mut entries = Vec::new();
693        for result in table.get(collective_id.as_bytes())? {
694            let value = result.map_err(StorageError::from)?;
695            let entry = value.value();
696            // Entry layout: [timestamp_be: 8 bytes][experience_id: 16 bytes]
697            let mut ts_bytes = [0u8; 8];
698            ts_bytes.copy_from_slice(&entry[..8]);
699            let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));
700
701            let mut exp_bytes = [0u8; 16];
702            exp_bytes.copy_from_slice(&entry[8..24]);
703            entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
704        }
705
706        // Take the last `limit` entries (newest) and reverse to get descending order
707        let start = entries.len().saturating_sub(limit);
708        let mut recent = entries.split_off(start);
709        recent.reverse();
710
711        Ok(recent)
712    }
713
714    // =========================================================================
715    // Experience Storage Operations
716    // =========================================================================
717
718    fn save_experience(&self, experience: &Experience) -> Result<()> {
719        // Serialize experience (embedding is #[serde(skip)], excluded automatically)
720        let exp_bytes = bincode::serialize(experience)
721            .map_err(|e| StorageError::serialization(e.to_string()))?;
722
723        // Convert embedding to raw little-endian bytes
724        let emb_bytes = f32_slice_to_bytes(&experience.embedding);
725
726        // Build index keys
727        let type_key = encode_type_index_key(
728            experience.collective_id.as_bytes(),
729            experience.experience_type.type_tag(),
730        );
731
732        // Write to all 4 tables in a single atomic transaction
733        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
734        {
735            // Main experience record
736            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
737            exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
738        }
739        {
740            // Embedding vector (stored separately for compactness)
741            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
742            emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
743        }
744        {
745            // By-collective index: key=collective_id, value=timestamp+experience_id
746            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
747            // Value is [timestamp_be: 8 bytes][experience_id: 16 bytes] = 24 bytes
748            let mut value = [0u8; 24];
749            value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
750            value[8..24].copy_from_slice(experience.id.as_bytes());
751            idx_table.insert(experience.collective_id.as_bytes(), &value)?;
752        }
753        {
754            // By-type index: key=collective_id+type_tag, value=experience_id
755            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
756            type_table.insert(&type_key, experience.id.as_bytes())?;
757        }
758        // Record WAL event for cross-process change detection
759        self.increment_wal_and_record(
760            &write_txn,
761            experience.id.as_bytes(),
762            experience.collective_id,
763            EntityTypeTag::Experience,
764            WatchEventTypeTag::Created,
765            experience.timestamp,
766        )?;
767        write_txn.commit().map_err(StorageError::from)?;
768
769        debug!(
770            id = %experience.id,
771            collective_id = %experience.collective_id,
772            "Experience saved"
773        );
774        Ok(())
775    }
776
777    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
778        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
779
780        // Read main experience record
781        let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
782        let exp_entry = match exp_table.get(id.as_bytes())? {
783            Some(v) => v,
784            None => return Ok(None),
785        };
786
787        let mut experience: Experience = bincode::deserialize(exp_entry.value())
788            .map_err(|e| StorageError::serialization(e.to_string()))?;
789
790        // Read embedding from separate table and reconstitute
791        let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
792        if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
793            experience.embedding = bytes_to_f32_vec(emb_entry.value());
794        }
795
796        Ok(Some(experience))
797    }
798
799    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
800        // Read-modify-write: read the current record, apply updates, write back
801        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
802        let collective_id;
803        let timestamp;
804        let is_archive;
805        {
806            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
807
808            let entry = match exp_table.get(id.as_bytes())? {
809                Some(v) => v,
810                None => return Ok(false),
811            };
812
813            let mut experience: Experience = bincode::deserialize(entry.value())
814                .map_err(|e| StorageError::serialization(e.to_string()))?;
815
816            // Drop the borrow on entry before mutating the table
817            drop(entry);
818
819            // Capture metadata for WAL event before applying updates
820            collective_id = experience.collective_id;
821            timestamp = experience.timestamp;
822            is_archive = update.archived == Some(true);
823
824            // Apply updates (only Some fields)
825            if let Some(importance) = update.importance {
826                experience.importance = importance;
827            }
828            if let Some(confidence) = update.confidence {
829                experience.confidence = confidence;
830            }
831            if let Some(ref domain) = update.domain {
832                experience.domain = domain.clone();
833            }
834            if let Some(ref related_files) = update.related_files {
835                experience.related_files = related_files.clone();
836            }
837            if let Some(archived) = update.archived {
838                experience.archived = archived;
839            }
840
841            // Re-serialize and write back
842            let bytes = bincode::serialize(&experience)
843                .map_err(|e| StorageError::serialization(e.to_string()))?;
844            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
845        }
846        // Record WAL event for cross-process change detection
847        let event_type = if is_archive {
848            WatchEventTypeTag::Archived
849        } else {
850            WatchEventTypeTag::Updated
851        };
852        self.increment_wal_and_record(
853            &write_txn,
854            id.as_bytes(),
855            collective_id,
856            EntityTypeTag::Experience,
857            event_type,
858            timestamp,
859        )?;
860        write_txn.commit().map_err(StorageError::from)?;
861
862        debug!(id = %id, "Experience updated");
863        Ok(true)
864    }
865
866    fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
867        // First read the experience to get collective_id, timestamp, and type_tag
868        // (needed for cleaning up secondary indices and WAL event)
869        let (collective_id, timestamp, type_tag) = {
870            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
871            let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
872
873            match exp_table.get(id.as_bytes())? {
874                Some(entry) => {
875                    let exp: Experience = bincode::deserialize(entry.value())
876                        .map_err(|e| StorageError::serialization(e.to_string()))?;
877                    (
878                        exp.collective_id,
879                        exp.timestamp,
880                        exp.experience_type.type_tag(),
881                    )
882                }
883                None => return Ok(false),
884            }
885        };
886
887        // Delete from all 4 tables in a single transaction
888        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
889        {
890            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
891            exp_table.remove(id.as_bytes())?;
892        }
893        {
894            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
895            emb_table.remove(id.as_bytes())?;
896        }
897        {
898            // Remove specific entry from by-collective multimap
899            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
900            let mut value = [0u8; 24];
901            value[..8].copy_from_slice(&timestamp.to_be_bytes());
902            value[8..24].copy_from_slice(id.as_bytes());
903            idx_table.remove(collective_id.as_bytes(), &value)?;
904        }
905        {
906            // Remove specific entry from by-type multimap
907            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
908            let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
909            type_table.remove(&type_key, id.as_bytes())?;
910        }
911        // Record WAL event for cross-process change detection
912        self.increment_wal_and_record(
913            &write_txn,
914            id.as_bytes(),
915            collective_id,
916            EntityTypeTag::Experience,
917            WatchEventTypeTag::Deleted,
918            timestamp,
919        )?;
920        write_txn.commit().map_err(StorageError::from)?;
921
922        debug!(id = %id, "Experience deleted");
923        Ok(true)
924    }
925
926    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
927        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
928        let (new_count, collective_id, timestamp) = {
929            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
930
931            let entry = match exp_table.get(id.as_bytes())? {
932                Some(v) => v,
933                None => return Ok(None),
934            };
935
936            let mut experience: Experience = bincode::deserialize(entry.value())
937                .map_err(|e| StorageError::serialization(e.to_string()))?;
938            drop(entry);
939
940            experience.applications = experience.applications.saturating_add(1);
941            let new_count = experience.applications;
942            let collective_id = experience.collective_id;
943            let timestamp = experience.timestamp;
944
945            let bytes = bincode::serialize(&experience)
946                .map_err(|e| StorageError::serialization(e.to_string()))?;
947            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
948            (new_count, collective_id, timestamp)
949        };
950        // Record WAL event for cross-process change detection
951        self.increment_wal_and_record(
952            &write_txn,
953            id.as_bytes(),
954            collective_id,
955            EntityTypeTag::Experience,
956            WatchEventTypeTag::Updated,
957            timestamp,
958        )?;
959        write_txn.commit().map_err(StorageError::from)?;
960
961        debug!(id = %id, applications = new_count, "Experience reinforced");
962        Ok(Some(new_count))
963    }
964
965    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
966        let bytes = f32_slice_to_bytes(embedding);
967
968        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
969        {
970            let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
971            table.insert(id.as_bytes(), bytes.as_slice())?;
972        }
973        write_txn.commit().map_err(StorageError::from)?;
974
975        debug!(id = %id, dim = embedding.len(), "Embedding saved");
976        Ok(())
977    }
978
979    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
980        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
981        let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
982
983        match table.get(id.as_bytes())? {
984            Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
985            None => Ok(None),
986        }
987    }
988
989    // =========================================================================
990    // Relation Storage Operations (E3-S01)
991    // =========================================================================
992
993    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
994        let bytes =
995            bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;
996
997        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
998        {
999            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
1000            table.insert(relation.id.as_bytes(), bytes.as_slice())?;
1001        }
1002        {
1003            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1004            table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
1005        }
1006        {
1007            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1008            table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
1009        }
1010        // Look up collective_id from source experience for WAL record
1011        let collective_id = {
1012            let exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
1013            let entry = exp_table
1014                .get(relation.source_id.as_bytes())?
1015                .ok_or_else(|| {
1016                    StorageError::corrupted("relation source experience not found for WAL record")
1017                })?;
1018            let exp: Experience = bincode::deserialize(entry.value())
1019                .map_err(|e| StorageError::serialization(e.to_string()))?;
1020            exp.collective_id
1021        };
1022        self.increment_wal_and_record(
1023            &write_txn,
1024            relation.id.as_bytes(),
1025            collective_id,
1026            EntityTypeTag::Relation,
1027            WatchEventTypeTag::Created,
1028            relation.created_at,
1029        )?;
1030        write_txn.commit().map_err(StorageError::from)?;
1031
1032        debug!(id = %relation.id, "Relation saved");
1033        Ok(())
1034    }
1035
1036    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
1037        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1038        let table = read_txn.open_table(RELATIONS_TABLE)?;
1039
1040        match table.get(id.as_bytes())? {
1041            Some(value) => {
1042                let relation: ExperienceRelation = bincode::deserialize(value.value())
1043                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1044                Ok(Some(relation))
1045            }
1046            None => Ok(None),
1047        }
1048    }
1049
1050    fn delete_relation(&self, id: RelationId) -> Result<bool> {
1051        // Read the relation first to get source/target IDs for index cleanup
1052        // and source experience's collective_id for WAL record
1053        let (source_id, target_id, collective_id) = {
1054            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1055            let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1056
1057            match rel_table.get(id.as_bytes())? {
1058                Some(entry) => {
1059                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
1060                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1061                    // Look up collective_id from source experience
1062                    let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
1063                    let cid = match exp_table.get(rel.source_id.as_bytes())? {
1064                        Some(exp_entry) => {
1065                            let exp: Experience = bincode::deserialize(exp_entry.value())
1066                                .map_err(|e| StorageError::serialization(e.to_string()))?;
1067                            exp.collective_id
1068                        }
1069                        // Source experience may have been deleted; use nil collective
1070                        None => CollectiveId::nil(),
1071                    };
1072                    (rel.source_id, rel.target_id, cid)
1073                }
1074                None => return Ok(false),
1075            }
1076        };
1077
1078        // Delete from all 3 tables atomically
1079        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1080        {
1081            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
1082            table.remove(id.as_bytes())?;
1083        }
1084        {
1085            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1086            table.remove(source_id.as_bytes(), id.as_bytes())?;
1087        }
1088        {
1089            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1090            table.remove(target_id.as_bytes(), id.as_bytes())?;
1091        }
1092        self.increment_wal_and_record(
1093            &write_txn,
1094            id.as_bytes(),
1095            collective_id,
1096            EntityTypeTag::Relation,
1097            WatchEventTypeTag::Deleted,
1098            Timestamp::now(),
1099        )?;
1100        write_txn.commit().map_err(StorageError::from)?;
1101
1102        debug!(id = %id, "Relation deleted");
1103        Ok(true)
1104    }
1105
1106    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1107        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1108        let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1109
1110        let mut ids = Vec::new();
1111        for result in table.get(experience_id.as_bytes())? {
1112            let value = result.map_err(StorageError::from)?;
1113            let bytes = value.value();
1114            ids.push(RelationId::from_bytes(*bytes));
1115        }
1116
1117        Ok(ids)
1118    }
1119
1120    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1121        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1122        let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1123
1124        let mut ids = Vec::new();
1125        for result in table.get(experience_id.as_bytes())? {
1126            let value = result.map_err(StorageError::from)?;
1127            let bytes = value.value();
1128            ids.push(RelationId::from_bytes(*bytes));
1129        }
1130
1131        Ok(ids)
1132    }
1133
1134    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
1135        // Phase 1: Read — collect all relation IDs from both indexes
1136        let relation_ids: Vec<RelationId> = {
1137            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1138            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1139            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1140
1141            let mut ids = std::collections::HashSet::new();
1142
1143            // Outgoing relations (this experience is source)
1144            for result in source_table.get(experience_id.as_bytes())? {
1145                let value = result.map_err(StorageError::from)?;
1146                ids.insert(RelationId::from_bytes(*value.value()));
1147            }
1148
1149            // Incoming relations (this experience is target)
1150            for result in target_table.get(experience_id.as_bytes())? {
1151                let value = result.map_err(StorageError::from)?;
1152                ids.insert(RelationId::from_bytes(*value.value()));
1153            }
1154
1155            ids.into_iter().collect()
1156        };
1157
1158        let count = relation_ids.len() as u64;
1159        if count == 0 {
1160            return Ok(0);
1161        }
1162
1163        // Phase 2: Read each relation to get source/target IDs for index cleanup
1164        let relations: Vec<ExperienceRelation> = {
1165            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1166            let table = read_txn.open_table(RELATIONS_TABLE)?;
1167
1168            let mut rels = Vec::with_capacity(relation_ids.len());
1169            for rel_id in &relation_ids {
1170                if let Some(entry) = table.get(rel_id.as_bytes())? {
1171                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
1172                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1173                    rels.push(rel);
1174                }
1175            }
1176            rels
1177        };
1178
1179        // Phase 3: Write — delete from all 3 tables atomically
1180        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1181        {
1182            let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
1183            for rel in &relations {
1184                rel_table.remove(rel.id.as_bytes())?;
1185            }
1186        }
1187        {
1188            let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1189            for rel in &relations {
1190                source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
1191            }
1192        }
1193        {
1194            let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1195            for rel in &relations {
1196                target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
1197            }
1198        }
1199        write_txn.commit().map_err(StorageError::from)?;
1200
1201        debug!(
1202            experience_id = %experience_id,
1203            count = count,
1204            "Cascade-deleted relations for experience"
1205        );
1206        Ok(count)
1207    }
1208
1209    fn relation_exists(
1210        &self,
1211        source_id: ExperienceId,
1212        target_id: ExperienceId,
1213        relation_type: RelationType,
1214    ) -> Result<bool> {
1215        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1216        let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1217        let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1218
1219        // Scan all relations for this source and check each
1220        for result in index_table.get(source_id.as_bytes())? {
1221            let value = result.map_err(StorageError::from)?;
1222            let rel_id = RelationId::from_bytes(*value.value());
1223
1224            if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1225                let rel: ExperienceRelation = bincode::deserialize(entry.value())
1226                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1227                if rel.target_id == target_id && rel.relation_type == relation_type {
1228                    return Ok(true);
1229                }
1230            }
1231        }
1232
1233        Ok(false)
1234    }
1235
1236    // =========================================================================
1237    // Insight Storage Operations (E3-S02)
1238    // =========================================================================
1239
1240    fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1241        let bytes =
1242            bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1243
1244        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1245        {
1246            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1247            table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1248        }
1249        {
1250            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1251            table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1252        }
1253        self.increment_wal_and_record(
1254            &write_txn,
1255            insight.id.as_bytes(),
1256            insight.collective_id,
1257            EntityTypeTag::Insight,
1258            WatchEventTypeTag::Created,
1259            insight.created_at,
1260        )?;
1261        write_txn.commit().map_err(StorageError::from)?;
1262
1263        debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1264        Ok(())
1265    }
1266
1267    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1268        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1269        let table = read_txn.open_table(INSIGHTS_TABLE)?;
1270
1271        match table.get(id.as_bytes())? {
1272            Some(value) => {
1273                let insight: DerivedInsight = bincode::deserialize(value.value())
1274                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1275                Ok(Some(insight))
1276            }
1277            None => Ok(None),
1278        }
1279    }
1280
1281    fn delete_insight(&self, id: InsightId) -> Result<bool> {
1282        // Read the insight first to get collective_id for index cleanup
1283        let collective_id = {
1284            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1285            let table = read_txn.open_table(INSIGHTS_TABLE)?;
1286
1287            match table.get(id.as_bytes())? {
1288                Some(entry) => {
1289                    let insight: DerivedInsight = bincode::deserialize(entry.value())
1290                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1291                    insight.collective_id
1292                }
1293                None => return Ok(false),
1294            }
1295        };
1296
1297        // Delete from both tables atomically
1298        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1299        {
1300            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1301            table.remove(id.as_bytes())?;
1302        }
1303        {
1304            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1305            table.remove(collective_id.as_bytes(), id.as_bytes())?;
1306        }
1307        self.increment_wal_and_record(
1308            &write_txn,
1309            id.as_bytes(),
1310            collective_id,
1311            EntityTypeTag::Insight,
1312            WatchEventTypeTag::Deleted,
1313            Timestamp::now(),
1314        )?;
1315        write_txn.commit().map_err(StorageError::from)?;
1316
1317        debug!(id = %id, "Insight deleted");
1318        Ok(true)
1319    }
1320
1321    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1322        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1323        let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1324
1325        let mut ids = Vec::new();
1326        for result in table.get(id.as_bytes())? {
1327            let value = result.map_err(StorageError::from)?;
1328            ids.push(InsightId::from_bytes(*value.value()));
1329        }
1330
1331        Ok(ids)
1332    }
1333
1334    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
1335        // Phase 1: Read — collect insight IDs
1336        let insight_ids: Vec<[u8; 16]> = {
1337            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1338            let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1339
1340            let mut ids = Vec::new();
1341            for result in table.get(id.as_bytes())? {
1342                let value = result.map_err(StorageError::from)?;
1343                ids.push(*value.value());
1344            }
1345            ids
1346        };
1347
1348        let count = insight_ids.len() as u64;
1349        if count == 0 {
1350            return Ok(0);
1351        }
1352
1353        // Phase 2: Write — delete from both tables atomically
1354        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1355        {
1356            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1357            for insight_id in &insight_ids {
1358                table.remove(insight_id)?;
1359            }
1360        }
1361        {
1362            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1363            table.remove_all(id.as_bytes())?;
1364        }
1365        write_txn.commit().map_err(StorageError::from)?;
1366
1367        debug!(id = %id, count = count, "Cascade-deleted insights for collective");
1368        Ok(count)
1369    }
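
    // The read-then-write shape above is deliberate: collect keys under a
    // read transaction, then remove them under one write transaction, so the
    // write never iterates a table it is mutating. A generic sketch of the
    // pattern (hypothetical table and key types):
    //
    //     let keys: Vec<K> = { /* begin_read, scan, collect */ };
    //     let txn = db.begin_write()?;
    //     {
    //         let mut t = txn.open_table(TABLE)?;
    //         for k in &keys { t.remove(k)?; }
    //     }
    //     txn.commit()?;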
1370
1371    // =========================================================================
1372    // Activity Storage Operations (E3-S03)
1373    // =========================================================================
1374
1375    fn save_activity(&self, activity: &Activity) -> Result<()> {
1376        let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1377        let bytes =
1378            bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1379
1380        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1381        {
1382            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1383            table.insert(key.as_slice(), bytes.as_slice())?;
1384        }
1385        write_txn.commit().map_err(StorageError::from)?;
1386
1387        debug!(
1388            agent_id = %activity.agent_id,
1389            collective_id = %activity.collective_id,
1390            "Activity saved"
1391        );
1392        Ok(())
1393    }
1394
1395    fn get_activity(
1396        &self,
1397        agent_id: &str,
1398        collective_id: CollectiveId,
1399    ) -> Result<Option<Activity>> {
1400        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1401
1402        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1403        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1404
1405        match table.get(key.as_slice())? {
1406            Some(value) => {
1407                let activity: Activity = bincode::deserialize(value.value())
1408                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1409                Ok(Some(activity))
1410            }
1411            None => Ok(None),
1412        }
1413    }
1414
1415    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1416        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1417
1418        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1419        let existed = {
1420            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1421            let removed = table.remove(key.as_slice())?;
1422            removed.is_some()
1423        };
1424        write_txn.commit().map_err(StorageError::from)?;
1425
1426        if existed {
1427            debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1428        }
1429        Ok(existed)
1430    }
1431
1432    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1433        let prefix = collective_id.as_bytes();
1434
1435        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1436        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1437
1438        let mut activities = Vec::new();
1439        for result in table.iter()? {
1440            let (key, value) = result.map_err(StorageError::from)?;
1441            let key_bytes = key.value();
1442
1443            // Check if this key belongs to the requested collective (16-byte prefix)
1444            if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1445                let activity: Activity = bincode::deserialize(value.value())
1446                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1447                activities.push(activity);
1448            }
1449        }
1450
1451        Ok(activities)
1452    }
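
    // For orientation: an activity key is `[collective_id (16 bytes)][agent_id
    // bytes]`, so the 16-byte prefix comparison above is effectively a prefix
    // scan over the full table. A sketch of the layout (illustrative agent id):
    //
    //     let key = encode_activity_key(collective_id.as_bytes(), "agent-7");
    //     // key[..16] == collective_id bytes, key[16..] == b"agent-7"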
1453
1454    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
1455        let prefix = collective_id.as_bytes();
1456
1457        // Phase 1: Read — collect matching keys
1458        let keys_to_delete: Vec<Vec<u8>> = {
1459            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1460            let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1461
1462            let mut keys = Vec::new();
1463            for result in table.iter()? {
1464                let (key, _) = result.map_err(StorageError::from)?;
1465                let key_bytes = key.value();
1466
1467                if key_bytes.len() >= 16
1468                    && decode_collective_from_activity_key(key_bytes) == *prefix
1469                {
1470                    keys.push(key_bytes.to_vec());
1471                }
1472            }
1473            keys
1474        };
1475
1476        let count = keys_to_delete.len() as u64;
1477        if count == 0 {
1478            return Ok(0);
1479        }
1480
1481        // Phase 2: Write — delete all collected keys
1482        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1483        {
1484            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1485            for key in &keys_to_delete {
1486                table.remove(key.as_slice())?;
1487            }
1488        }
1489        write_txn.commit().map_err(StorageError::from)?;
1490
1491        debug!(
1492            collective_id = %collective_id,
1493            count = count,
1494            "Cascade-deleted activities for collective"
1495        );
1496        Ok(count)
1497    }
1498
1499    // =========================================================================
1500    // Paginated List Operations (PulseVision)
1501    // =========================================================================
1502
1503    fn list_experience_ids_paginated(
1504        &self,
1505        collective_id: CollectiveId,
1506        limit: usize,
1507        offset: usize,
1508    ) -> Result<Vec<ExperienceId>> {
1509        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1510        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
1511
1512        let mut ids = Vec::new();
1513        let mut skipped = 0usize;
1514
1515        // Multimap: key=collective_id (16 bytes), values=[timestamp_be:8][exp_id:16] (24 bytes)
1516        for result in table.get(collective_id.as_bytes())? {
1517            let value = result.map_err(StorageError::from)?;
1518            if skipped < offset {
1519                skipped += 1;
1520                continue;
1521            }
1522            let entry = value.value();
1523            let mut id_bytes = [0u8; 16];
1524            id_bytes.copy_from_slice(&entry[8..24]);
1525            ids.push(ExperienceId::from_bytes(id_bytes));
1526            if ids.len() >= limit {
1527                return Ok(ids);
1528            }
1529        }
1530
1531        Ok(ids)
1532    }
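
    // A paging sketch over this API (hypothetical caller): fetch fixed-size
    // windows until a short page signals the end.
    //
    //     let mut offset = 0;
    //     loop {
    //         let page = storage.list_experience_ids_paginated(cid, 100, offset)?;
    //         let n = page.len();
    //         /* process page */
    //         if n < 100 { break; }
    //         offset += n;
    //     }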
1533
1534    fn list_relations_in_collective(
1535        &self,
1536        collective_id: CollectiveId,
1537        limit: usize,
1538        offset: usize,
1539    ) -> Result<Vec<crate::relation::ExperienceRelation>> {
1540        // Get all experience IDs in this collective first
1541        let exp_ids = self.list_experience_ids_in_collective(collective_id)?;
1542
1543        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1544        let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1545        let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1546
1547        let mut relations = Vec::new();
1548        let mut skipped = 0usize;
1549
1550        for exp_id in &exp_ids {
1551            for result in source_table.get(exp_id.as_bytes())? {
1552                let rel_id_value = result.map_err(StorageError::from)?;
1553                let rel_id = RelationId::from_bytes(*rel_id_value.value());
1554
1555                if skipped < offset {
1556                    skipped += 1;
1557                    continue;
1558                }
1559
1560                if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1561                    let relation: crate::relation::ExperienceRelation =
1562                        bincode::deserialize(entry.value())
1563                            .map_err(|e| StorageError::serialization(e.to_string()))?;
1564                    relations.push(relation);
1565                    if relations.len() >= limit {
1566                        return Ok(relations);
1567                    }
1568                }
1569            }
1570        }
1571
1572        Ok(relations)
1573    }
1574
1575    fn list_insight_ids_paginated(
1576        &self,
1577        collective_id: CollectiveId,
1578        limit: usize,
1579        offset: usize,
1580    ) -> Result<Vec<InsightId>> {
1581        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1582        let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1583
1584        let mut ids = Vec::new();
1585        let mut skipped = 0usize;
1586
1587        for result in table.get(collective_id.as_bytes())? {
1588            let value = result.map_err(StorageError::from)?;
1589            if skipped < offset {
1590                skipped += 1;
1591                continue;
1592            }
1593            ids.push(InsightId::from_bytes(*value.value()));
1594            if ids.len() >= limit {
1595                return Ok(ids);
1596            }
1597        }
1598
1599        Ok(ids)
1600    }
1601
1602    // =========================================================================
1603    // Watch Event Operations (E4-S02)
1604    // =========================================================================
1605
1606    fn get_wal_sequence(&self) -> Result<u64> {
1607        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1608        let meta_table = read_txn.open_table(METADATA_TABLE)?;
1609        match meta_table.get(WAL_SEQUENCE_KEY)? {
1610            Some(entry) => {
1611                let bytes: [u8; 8] = entry
1612                    .value()
1613                    .try_into()
1614                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1615                Ok(u64::from_be_bytes(bytes))
1616            }
1617            None => Ok(0),
1618        }
1619    }
1620
1621    fn poll_watch_events(
1622        &self,
1623        since_seq: u64,
1624        limit: usize,
1625    ) -> Result<(Vec<WatchEventRecord>, u64)> {
1626        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1627        let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1628
1629        let start_key = (since_seq + 1).to_be_bytes();
1630        let end_key = u64::MAX.to_be_bytes();
1631        let mut events = Vec::new();
1632        let mut max_seq = since_seq;
1633
1634        for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1635            let (key, value) = entry.map_err(StorageError::from)?;
1636            let seq = u64::from_be_bytes(*key.value());
1637            let record: WatchEventRecord = bincode::deserialize(value.value())
1638                .map_err(|e| StorageError::serialization(e.to_string()))?;
1639            events.push(record);
1640            max_seq = seq;
1641            if events.len() >= limit {
1642                break;
1643            }
1644        }
1645
1646        Ok((events, max_seq))
1647    }
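
    // A watcher loop sketch (hypothetical caller): resume from the last seen
    // sequence and advance the cursor using the returned `max_seq`.
    //
    //     let mut cursor = 0u64; // or a persisted value
    //     loop {
    //         let (events, max_seq) = storage.poll_watch_events(cursor, 256)?;
    //         for ev in &events { /* dispatch each WatchEventRecord */ }
    //         cursor = max_seq;
    //         /* sleep or await a wakeup before the next poll */
    //     }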
1648
1649    // =========================================================================
1650    // Sync Operations (feature: sync)
1651    // =========================================================================
1652
1653    #[cfg(feature = "sync")]
1654    fn poll_sync_events(
1655        &self,
1656        since_seq: u64,
1657        limit: usize,
1658    ) -> Result<Vec<(u64, WatchEventRecord)>> {
1659        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1660        let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1661
1662        let start_key = (since_seq + 1).to_be_bytes();
1663        let end_key = u64::MAX.to_be_bytes();
1664        let mut events = Vec::new();
1665
1666        for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1667            let (key, value) = entry.map_err(StorageError::from)?;
1668            let seq = u64::from_be_bytes(*key.value());
1669            let record: WatchEventRecord = bincode::deserialize(value.value())
1670                .map_err(|e| StorageError::serialization(e.to_string()))?;
1671            events.push((seq, record));
1672            if events.len() >= limit {
1673                break;
1674            }
1675        }
1676
1677        Ok(events)
1678    }
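
    // Unlike poll_watch_events, this returns `(seq, record)` pairs so a sync
    // peer can acknowledge an exact sequence. A sketch (hypothetical caller):
    //
    //     for (seq, record) in storage.poll_sync_events(cursor.last_sequence, 64)? {
    //         /* ship `record`; on ack, persist `seq` via save_sync_cursor */
    //     }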
1679
1680    #[cfg(feature = "sync")]
1681    fn instance_id(&self) -> crate::sync::InstanceId {
1682        self.instance_id
1683    }
1684
1685    #[cfg(feature = "sync")]
1686    fn save_sync_cursor(&self, cursor: &crate::sync::SyncCursor) -> Result<()> {
1687        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1688        {
1689            let mut table = write_txn.open_table(SYNC_CURSORS_TABLE)?;
1690            let bytes = bincode::serialize(cursor)
1691                .map_err(|e| StorageError::serialization(e.to_string()))?;
1692            table.insert(cursor.instance_id.as_bytes(), bytes.as_slice())?;
1693        }
1694        write_txn.commit().map_err(StorageError::from)?;
1695        debug!(
1696            peer = %cursor.instance_id,
1697            last_sequence = cursor.last_sequence,
1698            "Saved sync cursor"
1699        );
1700        Ok(())
1701    }
1702
1703    #[cfg(feature = "sync")]
1704    fn load_sync_cursor(
1705        &self,
1706        instance_id: &crate::sync::InstanceId,
1707    ) -> Result<Option<crate::sync::SyncCursor>> {
1708        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1709        let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1710        match table.get(instance_id.as_bytes())? {
1711            Some(entry) => {
1712                let cursor: crate::sync::SyncCursor = bincode::deserialize(entry.value())
1713                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1714                Ok(Some(cursor))
1715            }
1716            None => Ok(None),
1717        }
1718    }
1719
1720    #[cfg(feature = "sync")]
1721    fn list_sync_cursors(&self) -> Result<Vec<crate::sync::SyncCursor>> {
1722        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1723        let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1724        let mut cursors = Vec::new();
1725        for entry in table.iter()? {
1726            let (_, value) = entry.map_err(StorageError::from)?;
1727            let cursor: crate::sync::SyncCursor = bincode::deserialize(value.value())
1728                .map_err(|e| StorageError::serialization(e.to_string()))?;
1729            cursors.push(cursor);
1730        }
1731        Ok(cursors)
1732    }
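
    // Cursor round-trip sketch: save_sync_cursor is an upsert keyed by
    // `cursor.instance_id`, so the latest write for a peer wins.
    //
    //     storage.save_sync_cursor(&cursor)?;
    //     let loaded = storage.load_sync_cursor(&cursor.instance_id)?; // Some(..)
    //     let all = storage.list_sync_cursors()?; // includes this peer's cursor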
1733
1734    #[cfg(feature = "sync")]
1735    fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64> {
1736        if up_to_seq == 0 {
1737            return Ok(0);
1738        }
1739
1740        // Collect keys to delete in a read pass
1741        let keys_to_delete: Vec<[u8; 8]> = {
1742            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1743            let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1744
1745            let start_key = 1u64.to_be_bytes();
1746            let end_key = up_to_seq.to_be_bytes();
1747            let mut keys = Vec::new();
1748
1749            for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1750                let (key, _) = entry.map_err(StorageError::from)?;
1751                keys.push(*key.value());
1752            }
1753            keys
1754        };
1755
1756        if keys_to_delete.is_empty() {
1757            return Ok(0);
1758        }
1759
1760        let count = keys_to_delete.len() as u64;
1761
1762        // Delete in a write transaction
1763        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1764        {
1765            let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
1766            for key in &keys_to_delete {
1767                events_table.remove(key)?;
1768            }
1769        }
1770        write_txn.commit().map_err(StorageError::from)?;
1771
1772        debug!(count, up_to_seq, "Compacted WAL events");
1773        Ok(count)
1774    }
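
    // Compaction is safe once every peer has acknowledged a sequence. A
    // coordinator sketch, assuming each cursor's `last_sequence` records that
    // peer's acknowledged progress:
    //
    //     let min_acked = storage
    //         .list_sync_cursors()?
    //         .iter()
    //         .map(|c| c.last_sequence)
    //         .min()
    //         .unwrap_or(0);
    //     let removed = storage.compact_wal_events(min_acked)?;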
1775}
1776
1777// ============================================================================
1778// Embedding byte conversion helpers
1779// ============================================================================
1780
1781/// Converts a slice of f32 values to raw little-endian bytes.
1782#[inline]
1783fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
1784    let mut bytes = Vec::with_capacity(data.len() * 4);
1785    for &val in data {
1786        bytes.extend_from_slice(&val.to_le_bytes());
1787    }
1788    bytes
1789}
1790
1791/// Converts raw little-endian bytes back to a `Vec<f32>`; trailing bytes that do not form a full 4-byte chunk are ignored.
1792#[inline]
1793fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
1794    data.chunks_exact(4)
1795        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
1796        .collect()
1797}
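
// Design note: raw little-endian packing keeps embeddings fixed-size and
// cheap to slice -- a 384-dim vector is exactly 384 * 4 = 1536 bytes -- and
// skips serialization framing on the hot vector-read path.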
1798
1799// RedbStorage is auto Send + Sync: Database, DatabaseMetadata, PathBuf (and,
1800// with the `sync` feature, InstanceId) are all Send + Sync.
1801
1802#[cfg(test)]
1803mod tests {
1804    use super::*;
1805    use tempfile::tempdir;
1806
1807    fn default_config() -> Config {
1808        Config::default()
1809    }
1810
1811    #[test]
1812    fn test_open_creates_new_database() {
1813        let dir = tempdir().unwrap();
1814        let path = dir.path().join("test.db");
1815
1816        assert!(!path.exists());
1817
1818        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1819
1820        assert!(path.exists());
1821        assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
1822        assert_eq!(
1823            storage.metadata().embedding_dimension,
1824            EmbeddingDimension::D384
1825        );
1826
1827        Box::new(storage).close().unwrap();
1828    }
1829
1830    #[test]
1831    fn test_open_existing_database() {
1832        let dir = tempdir().unwrap();
1833        let path = dir.path().join("test.db");
1834
1835        // Create database
1836        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1837        let created_at = storage.metadata().created_at;
1838        Box::new(storage).close().unwrap();
1839
1840        // Sleep briefly, then reopen, so last_opened_at is strictly later
1841        std::thread::sleep(std::time::Duration::from_millis(10));
1842        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1843
1844        // created_at should be preserved
1845        assert_eq!(storage.metadata().created_at, created_at);
1846        // last_opened_at should be updated
1847        assert!(storage.metadata().last_opened_at > created_at);
1848
1849        Box::new(storage).close().unwrap();
1850    }
1851
1852    #[test]
1853    fn test_dimension_mismatch_returns_error() {
1854        let dir = tempdir().unwrap();
1855        let path = dir.path().join("test.db");
1856
1857        // Create with D384
1858        let config_384 = Config {
1859            embedding_dimension: EmbeddingDimension::D384,
1860            ..Default::default()
1861        };
1862        let storage = RedbStorage::open(&path, &config_384).unwrap();
1863        Box::new(storage).close().unwrap();
1864
1865        // Try to reopen with D768
1866        let config_768 = Config {
1867            embedding_dimension: EmbeddingDimension::D768,
1868            ..Default::default()
1869        };
1870        let result = RedbStorage::open(&path, &config_768);
1871
1872        assert!(result.is_err());
1873        let err = result.unwrap_err();
1874        assert!(matches!(
1875            err,
1876            PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
1877        ));
1878    }
1879
1880    #[test]
1881    fn test_database_files_created() {
1882        let dir = tempdir().unwrap();
1883        let path = dir.path().join("pulse.db");
1884
1885        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1886
1887        // Main database file should exist
1888        assert!(path.exists());
1889        assert!(storage.path().is_some());
1890        assert_eq!(storage.path().unwrap(), path);
1891
1892        Box::new(storage).close().unwrap();
1893    }
1894
1895    #[test]
1896    fn test_metadata_preserved_across_opens() {
1897        let dir = tempdir().unwrap();
1898        let path = dir.path().join("test.db");
1899
1900        let config = Config {
1901            embedding_dimension: EmbeddingDimension::Custom(512),
1902            ..Default::default()
1903        };
1904
1905        // Create
1906        let storage = RedbStorage::open(&path, &config).unwrap();
1907        assert_eq!(
1908            storage.metadata().embedding_dimension,
1909            EmbeddingDimension::Custom(512)
1910        );
1911        Box::new(storage).close().unwrap();
1912
1913        // Reopen
1914        let storage = RedbStorage::open(&path, &config).unwrap();
1915        assert_eq!(
1916            storage.metadata().embedding_dimension,
1917            EmbeddingDimension::Custom(512)
1918        );
1919        Box::new(storage).close().unwrap();
1920    }
1921
1922    #[test]
1923    fn test_embedding_dimension_accessor() {
1924        let dir = tempdir().unwrap();
1925        let path = dir.path().join("test.db");
1926
1927        let config = Config {
1928            embedding_dimension: EmbeddingDimension::D768,
1929            ..Default::default()
1930        };
1931
1932        let storage = RedbStorage::open(&path, &config).unwrap();
1933        assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);
1934
1935        Box::new(storage).close().unwrap();
1936    }
1937
1938    #[test]
1939    fn test_all_six_tables_created() {
1940        let dir = tempdir().unwrap();
1941        let path = dir.path().join("test.db");
1942
1943        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1944
1945        // Verify the six core tables exist by opening each in a read
1946        // transaction. If any table wasn't created during initialize_new(),
1947        // this would return a TableDoesNotExist error.
1948        let read_txn = storage.database().begin_read().unwrap();
1949
1950        read_txn.open_table(METADATA_TABLE).unwrap();
1951        read_txn.open_table(COLLECTIVES_TABLE).unwrap();
1952        read_txn.open_table(EXPERIENCES_TABLE).unwrap();
1953        read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
1954        read_txn
1955            .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
1956            .unwrap();
1957        read_txn
1958            .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
1959            .unwrap();
1960
1961        Box::new(storage).close().unwrap();
1962    }
1963
1964    // ====================================================================
1965    // Collective CRUD tests
1966    // ====================================================================
1967
1968    #[test]
1969    fn test_save_and_get_collective() {
1970        let dir = tempdir().unwrap();
1971        let path = dir.path().join("test.db");
1972        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1973
1974        let collective = Collective::new("test-project", 384);
1975        let id = collective.id;
1976
1977        storage.save_collective(&collective).unwrap();
1978
1979        let retrieved = storage.get_collective(id).unwrap().unwrap();
1980        assert_eq!(retrieved.id, id);
1981        assert_eq!(retrieved.name, "test-project");
1982        assert_eq!(retrieved.embedding_dimension, 384);
1983        assert!(retrieved.owner_id.is_none());
1984
1985        Box::new(storage).close().unwrap();
1986    }
1987
1988    #[test]
1989    fn test_get_nonexistent_collective_returns_none() {
1990        let dir = tempdir().unwrap();
1991        let path = dir.path().join("test.db");
1992        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1993
1994        let result = storage.get_collective(CollectiveId::new()).unwrap();
1995        assert!(result.is_none());
1996
1997        Box::new(storage).close().unwrap();
1998    }
1999
2000    #[test]
2001    fn test_save_collective_overwrites_existing() {
2002        let dir = tempdir().unwrap();
2003        let path = dir.path().join("test.db");
2004        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2005
2006        let mut collective = Collective::new("original-name", 384);
2007        let id = collective.id;
2008        storage.save_collective(&collective).unwrap();
2009
2010        // Overwrite with updated name
2011        collective.name = "updated-name".to_string();
2012        storage.save_collective(&collective).unwrap();
2013
2014        let retrieved = storage.get_collective(id).unwrap().unwrap();
2015        assert_eq!(retrieved.name, "updated-name");
2016
2017        Box::new(storage).close().unwrap();
2018    }
2019
2020    #[test]
2021    fn test_list_collectives_empty() {
2022        let dir = tempdir().unwrap();
2023        let path = dir.path().join("test.db");
2024        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2025
2026        let collectives = storage.list_collectives().unwrap();
2027        assert!(collectives.is_empty());
2028
2029        Box::new(storage).close().unwrap();
2030    }
2031
2032    #[test]
2033    fn test_list_collectives_returns_all() {
2034        let dir = tempdir().unwrap();
2035        let path = dir.path().join("test.db");
2036        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2037
2038        let c1 = Collective::new("project-alpha", 384);
2039        let c2 = Collective::new("project-beta", 384);
2040        let c3 = Collective::new("project-gamma", 384);
2041
2042        storage.save_collective(&c1).unwrap();
2043        storage.save_collective(&c2).unwrap();
2044        storage.save_collective(&c3).unwrap();
2045
2046        let collectives = storage.list_collectives().unwrap();
2047        assert_eq!(collectives.len(), 3);
2048
2049        // Verify all IDs are present
2050        let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
2051        assert!(ids.contains(&c1.id));
2052        assert!(ids.contains(&c2.id));
2053        assert!(ids.contains(&c3.id));
2054
2055        Box::new(storage).close().unwrap();
2056    }
2057
2058    #[test]
2059    fn test_delete_collective_existing() {
2060        let dir = tempdir().unwrap();
2061        let path = dir.path().join("test.db");
2062        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2063
2064        let collective = Collective::new("to-delete", 384);
2065        let id = collective.id;
2066        storage.save_collective(&collective).unwrap();
2067
2068        // Delete it
2069        let deleted = storage.delete_collective(id).unwrap();
2070        assert!(deleted);
2071
2072        // Verify it's gone
2073        assert!(storage.get_collective(id).unwrap().is_none());
2074
2075        Box::new(storage).close().unwrap();
2076    }
2077
2078    #[test]
2079    fn test_delete_collective_nonexistent() {
2080        let dir = tempdir().unwrap();
2081        let path = dir.path().join("test.db");
2082        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2083
2084        let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
2085        assert!(!deleted);
2086
2087        Box::new(storage).close().unwrap();
2088    }
2089
2090    // ====================================================================
2091    // ACID Guarantee Tests
2092    // ====================================================================
2093
2094    #[test]
2095    fn test_uncommitted_transaction_is_invisible() {
2096        // ATOMICITY: If we don't commit a write transaction, the data
2097        // must not be visible to subsequent reads.
2098        let dir = tempdir().unwrap();
2099        let path = dir.path().join("test.db");
2100        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2101
2102        let collective = Collective::new("phantom", 384);
2103        let id = collective.id;
2104        let bytes = bincode::serialize(&collective).unwrap();
2105
2106        // Open a write transaction, insert data, but DON'T commit -- just drop
2107        {
2108            let write_txn = storage.database().begin_write().unwrap();
2109            {
2110                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
2111                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
2112            }
2113            // write_txn is dropped here without commit() -- rolled back
2114        }
2115
2116        // The collective should NOT be visible
2117        let result = storage.get_collective(id).unwrap();
2118        assert!(result.is_none(), "Uncommitted data must not be visible");
2119
2120        Box::new(storage).close().unwrap();
2121    }
2122
2123    #[test]
2124    fn test_committed_transaction_is_visible() {
2125        // DURABILITY (within session): committed data must be immediately
2126        // visible to subsequent reads.
2127        let dir = tempdir().unwrap();
2128        let path = dir.path().join("test.db");
2129        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2130
2131        let collective = Collective::new("committed", 384);
2132        let id = collective.id;
2133
2134        storage.save_collective(&collective).unwrap();
2135
2136        let result = storage.get_collective(id).unwrap();
2137        assert!(result.is_some(), "Committed data must be visible");
2138
2139        Box::new(storage).close().unwrap();
2140    }
2141
2142    #[test]
2143    fn test_multi_table_atomicity() {
2144        // ATOMICITY: A single transaction writing to multiple tables
2145        // is all-or-nothing. Here we write to both COLLECTIVES and METADATA
2146        // in one transaction and verify both are visible after commit.
2147        let dir = tempdir().unwrap();
2148        let path = dir.path().join("test.db");
2149        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2150
2151        let collective = Collective::new("multi-table", 384);
2152        let id = collective.id;
2153        let collective_bytes = bincode::serialize(&collective).unwrap();
2154
2155        // Write to TWO tables in a single transaction
2156        let write_txn = storage.database().begin_write().unwrap();
2157        {
2158            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
2159            coll_table
2160                .insert(id.as_bytes(), collective_bytes.as_slice())
2161                .unwrap();
2162        }
2163        {
2164            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
2165            meta_table
2166                .insert("test_marker", b"multi_table_test".as_slice())
2167                .unwrap();
2168        }
2169        write_txn.commit().unwrap();
2170
2171        // Verify BOTH writes are visible
2172        let coll = storage.get_collective(id).unwrap();
2173        assert!(coll.is_some(), "Collective from multi-table txn must exist");
2174
2175        let read_txn = storage.database().begin_read().unwrap();
2176        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
2177        let marker = meta_table.get("test_marker").unwrap();
2178        assert!(marker.is_some(), "Metadata from multi-table txn must exist");
2179
2180        Box::new(storage).close().unwrap();
2181    }
2182
2183    #[test]
2184    fn test_mvcc_read_consistency() {
2185        // ISOLATION (MVCC): A single read transaction sees a consistent
2186        // snapshot reflecting all committed writes up to the moment the
2187        // read was opened, and none of the uncommitted or subsequent ones.
2188        //
2189        // We write across multiple separate transactions, then verify a
2190        // read sees the expected consistent state. Combined with
2191        // test_uncommitted_transaction_is_invisible (atomicity), this
2192        // covers the key ACID isolation properties.
2193        //
2194        // Note: redb reclaims freed pages only after older read
2195        // transactions close, so we avoid holding a read open across a
2196        // write commit on the same handle. redb guarantees MVCC isolation
2197        // internally via shadow paging; this test verifies our usage.
2198        let dir = tempdir().unwrap();
2199        let path = dir.path().join("test.db");
2200        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2201
2202        // Write 3 collectives across separate transactions
2203        let c1 = Collective::new("alpha", 384);
2204        let c2 = Collective::new("beta", 384);
2205        let c3 = Collective::new("gamma", 384);
2206
2207        storage.save_collective(&c1).unwrap();
2208        storage.save_collective(&c2).unwrap();
2209        storage.save_collective(&c3).unwrap();
2210
2211        // Delete c2 (another transaction)
2212        storage.delete_collective(c2.id).unwrap();
2213
2214        // A read transaction must see the consistent state:
2215        // c1 and c3 present, c2 absent
2216        let read_txn = storage.database().begin_read().unwrap();
2217        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();
2218
2219        assert!(
2220            table.get(c1.id.as_bytes()).unwrap().is_some(),
2221            "c1 must be visible (committed)"
2222        );
2223        assert!(
2224            table.get(c2.id.as_bytes()).unwrap().is_none(),
2225            "c2 must be absent (deleted)"
2226        );
2227        assert!(
2228            table.get(c3.id.as_bytes()).unwrap().is_some(),
2229            "c3 must be visible (committed)"
2230        );
2231
2232        // Count should be exactly 2
2233        let count = table.iter().unwrap().count();
2234        // Metadata lives in its own table, so it cannot inflate this count.
2235        assert_eq!(count, 2, "Exactly 2 collectives should exist");
2236
2237        drop(table);
2238        drop(read_txn);
2239
2240        Box::new(storage).close().unwrap();
2241    }
2242
2243    // ====================================================================
2244    // Corruption Detection Tests
2245    // ====================================================================
2246
2247    #[test]
2248    fn test_corruption_detection_invalid_metadata_bytes() {
2249        // Opening a database whose metadata contains garbage bytes
2250        // must return a Corrupted error, not a panic or deserialization UB.
2251        let dir = tempdir().unwrap();
2252        let path = dir.path().join("corrupt.db");
2253
2254        // Create a valid database, then corrupt the metadata
2255        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2256        let write_txn = storage.database().begin_write().unwrap();
2257        {
2258            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
2259            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
2260                .unwrap();
2261        }
2262        write_txn.commit().unwrap();
2263        Box::new(storage).close().unwrap();
2264
2265        // Reopen must detect the corruption
2266        let result = RedbStorage::open(&path, &default_config());
2267        assert!(result.is_err(), "Corrupted metadata must be rejected");
2268        let err = result.unwrap_err();
2269        match err {
2270            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2271                assert!(
2272                    msg.contains("Invalid metadata format"),
2273                    "Error should mention invalid format, got: {}",
2274                    msg
2275                );
2276            }
2277            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2278        }
2279    }
2280
2281    #[test]
2282    fn test_corruption_detection_missing_metadata_key() {
2283        // If the metadata table exists but the "db_metadata" key is absent,
2284        // open_existing must return a Corrupted error.
2285        let dir = tempdir().unwrap();
2286        let path = dir.path().join("no_key.db");
2287
2288        // Create a valid database, then delete the metadata key
2289        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2290        let write_txn = storage.database().begin_write().unwrap();
2291        {
2292            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
2293            meta.remove(METADATA_KEY).unwrap();
2294        }
2295        write_txn.commit().unwrap();
2296        Box::new(storage).close().unwrap();
2297
2298        // Reopen must detect the missing key
2299        let result = RedbStorage::open(&path, &default_config());
2300        assert!(result.is_err(), "Missing metadata key must be rejected");
2301        let err = result.unwrap_err();
2302        match err {
2303            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2304                assert!(
2305                    msg.contains("Missing database metadata"),
2306                    "Error should mention missing metadata, got: {}",
2307                    msg
2308                );
2309            }
2310            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2311        }
2312    }
2313
2314    #[test]
2315    fn test_corruption_detection_missing_metadata_table() {
2316        // If the metadata table doesn't exist at all, open_existing must
2317        // return a Corrupted error. We simulate this by creating a raw
2318        // redb database without our schema tables.
2319        let dir = tempdir().unwrap();
2320        let path = dir.path().join("no_table.db");
2321
2322        // Create a raw redb database with a dummy table (not our schema)
2323        {
2324            let db = ::redb::Database::create(&path).unwrap();
2325            let write_txn = db.begin_write().unwrap();
2326            {
2327                let dummy: ::redb::TableDefinition<&str, &str> =
2328                    ::redb::TableDefinition::new("dummy");
2329                let mut table = write_txn.open_table(dummy).unwrap();
2330                table.insert("key", "value").unwrap();
2331            }
2332            write_txn.commit().unwrap();
2333        }
2334
2335        // Opening this as a PulseDB must detect the missing metadata table
2336        let result = RedbStorage::open(&path, &default_config());
2337        assert!(result.is_err(), "Missing metadata table must be rejected");
2338        let err = result.unwrap_err();
2339        match err {
2340            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2341                assert!(
2342                    msg.contains("Cannot open metadata table"),
2343                    "Error should mention metadata table, got: {}",
2344                    msg
2345                );
2346            }
2347            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2348        }
2349    }
2350
2351    // ====================================================================
2352    // Experience CRUD tests
2353    // ====================================================================
2354
2355    use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
2356    use crate::types::{AgentId, ExperienceId, Timestamp};
2357
2358    /// Creates a test experience with a given collective_id and embedding dimension.
2359    fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
2360        Experience {
2361            id: ExperienceId::new(),
2362            collective_id,
2363            content: "Test experience content".into(),
2364            embedding: vec![0.42; dim],
2365            experience_type: ExperienceType::Fact {
2366                statement: "redb uses shadow paging".into(),
2367                source: "docs".into(),
2368            },
2369            importance: 0.8,
2370            confidence: 0.7,
2371            applications: 0,
2372            domain: vec!["rust".into(), "databases".into()],
2373            related_files: vec!["src/storage/redb.rs".into()],
2374            source_agent: AgentId::new("test-agent"),
2375            source_task: None,
2376            timestamp: Timestamp::now(),
2377            archived: false,
2378        }
2379    }
2380
2381    #[test]
2382    fn test_save_and_get_experience() {
2383        let dir = tempdir().unwrap();
2384        let path = dir.path().join("test.db");
2385        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2386
2387        let collective = Collective::new("test", 384);
2388        storage.save_collective(&collective).unwrap();
2389
2390        let exp = test_experience(collective.id, 384);
2391        let exp_id = exp.id;
2392
2393        storage.save_experience(&exp).unwrap();
2394
2395        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2396        assert_eq!(retrieved.id, exp_id);
2397        assert_eq!(retrieved.collective_id, collective.id);
2398        assert_eq!(retrieved.content, "Test experience content");
2399        assert_eq!(retrieved.importance, 0.8);
2400        assert_eq!(retrieved.confidence, 0.7);
2401        assert_eq!(retrieved.applications, 0);
2402        assert_eq!(retrieved.domain, vec!["rust", "databases"]);
2403        assert!(!retrieved.archived);
2404        // Embedding should be reconstituted from EMBEDDINGS_TABLE
2405        assert_eq!(retrieved.embedding.len(), 384);
2406        assert_eq!(retrieved.embedding[0], 0.42);
2407
2408        Box::new(storage).close().unwrap();
2409    }
2410
2411    #[test]
2412    fn test_get_nonexistent_experience_returns_none() {
2413        let dir = tempdir().unwrap();
2414        let path = dir.path().join("test.db");
2415        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2416
2417        let result = storage.get_experience(ExperienceId::new()).unwrap();
2418        assert!(result.is_none());
2419
2420        Box::new(storage).close().unwrap();
2421    }
2422
2423    #[test]
2424    fn test_update_experience_fields() {
2425        let dir = tempdir().unwrap();
2426        let path = dir.path().join("test.db");
2427        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2428
2429        let collective = Collective::new("test", 384);
2430        storage.save_collective(&collective).unwrap();
2431
2432        let exp = test_experience(collective.id, 384);
2433        let exp_id = exp.id;
2434        storage.save_experience(&exp).unwrap();
2435
2436        // Update importance and domain
2437        let update = ExperienceUpdate {
2438            importance: Some(0.95),
2439            domain: Some(vec!["updated-tag".into()]),
2440            ..Default::default()
2441        };
2442        let updated = storage.update_experience(exp_id, &update).unwrap();
2443        assert!(updated);
2444
2445        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2446        assert_eq!(retrieved.importance, 0.95);
2447        assert_eq!(retrieved.domain, vec!["updated-tag"]);
2448        // Unchanged fields
2449        assert_eq!(retrieved.confidence, 0.7);
2450
2451        Box::new(storage).close().unwrap();
2452    }
2453
2454    #[test]
2455    fn test_update_nonexistent_experience_returns_false() {
2456        let dir = tempdir().unwrap();
2457        let path = dir.path().join("test.db");
2458        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2459
2460        let update = ExperienceUpdate {
2461            importance: Some(0.5),
2462            ..Default::default()
2463        };
2464        let result = storage
2465            .update_experience(ExperienceId::new(), &update)
2466            .unwrap();
2467        assert!(!result);
2468
2469        Box::new(storage).close().unwrap();
2470    }
2471
2472    #[test]
2473    fn test_delete_experience() {
2474        let dir = tempdir().unwrap();
2475        let path = dir.path().join("test.db");
2476        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2477
2478        let collective = Collective::new("test", 384);
2479        storage.save_collective(&collective).unwrap();
2480
2481        let exp = test_experience(collective.id, 384);
2482        let exp_id = exp.id;
2483        storage.save_experience(&exp).unwrap();
2484
2485        // Verify exists
2486        assert!(storage.get_experience(exp_id).unwrap().is_some());
2487
2488        // Delete
2489        let deleted = storage.delete_experience(exp_id).unwrap();
2490        assert!(deleted);
2491
2492        // Verify gone
2493        assert!(storage.get_experience(exp_id).unwrap().is_none());
2494        assert!(storage.get_embedding(exp_id).unwrap().is_none());
2495
2496        // Verify index cleaned up
2497        assert_eq!(
2498            storage
2499                .count_experiences_in_collective(collective.id)
2500                .unwrap(),
2501            0
2502        );
2503
2504        Box::new(storage).close().unwrap();
2505    }
2506
2507    #[test]
2508    fn test_delete_nonexistent_experience_returns_false() {
2509        let dir = tempdir().unwrap();
2510        let path = dir.path().join("test.db");
2511        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2512
2513        let result = storage.delete_experience(ExperienceId::new()).unwrap();
2514        assert!(!result);
2515
2516        Box::new(storage).close().unwrap();
2517    }
2518
2519    #[test]
2520    fn test_save_and_get_embedding() {
2521        let dir = tempdir().unwrap();
2522        let path = dir.path().join("test.db");
2523        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2524
2525        let id = ExperienceId::new();
2526        let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2527
2528        storage.save_embedding(id, &embedding).unwrap();
2529
2530        let retrieved = storage.get_embedding(id).unwrap().unwrap();
2531        assert_eq!(retrieved, embedding);
2532
2533        Box::new(storage).close().unwrap();
2534    }
2535
2536    #[test]
2537    fn test_experience_by_collective_index() {
2538        let dir = tempdir().unwrap();
2539        let path = dir.path().join("test.db");
2540        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2541
2542        let collective = Collective::new("test", 384);
2543        storage.save_collective(&collective).unwrap();
2544
2545        // Add 3 experiences
2546        for _ in 0..3 {
2547            let exp = test_experience(collective.id, 384);
2548            storage.save_experience(&exp).unwrap();
2549        }
2550
2551        // Count should be 3
2552        assert_eq!(
2553            storage
2554                .count_experiences_in_collective(collective.id)
2555                .unwrap(),
2556            3
2557        );
2558
2559        Box::new(storage).close().unwrap();
2560    }
2561
2562    #[test]
2563    fn test_cascade_delete_includes_experiences() {
2564        let dir = tempdir().unwrap();
2565        let path = dir.path().join("test.db");
2566        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2567
2568        let collective = Collective::new("test", 384);
2569        storage.save_collective(&collective).unwrap();
2570
2571        let exp1 = test_experience(collective.id, 384);
2572        let exp2 = test_experience(collective.id, 384);
2573        let id1 = exp1.id;
2574        let id2 = exp2.id;
2575        storage.save_experience(&exp1).unwrap();
2576        storage.save_experience(&exp2).unwrap();
2577
2578        // Cascade delete
2579        let count = storage
2580            .delete_experiences_by_collective(collective.id)
2581            .unwrap();
2582        assert_eq!(count, 2);
2583
2584        // Verify experiences are gone
2585        assert!(storage.get_experience(id1).unwrap().is_none());
2586        assert!(storage.get_experience(id2).unwrap().is_none());
2587        assert!(storage.get_embedding(id1).unwrap().is_none());
2588        assert!(storage.get_embedding(id2).unwrap().is_none());
2589
2590        Box::new(storage).close().unwrap();
2591    }
2592
2593    #[test]
2594    fn test_update_experience_archived_flag() {
2595        let dir = tempdir().unwrap();
2596        let path = dir.path().join("test.db");
2597        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2598
2599        let collective = Collective::new("test", 384);
2600        storage.save_collective(&collective).unwrap();
2601
2602        let exp = test_experience(collective.id, 384);
2603        let exp_id = exp.id;
2604        storage.save_experience(&exp).unwrap();
2605
2606        // Archive
2607        let update = ExperienceUpdate {
2608            archived: Some(true),
2609            ..Default::default()
2610        };
2611        storage.update_experience(exp_id, &update).unwrap();
2612
2613        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2614        assert!(retrieved.archived);
2615
2616        // Unarchive
2617        let update = ExperienceUpdate {
2618            archived: Some(false),
2619            ..Default::default()
2620        };
2621        storage.update_experience(exp_id, &update).unwrap();
2622
2623        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2624        assert!(!retrieved.archived);
2625
2626        Box::new(storage).close().unwrap();
2627    }
2628
2629    #[test]
2630    fn test_f32_byte_conversion_roundtrip() {
2631        let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2632        let bytes = f32_slice_to_bytes(&original);
2633        assert_eq!(bytes.len(), original.len() * 4);
2634
2635        let restored = bytes_to_f32_vec(&bytes);
2636        assert_eq!(original, restored);
2637    }
2638
2639    #[test]
2640    fn test_experience_with_all_type_variants() {
2641        let dir = tempdir().unwrap();
2642        let path = dir.path().join("test.db");
2643        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2644
2645        let collective = Collective::new("test", 384);
2646        storage.save_collective(&collective).unwrap();
2647
2648        // Save one experience per type variant
2649        let types = vec![
2650            ExperienceType::Difficulty {
2651                description: "test".into(),
2652                severity: Severity::High,
2653            },
2654            ExperienceType::Solution {
2655                problem_ref: None,
2656                approach: "test".into(),
2657                worked: true,
2658            },
2659            ExperienceType::ErrorPattern {
2660                signature: "E0308".into(),
2661                fix: "check types".into(),
2662                prevention: "use clippy".into(),
2663            },
2664            ExperienceType::SuccessPattern {
2665                task_type: "refactor".into(),
2666                approach: "extract method".into(),
2667                quality: 0.9,
2668            },
2669            ExperienceType::UserPreference {
2670                category: "style".into(),
2671                preference: "snake_case".into(),
2672                strength: 1.0,
2673            },
2674            ExperienceType::ArchitecturalDecision {
2675                decision: "use redb".into(),
2676                rationale: "pure Rust".into(),
2677            },
2678            ExperienceType::TechInsight {
2679                technology: "tokio".into(),
2680                insight: "spawn_blocking".into(),
2681            },
2682            ExperienceType::Fact {
2683                statement: "Rust is safe".into(),
2684                source: "docs".into(),
2685            },
2686            ExperienceType::Generic { category: None },
2687        ];
2688
2689        for experience_type in types {
2690            let mut exp = test_experience(collective.id, 384);
2691            exp.experience_type = experience_type;
2692            storage.save_experience(&exp).unwrap();
2693
2694            // Verify roundtrip
2695            let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2696            assert_eq!(
2697                retrieved.experience_type.type_tag(),
2698                exp.experience_type.type_tag()
2699            );
2700        }
2701
2702        assert_eq!(
2703            storage
2704                .count_experiences_in_collective(collective.id)
2705                .unwrap(),
2706            9
2707        );
2708
2709        Box::new(storage).close().unwrap();
2710    }
2711
2712    #[test]
2713    fn test_reinforce_experience_atomic() {
2714        let dir = tempdir().unwrap();
2715        let path = dir.path().join("test.db");
2716        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2717
2718        let collective = Collective::new("test", 384);
2719        storage.save_collective(&collective).unwrap();
2720
2721        let exp = test_experience(collective.id, 384);
2722        let exp_id = exp.id;
2723        storage.save_experience(&exp).unwrap();
2724
2725        // Reinforce 3 times
2726        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(1));
2727        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(2));
2728        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(3));
2729
2730        // Verify the stored value
2731        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2732        assert_eq!(retrieved.applications, 3);
2733
2734        // Verify the embedding is still present (reinforce must not touch it)
2735        let emb = storage.get_embedding(exp_id).unwrap().unwrap();
2736        assert_eq!(emb.len(), 384);
2737
2738        Box::new(storage).close().unwrap();
2739    }
2740
2741    #[test]
2742    fn test_reinforce_experience_nonexistent() {
2743        let dir = tempdir().unwrap();
2744        let path = dir.path().join("test.db");
2745        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2746
2747        let result = storage.reinforce_experience(ExperienceId::new()).unwrap();
2748        assert!(result.is_none());
2749
2750        Box::new(storage).close().unwrap();
2751    }
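
    // A paging sanity check, written as a sketch on top of the helpers the
    // tests above already use (`test_experience`, the 384-dim default
    // config): walking fixed-size windows must visit every ID exactly once.
    #[test]
    fn test_list_experience_ids_paginated_visits_all_ids() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        for _ in 0..5 {
            let exp = test_experience(collective.id, 384);
            storage.save_experience(&exp).unwrap();
        }

        // Page through with limit 2: windows of 2, 2, then 1.
        let mut seen: Vec<[u8; 16]> = Vec::new();
        let mut offset = 0;
        loop {
            let page = storage
                .list_experience_ids_paginated(collective.id, 2, offset)
                .unwrap();
            let n = page.len();
            for id in &page {
                seen.push(*id.as_bytes());
            }
            if n < 2 {
                break;
            }
            offset += n;
        }

        seen.sort();
        seen.dedup();
        assert_eq!(seen.len(), 5, "pagination must visit each ID exactly once");

        Box::new(storage).close().unwrap();
    }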
2752
2753    // ====================================================================
2754    // WAL Sequence Tracking Tests (E4-S02)
2755    // ====================================================================
2756
    #[test]
    fn test_wal_sequence_starts_at_zero() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        assert_eq!(storage.get_wal_sequence().unwrap(), 0);

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_save_experience_increments_wal_sequence() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();
        // save_collective now records WAL event #1
        assert_eq!(storage.get_wal_sequence().unwrap(), 1);

        let exp1 = test_experience(collective.id, 384);
        storage.save_experience(&exp1).unwrap();
        assert_eq!(storage.get_wal_sequence().unwrap(), 2);

        let exp2 = test_experience(collective.id, 384);
        storage.save_experience(&exp2).unwrap();
        assert_eq!(storage.get_wal_sequence().unwrap(), 3);

        Box::new(storage).close().unwrap();
    }

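    // A persistence sketch: the sequence lives under `WAL_SEQUENCE_KEY`
    // in the metadata table, so it is assumed to survive a close/reopen
    // cycle rather than reset to zero. Illustrative only, not part of the
    // original suite.
    #[test]
    fn test_wal_sequence_survives_reopen_sketch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();
        assert_eq!(storage.get_wal_sequence().unwrap(), 1);
        Box::new(storage).close().unwrap();

        // Reopen the same file; the sequence should resume, not restart.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        assert_eq!(storage.get_wal_sequence().unwrap(), 1);

        Box::new(storage).close().unwrap();
    }
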
    #[test]
    fn test_poll_watch_events_returns_correct_events() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();
        // Collective creates WAL event #1

        let exp1 = test_experience(collective.id, 384);
        let exp2 = test_experience(collective.id, 384);
        let exp3 = test_experience(collective.id, 384);
        storage.save_experience(&exp1).unwrap();
        storage.save_experience(&exp2).unwrap();
        storage.save_experience(&exp3).unwrap();

        // Poll all events (collective + 3 experiences = 4 total)
        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 4);
        assert_eq!(max_seq, 4);
        // The first event is the collective creation; the rest are experience creations
        assert!(events
            .iter()
            .all(|e| e.event_type == WatchEventTypeTag::Created));
        // Skip the collective event (index 0); experience IDs should match in insertion order
        assert_eq!(events[1].entity_id, *exp1.id.as_bytes());
        assert_eq!(events[2].entity_id, *exp2.id.as_bytes());
        assert_eq!(events[3].entity_id, *exp3.id.as_bytes());

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_poll_watch_events_since_midpoint() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        // Create 5 experiences
        for _ in 0..5 {
            let exp = test_experience(collective.id, 384);
            storage.save_experience(&exp).unwrap();
        }

        // Collective = seq 1, 5 experiences = seq 2-6. Total = 6.
        // Polling from seq 4 should return 2 events (seq 5 and 6).
        let (events, max_seq) = storage.poll_watch_events(4, 100).unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(max_seq, 6);

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_poll_watch_events_empty_when_caught_up() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        let exp = test_experience(collective.id, 384);
        storage.save_experience(&exp).unwrap();

        // Poll everything (collective + experience = 2 events)
        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(max_seq, 2);

        // Polling again from the same position returns nothing
        let (events, max_seq) = storage.poll_watch_events(2, 100).unwrap();
        assert_eq!(events.len(), 0);
        assert_eq!(max_seq, 2); // stays the same

        Box::new(storage).close().unwrap();
    }

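    // A minimal polling-loop sketch built on the caught-up behaviour
    // above: when a poll comes back empty, the watcher keeps its cursor
    // and retries later. `watch_once` is a hypothetical helper; PulseDB's
    // real watch plumbing may differ.
    #[allow(dead_code)]
    fn watch_once(storage: &RedbStorage, cursor: u64) -> u64 {
        let (events, max_seq) = storage.poll_watch_events(cursor, 100).unwrap();
        if events.is_empty() {
            // Caught up: back off briefly and retry from the same cursor.
            std::thread::sleep(std::time::Duration::from_millis(50));
            return cursor;
        }
        for event in &events {
            // Dispatch `event.event_type` / `event.entity_id` to subscribers here.
            let _ = event;
        }
        max_seq
    }
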
    #[test]
    fn test_delete_records_watch_event() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        let exp = test_experience(collective.id, 384);
        storage.save_experience(&exp).unwrap();
        storage.delete_experience(exp.id).unwrap();

        // Collective(1) + Created(2) + Deleted(3) = 3 events
        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(max_seq, 3);
        assert_eq!(events[0].event_type, WatchEventTypeTag::Created); // collective
        assert_eq!(events[1].event_type, WatchEventTypeTag::Created); // experience
        assert_eq!(events[2].event_type, WatchEventTypeTag::Deleted); // experience deleted
        assert_eq!(events[2].entity_id, *exp.id.as_bytes());

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_update_records_watch_event() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        let exp = test_experience(collective.id, 384);
        storage.save_experience(&exp).unwrap();

        let update = ExperienceUpdate {
            importance: Some(0.99),
            ..Default::default()
        };
        storage.update_experience(exp.id, &update).unwrap();

        // Collective(1) + Created(2) + Updated(3) = 3 events
        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_reinforce_records_watch_event() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        let exp = test_experience(collective.id, 384);
        storage.save_experience(&exp).unwrap();
        storage.reinforce_experience(exp.id).unwrap();

        // Collective(1) + Created(2) + Updated(3) = 3 events; a reinforce
        // surfaces as an Updated event
        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[1].event_type, WatchEventTypeTag::Created);
        assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_archive_records_archived_event() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        let exp = test_experience(collective.id, 384);
        storage.save_experience(&exp).unwrap();

        let update = ExperienceUpdate {
            archived: Some(true),
            ..Default::default()
        };
        storage.update_experience(exp.id, &update).unwrap();

        // Collective(1) + Created(2) + Archived(3) = 3 events; archiving
        // gets its own event type rather than a generic Updated
        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(events[2].event_type, WatchEventTypeTag::Archived);

        Box::new(storage).close().unwrap();
    }

    #[test]
    fn test_poll_watch_events_batch_limit() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        // Create 10 experiences
        for _ in 0..10 {
            let exp = test_experience(collective.id, 384);
            storage.save_experience(&exp).unwrap();
        }

        // Poll with a limit of 3; max_seq reflects the last event returned,
        // not the global sequence
        let (events, max_seq) = storage.poll_watch_events(0, 3).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(max_seq, 3);

        // Continue from where we left off
        let (events, max_seq) = storage.poll_watch_events(3, 3).unwrap();
        assert_eq!(events.len(), 3);
        assert_eq!(max_seq, 6);

        Box::new(storage).close().unwrap();
    }
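
    // A sketch of draining the log in fixed-size batches until caught up,
    // using only the polling behaviour demonstrated above. Illustrative,
    // not part of the original suite.
    #[test]
    fn test_poll_watch_events_drain_in_batches_sketch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test", 384);
        storage.save_collective(&collective).unwrap();

        // Collective(1) + 10 experiences = 11 events in total.
        for _ in 0..10 {
            let exp = test_experience(collective.id, 384);
            storage.save_experience(&exp).unwrap();
        }

        let mut cursor = 0;
        let mut total = 0;
        loop {
            let (events, max_seq) = storage.poll_watch_events(cursor, 4).unwrap();
            if events.is_empty() {
                break;
            }
            total += events.len();
            cursor = max_seq;
        }
        assert_eq!(total, 11);
        // Once drained, the cursor sits at the global WAL sequence.
        assert_eq!(cursor, storage.get_wal_sequence().unwrap());

        Box::new(storage).close().unwrap();
    }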
}