Skip to main content

pulsedb/storage/
redb.rs

1//! redb storage engine implementation.
2//!
3//! This module provides the primary storage backend for PulseDB using
4//! [redb](https://docs.rs/redb), a pure Rust embedded key-value store.
5//!
6//! # Features
7//!
8//! - ACID transactions with MVCC
9//! - Single-writer, multiple-reader concurrency
10//! - Automatic crash recovery
11//! - Zero external dependencies (pure Rust)
12//!
13//! # File Layout
14//!
15//! When you open a database at `./pulse.db`, redb creates:
16//! - `./pulse.db` - Main database file
17//! - `./pulse.db.lock` - Lock file for writer coordination (may not be visible)
18
19use std::path::{Path, PathBuf};
20
21use ::redb::{Database, ReadableTable};
22use tracing::{debug, info, instrument, warn};
23
24use crate::activity::Activity;
25use crate::collective::Collective;
26use crate::experience::{Experience, ExperienceUpdate};
27use crate::insight::DerivedInsight;
28use crate::relation::{ExperienceRelation, RelationType};
29use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
30
31use super::schema::{
32    decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
33    DatabaseMetadata, EntityTypeTag, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag,
34    ACTIVITIES_TABLE, COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
35    EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
36    METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
37    SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
38};
39#[cfg(feature = "sync")]
40use super::schema::{INSTANCE_ID_KEY, SYNC_CURSORS_TABLE};
41use super::StorageEngine;
42use crate::config::{Config, EmbeddingDimension};
43use crate::error::{PulseDBError, Result, StorageError, ValidationError};
44
/// Key under which the serialized [`DatabaseMetadata`] record is stored in
/// the metadata table ([`METADATA_TABLE`]).
const METADATA_KEY: &str = "db_metadata";
47
/// redb storage engine wrapper.
///
/// Holds the redb [`Database`] handle together with the metadata that was
/// written (new database) or read and validated (existing database) when the
/// storage was opened. It implements [`StorageEngine`] for use with PulseDB.
///
/// # Thread Safety
///
/// `RedbStorage` is `Send + Sync`. redb handles internal synchronization
/// using MVCC for readers and exclusive locking for writers.
#[derive(Debug)]
pub struct RedbStorage {
    /// The redb database handle.
    db: Database,

    /// Cached database metadata, as produced by `initialize_new` or
    /// `open_existing` at open time.
    metadata: DatabaseMetadata,

    /// Path to the database file, as passed to [`RedbStorage::open`].
    path: PathBuf,

    /// Persistent instance ID for sync protocol (only with `sync` feature).
    /// It is stored in the metadata table under `INSTANCE_ID_KEY`.
    #[cfg(feature = "sync")]
    instance_id: crate::sync::InstanceId,
}
72
73impl RedbStorage {
74    /// Opens or creates a database at the given path.
75    ///
76    /// If the database doesn't exist, it will be created and initialized
77    /// with the configuration settings. If it exists, the configuration
78    /// will be validated against the stored metadata.
79    ///
80    /// # Arguments
81    ///
82    /// * `path` - Path to the database file
83    /// * `config` - Database configuration
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if:
88    /// - The database file is corrupted
89    /// - The database is locked by another process
90    /// - Schema version doesn't match
91    /// - Embedding dimension doesn't match (for existing databases)
92    ///
93    /// # Example
94    ///
95    /// ```rust
96    /// # fn main() -> pulsedb::Result<()> {
97    /// # let dir = tempfile::tempdir().unwrap();
98    /// use pulsedb::{Config, storage::RedbStorage};
99    ///
100    /// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
101    /// # Ok(())
102    /// # }
103    /// ```
104    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
105    pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
106        let path = path.as_ref();
107        let db_exists = path.exists();
108
109        debug!(db_exists = db_exists, "Opening storage engine");
110
111        // Create or open the database
112        let db = Self::create_database(path, config)?;
113
114        if db_exists {
115            // Validate existing database
116            Self::open_existing(db, path.to_path_buf(), config)
117        } else {
118            // Initialize new database
119            Self::initialize_new(db, path.to_path_buf(), config)
120        }
121    }
122
123    /// Creates the redb database with appropriate settings.
124    fn create_database(path: &Path, _config: &Config) -> Result<Database> {
125        let builder = Database::builder();
126
127        // Note: redb 2.x doesn't have set_cache_size, it manages memory internally
128        // The cache_size_mb config will be used for future optimizations
129
130        // Note: redb doesn't expose a typed error variant for lock conflicts,
131        // so we detect them via error message string matching. This may need
132        // updating if redb changes its error messages in a future version.
133        let db = builder.create(path).map_err(|e| {
134            if e.to_string().contains("locked") {
135                StorageError::DatabaseLocked
136            } else {
137                StorageError::Redb(e.to_string())
138            }
139        })?;
140
141        debug!("Database file opened successfully");
142        Ok(db)
143    }
144
    /// Initializes a brand-new database: creates every table and writes the
    /// initial metadata record.
    ///
    /// All tables are created and the metadata is written inside a single
    /// write transaction, so the database is either fully initialized or
    /// left untouched.
    ///
    /// # Arguments
    ///
    /// * `db` - Freshly created redb database handle
    /// * `path` - Path of the database file (stored on the returned value)
    /// * `config` - Supplies the embedding dimension recorded in the metadata
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be started or committed,
    /// a table cannot be opened, or the metadata cannot be serialized.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Initializing new database");

        let metadata = DatabaseMetadata::new(config.embedding_dimension);

        // Create all tables and write metadata in a single transaction
        let write_txn = db.begin_write().map_err(StorageError::from)?;

        // Scope the table handles so they are dropped before the commit below.
        {
            // Create the metadata table and write metadata
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Create other tables (they're created on first access); the
            // handles themselves are not needed, hence `let _`.
            let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
            let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
            let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let _ = write_txn.open_table(RELATIONS_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            let _ = write_txn.open_table(INSIGHTS_TABLE)?;
            let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            // Sync tables and instance ID (behind feature gate)
            #[cfg(feature = "sync")]
            {
                // A new database gets a freshly generated instance ID, stored
                // next to the metadata record.
                let instance_id = crate::sync::InstanceId::new();
                meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }

        write_txn.commit().map_err(StorageError::from)?;

        // Load instance ID for the struct (behind feature gate). It is read
        // back from the committed metadata table rather than reused from the
        // block above.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id after init"))?;
            // The stored value must be exactly 16 bytes.
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = SCHEMA_VERSION,
            dimension = config.embedding_dimension.size(),
            "Database initialized"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
216
    /// Opens and validates an existing database.
    ///
    /// Steps, in order:
    /// 1. Read and deserialize the stored [`DatabaseMetadata`].
    /// 2. Reject the database unless its schema version is the current
    ///    [`SCHEMA_VERSION`] or `1` (which is migrated in step 4).
    /// 3. Reject the database if its embedding dimension differs from
    ///    `config.embedding_dimension`.
    /// 4. In one write transaction: ensure the watch-events table exists,
    ///    migrate v1 WAL records if needed, and store the updated metadata
    ///    (touched timestamp, possibly bumped schema version). With the
    ///    `sync` feature, also ensure the instance ID and sync-cursor table
    ///    exist.
    ///
    /// # Errors
    ///
    /// * A corruption error if the metadata table cannot be opened, the
    ///   metadata record is missing, or it cannot be deserialized.
    /// * [`StorageError::SchemaVersionMismatch`] for an unsupported schema
    ///   version.
    /// * [`ValidationError::DimensionMismatch`] when the stored embedding
    ///   dimension differs from the configured one.
    /// * Any transaction, table or serialization error from the update step.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Opening existing database");

        // Read metadata from the database
        let read_txn = db.begin_read().map_err(StorageError::from)?;

        let metadata = {
            // A metadata table that cannot be opened is reported as corruption.
            let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
                StorageError::corrupted(format!("Cannot open metadata table: {}", e))
            })?;

            let metadata_bytes = meta_table
                .get(METADATA_KEY)
                .map_err(StorageError::from)?
                .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;

            bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
                .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
        };

        // End the read transaction before the write transaction further down.
        drop(read_txn);

        // Validate schema version (allow migration from v1 → v2)
        if metadata.schema_version != SCHEMA_VERSION && metadata.schema_version != 1 {
            warn!(
                expected = SCHEMA_VERSION,
                found = metadata.schema_version,
                "Schema version mismatch"
            );
            return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
                expected: SCHEMA_VERSION,
                found: metadata.schema_version,
            }));
        }
        let needs_v2_migration = metadata.schema_version == 1;

        // Validate embedding dimension. Here "expected" is the configured
        // dimension and "found"/"got" is the one stored in the database.
        if metadata.embedding_dimension != config.embedding_dimension {
            warn!(
                expected = config.embedding_dimension.size(),
                found = metadata.embedding_dimension.size(),
                "Embedding dimension mismatch"
            );
            return Err(PulseDBError::Validation(
                ValidationError::DimensionMismatch {
                    expected: config.embedding_dimension.size(),
                    got: metadata.embedding_dimension.size(),
                },
            ));
        }

        // Update last_opened_at timestamp and bump schema version if migrating
        let mut metadata = metadata;
        metadata.touch();
        if needs_v2_migration {
            metadata.schema_version = SCHEMA_VERSION;
        }

        // Migration and metadata update share one write transaction, so the
        // bumped schema version is only persisted together with the migrated
        // records.
        let write_txn = db.begin_write().map_err(StorageError::from)?;
        {
            // Ensure watch_events table exists (migration for pre-E4-S02 databases)
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            // Migrate WAL records from v1 → v2 (add entity_type field)
            if needs_v2_migration {
                Self::migrate_wal_v1_to_v2(&write_txn)?;
                info!("Migrated WAL records from schema v1 to v2");
            }

            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Ensure sync tables and instance ID exist (migration for pre-sync databases)
            #[cfg(feature = "sync")]
            {
                // Generate instance_id if missing (first open with sync feature)
                if meta_table.get(INSTANCE_ID_KEY)?.is_none() {
                    let instance_id = crate::sync::InstanceId::new();
                    meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                    debug!("Generated new instance_id for existing database");
                }
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        // Load instance ID for the struct (behind feature gate). It is read
        // from the committed metadata table, whether it already existed or
        // was generated just above.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id"))?;
            // The stored value must be exactly 16 bytes.
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = metadata.schema_version,
            dimension = metadata.embedding_dimension.size(),
            "Database opened successfully"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
336
    /// Returns a reference to the underlying redb [`Database`] handle.
    ///
    /// This is for internal use by other PulseDB modules; it is only visible
    /// within the crate.
    #[inline]
    #[allow(dead_code)] // Used by Collective CRUD (E1-S02) and Experience CRUD (E1-S03)
    pub(crate) fn database(&self) -> &Database {
        &self.db
    }
345
346    /// Increments the WAL sequence and records a watch event within an existing write transaction.
347    ///
348    /// This is the core of cross-process change detection. By executing within the caller's
349    /// transaction, the sequence increment and event record are atomic with the data mutation:
350    /// if the transaction commits, both are durable; if it rolls back, neither is visible.
351    ///
352    /// # Arguments
353    ///
354    /// * `write_txn` - The caller's open write transaction
355    /// * `entity_id` - The entity that changed (16-byte UUID)
356    /// * `collective_id` - The collective it belongs to
357    /// * `entity_type` - What kind of entity changed (Experience, Relation, etc.)
358    /// * `event_type` - What kind of change occurred (Created, Updated, etc.)
359    /// * `timestamp` - When the change occurred
360    fn increment_wal_and_record(
361        &self,
362        write_txn: &::redb::WriteTransaction,
363        entity_id: &[u8; 16],
364        collective_id: CollectiveId,
365        entity_type: EntityTypeTag,
366        event_type: WatchEventTypeTag,
367        timestamp: Timestamp,
368    ) -> Result<u64> {
369        // Echo prevention: skip WAL recording when applying sync changes
370        #[cfg(feature = "sync")]
371        if crate::sync::guard::is_sync_applying() {
372            return Ok(0);
373        }
374
375        // Read current sequence (0 if key doesn't exist yet)
376        let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
377        let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
378            Some(entry) => {
379                let bytes: [u8; 8] = entry
380                    .value()
381                    .try_into()
382                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
383                u64::from_be_bytes(bytes)
384            }
385            None => 0,
386        };
387        let new_seq = current_seq + 1;
388
389        // Write new sequence number
390        let seq_bytes = new_seq.to_be_bytes();
391        meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;
392
393        // Record the event (schema v2 with entity_type)
394        let record = WatchEventRecord {
395            entity_id: *entity_id,
396            collective_id: *collective_id.as_bytes(),
397            event_type,
398            timestamp_ms: timestamp.as_millis(),
399            entity_type,
400        };
401        let record_bytes =
402            bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;
403
404        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
405        events_table.insert(&seq_bytes, record_bytes.as_slice())?;
406
407        Ok(new_seq)
408    }
409
410    /// Migrates WAL records from schema v1 to v2.
411    ///
412    /// V1 records have 4 fields: experience_id, collective_id, event_type, timestamp_ms.
413    /// V2 adds entity_type (defaults to Experience for existing records).
414    fn migrate_wal_v1_to_v2(write_txn: &::redb::WriteTransaction) -> Result<()> {
415        use super::schema::WatchEventRecordV1;
416
417        let events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
418
419        // Collect all (key, v1_record) pairs
420        let mut entries: Vec<([u8; 8], WatchEventRecordV1)> = Vec::new();
421        for entry in events_table.iter()? {
422            let (key, value) = entry.map_err(StorageError::from)?;
423            let seq_bytes: [u8; 8] = *key.value();
424            let v1_record: WatchEventRecordV1 = bincode::deserialize(value.value())
425                .map_err(|e| StorageError::serialization(format!("v1 WAL record: {}", e)))?;
426            entries.push((seq_bytes, v1_record));
427        }
428        drop(events_table);
429
430        // Rewrite as v2 records
431        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
432        for (seq_bytes, v1) in &entries {
433            let v2 = WatchEventRecord {
434                entity_id: v1.experience_id,
435                collective_id: v1.collective_id,
436                event_type: v1.event_type,
437                timestamp_ms: v1.timestamp_ms,
438                entity_type: EntityTypeTag::Experience,
439            };
440            let v2_bytes =
441                bincode::serialize(&v2).map_err(|e| StorageError::serialization(e.to_string()))?;
442            events_table.insert(seq_bytes, v2_bytes.as_slice())?;
443        }
444
445        debug!(count = entries.len(), "Migrated WAL records to v2");
446        Ok(())
447    }
448
    /// Returns the embedding dimension recorded in this database's cached
    /// metadata.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
454}
455
456impl StorageEngine for RedbStorage {
457    // =========================================================================
458    // Lifecycle
459    // =========================================================================
460
    /// Returns the database metadata cached on this handle.
    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }
464
    /// Closes the storage engine by dropping the redb database handle.
    ///
    /// Consumes the boxed storage; in this implementation the call always
    /// returns `Ok(())`.
    #[instrument(skip(self))]
    fn close(self: Box<Self>) -> Result<()> {
        info!("Closing storage engine");

        // redb flushes all data durably on drop. Since `Database::drop` is
        // infallible, this method currently always returns Ok(()). The Result
        // return type is retained for API forward-compatibility if a future
        // storage backend can report flush errors.
        drop(self.db);

        info!("Storage engine closed");
        Ok(())
    }
478
    /// Returns the path of the database file; always `Some` for this backend.
    fn path(&self) -> Option<&Path> {
        Some(&self.path)
    }
482
483    // =========================================================================
484    // Collective Storage Operations
485    // =========================================================================
486
487    fn save_collective(&self, collective: &Collective) -> Result<()> {
488        let bytes = bincode::serialize(collective)
489            .map_err(|e| StorageError::serialization(e.to_string()))?;
490
491        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
492        {
493            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
494            table.insert(collective.id.as_bytes(), bytes.as_slice())?;
495        }
496        self.increment_wal_and_record(
497            &write_txn,
498            collective.id.as_bytes(),
499            collective.id,
500            EntityTypeTag::Collective,
501            WatchEventTypeTag::Created,
502            collective.created_at,
503        )?;
504        write_txn.commit().map_err(StorageError::from)?;
505
506        debug!(id = %collective.id, name = %collective.name, "Collective saved");
507        Ok(())
508    }
509
510    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
511        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
512        let table = read_txn.open_table(COLLECTIVES_TABLE)?;
513
514        match table.get(id.as_bytes())? {
515            Some(value) => {
516                let collective: Collective = bincode::deserialize(value.value())
517                    .map_err(|e| StorageError::serialization(e.to_string()))?;
518                Ok(Some(collective))
519            }
520            None => Ok(None),
521        }
522    }
523
524    fn list_collectives(&self) -> Result<Vec<Collective>> {
525        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
526        let table = read_txn.open_table(COLLECTIVES_TABLE)?;
527
528        let mut collectives = Vec::new();
529        for result in table.iter()? {
530            let (_, value) = result.map_err(StorageError::from)?;
531            let collective: Collective = bincode::deserialize(value.value())
532                .map_err(|e| StorageError::serialization(e.to_string()))?;
533            collectives.push(collective);
534        }
535
536        Ok(collectives)
537    }
538
539    fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
540        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
541        let existed;
542        {
543            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
544            existed = table.remove(id.as_bytes())?.is_some();
545        }
546        write_txn.commit().map_err(StorageError::from)?;
547
548        if existed {
549            debug!(id = %id, "Collective deleted");
550        }
551        Ok(existed)
552    }
553
554    // =========================================================================
555    // Experience Index Operations
556    // =========================================================================
557
558    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
559        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
560        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
561
562        let count = table.get(id.as_bytes())?.count() as u64;
563
564        Ok(count)
565    }
566
567    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
568        // Phase 1: Read — collect experience IDs and relation IDs to delete
569        let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
570            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
571            let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
572
573            let mut ids = Vec::new();
574            for result in table.get(id.as_bytes())? {
575                let value = result.map_err(StorageError::from)?;
576                let entry = value.value();
577                // Entry is [timestamp: 8 bytes][experience_id: 16 bytes]
578                let mut exp_id = [0u8; 16];
579                exp_id.copy_from_slice(&entry[8..24]);
580                ids.push(exp_id);
581            }
582
583            // Collect all relation IDs for these experiences (deduplicated)
584            let mut rel_ids = std::collections::HashSet::new();
585            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
586            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
587            for exp_id in &ids {
588                for result in source_table.get(exp_id)? {
589                    let value = result.map_err(StorageError::from)?;
590                    rel_ids.insert(*value.value());
591                }
592                for result in target_table.get(exp_id)? {
593                    let value = result.map_err(StorageError::from)?;
594                    rel_ids.insert(*value.value());
595                }
596            }
597
598            (ids, rel_ids.into_iter().collect())
599        };
600
601        let count = exp_ids.len() as u64;
602        if count == 0 {
603            return Ok(0);
604        }
605
606        // Phase 2: Write — delete from all tables in a single transaction
607        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
608        {
609            // Delete experience records
610            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
611            for exp_id in &exp_ids {
612                exp_table.remove(exp_id)?;
613            }
614        }
615        {
616            // Delete embedding vectors
617            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
618            for exp_id in &exp_ids {
619                emb_table.remove(exp_id)?;
620            }
621        }
622        {
623            // Clear the by-collective index for this collective
624            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
625            idx_table.remove_all(id.as_bytes())?;
626        }
627        {
628            // Clear the by-type index for all type variants of this collective
629            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
630            for tag in ExperienceTypeTag::all() {
631                let key = encode_type_index_key(id.as_bytes(), *tag);
632                type_table.remove_all(&key)?;
633            }
634        }
635        {
636            // Delete relations and their index entries
637            if !relation_ids.is_empty() {
638                let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
639                let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
640                let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
641
642                // Clear relation indexes for all affected experiences
643                for exp_id in &exp_ids {
644                    source_idx.remove_all(exp_id)?;
645                    target_idx.remove_all(exp_id)?;
646                }
647                // Delete the relation records themselves
648                for rel_id in &relation_ids {
649                    rel_table.remove(rel_id)?;
650                }
651
652                debug!(
653                    count = relation_ids.len(),
654                    "Cascade-deleted relations for collective"
655                );
656            }
657        }
658        write_txn.commit().map_err(StorageError::from)?;
659
660        debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
661        Ok(count)
662    }
663
664    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
665        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
666        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
667
668        let mut ids = Vec::new();
669        for result in table.get(id.as_bytes())? {
670            let value = result.map_err(StorageError::from)?;
671            let entry = value.value();
672            // Entry is [timestamp: 8 bytes][experience_id: 16 bytes]
673            let mut exp_bytes = [0u8; 16];
674            exp_bytes.copy_from_slice(&entry[8..24]);
675            ids.push(ExperienceId::from_bytes(exp_bytes));
676        }
677
678        Ok(ids)
679    }
680
681    fn get_recent_experience_ids(
682        &self,
683        collective_id: CollectiveId,
684        limit: usize,
685    ) -> Result<Vec<(ExperienceId, Timestamp)>> {
686        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
687        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
688
689        // Collect all (ExperienceId, Timestamp) pairs for this collective.
690        // Multimap values are sorted ascending by [timestamp_be][exp_id],
691        // so we collect all and then take from the end for newest-first.
692        let mut entries = Vec::new();
693        for result in table.get(collective_id.as_bytes())? {
694            let value = result.map_err(StorageError::from)?;
695            let entry = value.value();
696            // Entry layout: [timestamp_be: 8 bytes][experience_id: 16 bytes]
697            let mut ts_bytes = [0u8; 8];
698            ts_bytes.copy_from_slice(&entry[..8]);
699            let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));
700
701            let mut exp_bytes = [0u8; 16];
702            exp_bytes.copy_from_slice(&entry[8..24]);
703            entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
704        }
705
706        // Take the last `limit` entries (newest) and reverse to get descending order
707        let start = entries.len().saturating_sub(limit);
708        let mut recent = entries.split_off(start);
709        recent.reverse();
710
711        Ok(recent)
712    }
713
714    // =========================================================================
715    // Experience Storage Operations
716    // =========================================================================
717
718    fn save_experience(&self, experience: &Experience) -> Result<()> {
719        // Serialize experience (embedding is #[serde(skip)], excluded automatically)
720        let exp_bytes = bincode::serialize(experience)
721            .map_err(|e| StorageError::serialization(e.to_string()))?;
722
723        // Convert embedding to raw little-endian bytes
724        let emb_bytes = f32_slice_to_bytes(&experience.embedding);
725
726        // Build index keys
727        let type_key = encode_type_index_key(
728            experience.collective_id.as_bytes(),
729            experience.experience_type.type_tag(),
730        );
731
732        // Write to all 4 tables in a single atomic transaction
733        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
734        {
735            // Main experience record
736            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
737            exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
738        }
739        {
740            // Embedding vector (stored separately for compactness)
741            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
742            emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
743        }
744        {
745            // By-collective index: key=collective_id, value=timestamp+experience_id
746            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
747            // Value is [timestamp_be: 8 bytes][experience_id: 16 bytes] = 24 bytes
748            let mut value = [0u8; 24];
749            value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
750            value[8..24].copy_from_slice(experience.id.as_bytes());
751            idx_table.insert(experience.collective_id.as_bytes(), &value)?;
752        }
753        {
754            // By-type index: key=collective_id+type_tag, value=experience_id
755            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
756            type_table.insert(&type_key, experience.id.as_bytes())?;
757        }
758        // Record WAL event for cross-process change detection
759        self.increment_wal_and_record(
760            &write_txn,
761            experience.id.as_bytes(),
762            experience.collective_id,
763            EntityTypeTag::Experience,
764            WatchEventTypeTag::Created,
765            experience.timestamp,
766        )?;
767        write_txn.commit().map_err(StorageError::from)?;
768
769        debug!(
770            id = %experience.id,
771            collective_id = %experience.collective_id,
772            "Experience saved"
773        );
774        Ok(())
775    }
776
777    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
778        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
779
780        // Read main experience record
781        let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
782        let exp_entry = match exp_table.get(id.as_bytes())? {
783            Some(v) => v,
784            None => return Ok(None),
785        };
786
787        let mut experience: Experience = bincode::deserialize(exp_entry.value())
788            .map_err(|e| StorageError::serialization(e.to_string()))?;
789
790        // Read embedding from separate table and reconstitute
791        let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
792        if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
793            experience.embedding = bytes_to_f32_vec(emb_entry.value());
794        }
795
796        Ok(Some(experience))
797    }
798
799    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
800        // Read-modify-write: read the current record, apply updates, write back
801        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
802        let collective_id;
803        let timestamp;
804        let is_archive;
805        {
806            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
807
808            let entry = match exp_table.get(id.as_bytes())? {
809                Some(v) => v,
810                None => return Ok(false),
811            };
812
813            let mut experience: Experience = bincode::deserialize(entry.value())
814                .map_err(|e| StorageError::serialization(e.to_string()))?;
815
816            // Drop the borrow on entry before mutating the table
817            drop(entry);
818
819            // Capture metadata for WAL event before applying updates
820            collective_id = experience.collective_id;
821            timestamp = experience.timestamp;
822            is_archive = update.archived == Some(true);
823
824            // Apply updates (only Some fields)
825            if let Some(importance) = update.importance {
826                experience.importance = importance;
827            }
828            if let Some(confidence) = update.confidence {
829                experience.confidence = confidence;
830            }
831            if let Some(ref domain) = update.domain {
832                experience.domain = domain.clone();
833            }
834            if let Some(ref related_files) = update.related_files {
835                experience.related_files = related_files.clone();
836            }
837            if let Some(archived) = update.archived {
838                experience.archived = archived;
839            }
840
841            // Re-serialize and write back
842            let bytes = bincode::serialize(&experience)
843                .map_err(|e| StorageError::serialization(e.to_string()))?;
844            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
845        }
846        // Record WAL event for cross-process change detection
847        let event_type = if is_archive {
848            WatchEventTypeTag::Archived
849        } else {
850            WatchEventTypeTag::Updated
851        };
852        self.increment_wal_and_record(
853            &write_txn,
854            id.as_bytes(),
855            collective_id,
856            EntityTypeTag::Experience,
857            event_type,
858            timestamp,
859        )?;
860        write_txn.commit().map_err(StorageError::from)?;
861
862        debug!(id = %id, "Experience updated");
863        Ok(true)
864    }
865
866    fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
867        // First read the experience to get collective_id, timestamp, and type_tag
868        // (needed for cleaning up secondary indices and WAL event)
869        let (collective_id, timestamp, type_tag) = {
870            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
871            let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
872
873            match exp_table.get(id.as_bytes())? {
874                Some(entry) => {
875                    let exp: Experience = bincode::deserialize(entry.value())
876                        .map_err(|e| StorageError::serialization(e.to_string()))?;
877                    (
878                        exp.collective_id,
879                        exp.timestamp,
880                        exp.experience_type.type_tag(),
881                    )
882                }
883                None => return Ok(false),
884            }
885        };
886
887        // Delete from all 4 tables in a single transaction
888        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
889        {
890            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
891            exp_table.remove(id.as_bytes())?;
892        }
893        {
894            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
895            emb_table.remove(id.as_bytes())?;
896        }
897        {
898            // Remove specific entry from by-collective multimap
899            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
900            let mut value = [0u8; 24];
901            value[..8].copy_from_slice(&timestamp.to_be_bytes());
902            value[8..24].copy_from_slice(id.as_bytes());
903            idx_table.remove(collective_id.as_bytes(), &value)?;
904        }
905        {
906            // Remove specific entry from by-type multimap
907            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
908            let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
909            type_table.remove(&type_key, id.as_bytes())?;
910        }
911        // Record WAL event for cross-process change detection
912        self.increment_wal_and_record(
913            &write_txn,
914            id.as_bytes(),
915            collective_id,
916            EntityTypeTag::Experience,
917            WatchEventTypeTag::Deleted,
918            timestamp,
919        )?;
920        write_txn.commit().map_err(StorageError::from)?;
921
922        debug!(id = %id, "Experience deleted");
923        Ok(true)
924    }
925
926    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
927        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
928        let (new_count, collective_id, timestamp) = {
929            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
930
931            let entry = match exp_table.get(id.as_bytes())? {
932                Some(v) => v,
933                None => return Ok(None),
934            };
935
936            let mut experience: Experience = bincode::deserialize(entry.value())
937                .map_err(|e| StorageError::serialization(e.to_string()))?;
938            drop(entry);
939
940            experience.applications = experience.applications.saturating_add(1);
941            let new_count = experience.applications;
942            let collective_id = experience.collective_id;
943            let timestamp = experience.timestamp;
944
945            let bytes = bincode::serialize(&experience)
946                .map_err(|e| StorageError::serialization(e.to_string()))?;
947            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
948            (new_count, collective_id, timestamp)
949        };
950        // Record WAL event for cross-process change detection
951        self.increment_wal_and_record(
952            &write_txn,
953            id.as_bytes(),
954            collective_id,
955            EntityTypeTag::Experience,
956            WatchEventTypeTag::Updated,
957            timestamp,
958        )?;
959        write_txn.commit().map_err(StorageError::from)?;
960
961        debug!(id = %id, applications = new_count, "Experience reinforced");
962        Ok(Some(new_count))
963    }
964
965    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
966        let bytes = f32_slice_to_bytes(embedding);
967
968        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
969        {
970            let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
971            table.insert(id.as_bytes(), bytes.as_slice())?;
972        }
973        write_txn.commit().map_err(StorageError::from)?;
974
975        debug!(id = %id, dim = embedding.len(), "Embedding saved");
976        Ok(())
977    }
978
979    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
980        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
981        let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
982
983        match table.get(id.as_bytes())? {
984            Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
985            None => Ok(None),
986        }
987    }
988
989    // =========================================================================
990    // Relation Storage Operations (E3-S01)
991    // =========================================================================
992
993    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
994        let bytes =
995            bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;
996
997        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
998        {
999            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
1000            table.insert(relation.id.as_bytes(), bytes.as_slice())?;
1001        }
1002        {
1003            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1004            table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
1005        }
1006        {
1007            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1008            table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
1009        }
1010        // Look up collective_id from source experience for WAL record
1011        let collective_id = {
1012            let exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
1013            let entry = exp_table
1014                .get(relation.source_id.as_bytes())?
1015                .ok_or_else(|| {
1016                    StorageError::corrupted("relation source experience not found for WAL record")
1017                })?;
1018            let exp: Experience = bincode::deserialize(entry.value())
1019                .map_err(|e| StorageError::serialization(e.to_string()))?;
1020            exp.collective_id
1021        };
1022        self.increment_wal_and_record(
1023            &write_txn,
1024            relation.id.as_bytes(),
1025            collective_id,
1026            EntityTypeTag::Relation,
1027            WatchEventTypeTag::Created,
1028            relation.created_at,
1029        )?;
1030        write_txn.commit().map_err(StorageError::from)?;
1031
1032        debug!(id = %relation.id, "Relation saved");
1033        Ok(())
1034    }
1035
1036    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
1037        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1038        let table = read_txn.open_table(RELATIONS_TABLE)?;
1039
1040        match table.get(id.as_bytes())? {
1041            Some(value) => {
1042                let relation: ExperienceRelation = bincode::deserialize(value.value())
1043                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1044                Ok(Some(relation))
1045            }
1046            None => Ok(None),
1047        }
1048    }
1049
1050    fn delete_relation(&self, id: RelationId) -> Result<bool> {
1051        // Read the relation first to get source/target IDs for index cleanup
1052        // and source experience's collective_id for WAL record
1053        let (source_id, target_id, collective_id) = {
1054            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1055            let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1056
1057            match rel_table.get(id.as_bytes())? {
1058                Some(entry) => {
1059                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
1060                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1061                    // Look up collective_id from source experience
1062                    let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
1063                    let cid = match exp_table.get(rel.source_id.as_bytes())? {
1064                        Some(exp_entry) => {
1065                            let exp: Experience = bincode::deserialize(exp_entry.value())
1066                                .map_err(|e| StorageError::serialization(e.to_string()))?;
1067                            exp.collective_id
1068                        }
1069                        // Source experience may have been deleted; use nil collective
1070                        None => CollectiveId::nil(),
1071                    };
1072                    (rel.source_id, rel.target_id, cid)
1073                }
1074                None => return Ok(false),
1075            }
1076        };
1077
1078        // Delete from all 3 tables atomically
1079        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1080        {
1081            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
1082            table.remove(id.as_bytes())?;
1083        }
1084        {
1085            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1086            table.remove(source_id.as_bytes(), id.as_bytes())?;
1087        }
1088        {
1089            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1090            table.remove(target_id.as_bytes(), id.as_bytes())?;
1091        }
1092        self.increment_wal_and_record(
1093            &write_txn,
1094            id.as_bytes(),
1095            collective_id,
1096            EntityTypeTag::Relation,
1097            WatchEventTypeTag::Deleted,
1098            Timestamp::now(),
1099        )?;
1100        write_txn.commit().map_err(StorageError::from)?;
1101
1102        debug!(id = %id, "Relation deleted");
1103        Ok(true)
1104    }
1105
1106    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1107        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1108        let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1109
1110        let mut ids = Vec::new();
1111        for result in table.get(experience_id.as_bytes())? {
1112            let value = result.map_err(StorageError::from)?;
1113            let bytes = value.value();
1114            ids.push(RelationId::from_bytes(*bytes));
1115        }
1116
1117        Ok(ids)
1118    }
1119
1120    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1121        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1122        let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1123
1124        let mut ids = Vec::new();
1125        for result in table.get(experience_id.as_bytes())? {
1126            let value = result.map_err(StorageError::from)?;
1127            let bytes = value.value();
1128            ids.push(RelationId::from_bytes(*bytes));
1129        }
1130
1131        Ok(ids)
1132    }
1133
1134    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
1135        // Phase 1: Read — collect all relation IDs from both indexes
1136        let relation_ids: Vec<RelationId> = {
1137            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1138            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1139            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1140
1141            let mut ids = std::collections::HashSet::new();
1142
1143            // Outgoing relations (this experience is source)
1144            for result in source_table.get(experience_id.as_bytes())? {
1145                let value = result.map_err(StorageError::from)?;
1146                ids.insert(RelationId::from_bytes(*value.value()));
1147            }
1148
1149            // Incoming relations (this experience is target)
1150            for result in target_table.get(experience_id.as_bytes())? {
1151                let value = result.map_err(StorageError::from)?;
1152                ids.insert(RelationId::from_bytes(*value.value()));
1153            }
1154
1155            ids.into_iter().collect()
1156        };
1157
1158        let count = relation_ids.len() as u64;
1159        if count == 0 {
1160            return Ok(0);
1161        }
1162
1163        // Phase 2: Read each relation to get source/target IDs for index cleanup
1164        let relations: Vec<ExperienceRelation> = {
1165            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1166            let table = read_txn.open_table(RELATIONS_TABLE)?;
1167
1168            let mut rels = Vec::with_capacity(relation_ids.len());
1169            for rel_id in &relation_ids {
1170                if let Some(entry) = table.get(rel_id.as_bytes())? {
1171                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
1172                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1173                    rels.push(rel);
1174                }
1175            }
1176            rels
1177        };
1178
1179        // Phase 3: Write — delete from all 3 tables atomically
1180        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1181        {
1182            let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
1183            for rel in &relations {
1184                rel_table.remove(rel.id.as_bytes())?;
1185            }
1186        }
1187        {
1188            let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1189            for rel in &relations {
1190                source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
1191            }
1192        }
1193        {
1194            let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1195            for rel in &relations {
1196                target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
1197            }
1198        }
1199        write_txn.commit().map_err(StorageError::from)?;
1200
1201        debug!(
1202            experience_id = %experience_id,
1203            count = count,
1204            "Cascade-deleted relations for experience"
1205        );
1206        Ok(count)
1207    }
1208
1209    fn relation_exists(
1210        &self,
1211        source_id: ExperienceId,
1212        target_id: ExperienceId,
1213        relation_type: RelationType,
1214    ) -> Result<bool> {
1215        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1216        let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1217        let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1218
1219        // Scan all relations for this source and check each
1220        for result in index_table.get(source_id.as_bytes())? {
1221            let value = result.map_err(StorageError::from)?;
1222            let rel_id = RelationId::from_bytes(*value.value());
1223
1224            if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1225                let rel: ExperienceRelation = bincode::deserialize(entry.value())
1226                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1227                if rel.target_id == target_id && rel.relation_type == relation_type {
1228                    return Ok(true);
1229                }
1230            }
1231        }
1232
1233        Ok(false)
1234    }
1235
1236    // =========================================================================
1237    // Insight Storage Operations (E3-S02)
1238    // =========================================================================
1239
1240    fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1241        let bytes =
1242            bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1243
1244        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1245        {
1246            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1247            table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1248        }
1249        {
1250            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1251            table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1252        }
1253        self.increment_wal_and_record(
1254            &write_txn,
1255            insight.id.as_bytes(),
1256            insight.collective_id,
1257            EntityTypeTag::Insight,
1258            WatchEventTypeTag::Created,
1259            insight.created_at,
1260        )?;
1261        write_txn.commit().map_err(StorageError::from)?;
1262
1263        debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1264        Ok(())
1265    }
1266
1267    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1268        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1269        let table = read_txn.open_table(INSIGHTS_TABLE)?;
1270
1271        match table.get(id.as_bytes())? {
1272            Some(value) => {
1273                let insight: DerivedInsight = bincode::deserialize(value.value())
1274                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1275                Ok(Some(insight))
1276            }
1277            None => Ok(None),
1278        }
1279    }
1280
1281    fn delete_insight(&self, id: InsightId) -> Result<bool> {
1282        // Read the insight first to get collective_id for index cleanup
1283        let collective_id = {
1284            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1285            let table = read_txn.open_table(INSIGHTS_TABLE)?;
1286
1287            match table.get(id.as_bytes())? {
1288                Some(entry) => {
1289                    let insight: DerivedInsight = bincode::deserialize(entry.value())
1290                        .map_err(|e| StorageError::serialization(e.to_string()))?;
1291                    insight.collective_id
1292                }
1293                None => return Ok(false),
1294            }
1295        };
1296
1297        // Delete from both tables atomically
1298        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1299        {
1300            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1301            table.remove(id.as_bytes())?;
1302        }
1303        {
1304            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1305            table.remove(collective_id.as_bytes(), id.as_bytes())?;
1306        }
1307        self.increment_wal_and_record(
1308            &write_txn,
1309            id.as_bytes(),
1310            collective_id,
1311            EntityTypeTag::Insight,
1312            WatchEventTypeTag::Deleted,
1313            Timestamp::now(),
1314        )?;
1315        write_txn.commit().map_err(StorageError::from)?;
1316
1317        debug!(id = %id, "Insight deleted");
1318        Ok(true)
1319    }
1320
1321    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1322        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1323        let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1324
1325        let mut ids = Vec::new();
1326        for result in table.get(id.as_bytes())? {
1327            let value = result.map_err(StorageError::from)?;
1328            ids.push(InsightId::from_bytes(*value.value()));
1329        }
1330
1331        Ok(ids)
1332    }
1333
1334    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
1335        // Phase 1: Read — collect insight IDs
1336        let insight_ids: Vec<[u8; 16]> = {
1337            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1338            let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1339
1340            let mut ids = Vec::new();
1341            for result in table.get(id.as_bytes())? {
1342                let value = result.map_err(StorageError::from)?;
1343                ids.push(*value.value());
1344            }
1345            ids
1346        };
1347
1348        let count = insight_ids.len() as u64;
1349        if count == 0 {
1350            return Ok(0);
1351        }
1352
1353        // Phase 2: Write — delete from both tables atomically
1354        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1355        {
1356            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1357            for insight_id in &insight_ids {
1358                table.remove(insight_id)?;
1359            }
1360        }
1361        {
1362            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1363            table.remove_all(id.as_bytes())?;
1364        }
1365        write_txn.commit().map_err(StorageError::from)?;
1366
1367        debug!(id = %id, count = count, "Cascade-deleted insights for collective");
1368        Ok(count)
1369    }
1370
1371    // =========================================================================
1372    // Activity Storage Operations (E3-S03)
1373    // =========================================================================
1374
1375    fn save_activity(&self, activity: &Activity) -> Result<()> {
1376        let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1377        let bytes =
1378            bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1379
1380        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1381        {
1382            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1383            table.insert(key.as_slice(), bytes.as_slice())?;
1384        }
1385        write_txn.commit().map_err(StorageError::from)?;
1386
1387        debug!(
1388            agent_id = %activity.agent_id,
1389            collective_id = %activity.collective_id,
1390            "Activity saved"
1391        );
1392        Ok(())
1393    }
1394
1395    fn get_activity(
1396        &self,
1397        agent_id: &str,
1398        collective_id: CollectiveId,
1399    ) -> Result<Option<Activity>> {
1400        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1401
1402        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1403        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1404
1405        match table.get(key.as_slice())? {
1406            Some(value) => {
1407                let activity: Activity = bincode::deserialize(value.value())
1408                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1409                Ok(Some(activity))
1410            }
1411            None => Ok(None),
1412        }
1413    }
1414
1415    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1416        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1417
1418        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1419        let existed = {
1420            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1421            let removed = table.remove(key.as_slice())?;
1422            removed.is_some()
1423        };
1424        write_txn.commit().map_err(StorageError::from)?;
1425
1426        if existed {
1427            debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1428        }
1429        Ok(existed)
1430    }
1431
1432    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1433        let prefix = collective_id.as_bytes();
1434
1435        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1436        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1437
1438        let mut activities = Vec::new();
1439        for result in table.iter()? {
1440            let (key, value) = result.map_err(StorageError::from)?;
1441            let key_bytes = key.value();
1442
1443            // Check if this key belongs to the requested collective (16-byte prefix)
1444            if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1445                let activity: Activity = bincode::deserialize(value.value())
1446                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1447                activities.push(activity);
1448            }
1449        }
1450
1451        Ok(activities)
1452    }
1453
1454    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
1455        let prefix = collective_id.as_bytes();
1456
1457        // Phase 1: Read — collect matching keys
1458        let keys_to_delete: Vec<Vec<u8>> = {
1459            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1460            let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1461
1462            let mut keys = Vec::new();
1463            for result in table.iter()? {
1464                let (key, _) = result.map_err(StorageError::from)?;
1465                let key_bytes = key.value();
1466
1467                if key_bytes.len() >= 16
1468                    && decode_collective_from_activity_key(key_bytes) == *prefix
1469                {
1470                    keys.push(key_bytes.to_vec());
1471                }
1472            }
1473            keys
1474        };
1475
1476        let count = keys_to_delete.len() as u64;
1477        if count == 0 {
1478            return Ok(0);
1479        }
1480
1481        // Phase 2: Write — delete all collected keys
1482        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1483        {
1484            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1485            for key in &keys_to_delete {
1486                table.remove(key.as_slice())?;
1487            }
1488        }
1489        write_txn.commit().map_err(StorageError::from)?;
1490
1491        debug!(
1492            collective_id = %collective_id,
1493            count = count,
1494            "Cascade-deleted activities for collective"
1495        );
1496        Ok(count)
1497    }
1498
1499    // =========================================================================
1500    // Watch Event Operations (E4-S02)
1501    // =========================================================================
1502
1503    fn get_wal_sequence(&self) -> Result<u64> {
1504        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1505        let meta_table = read_txn.open_table(METADATA_TABLE)?;
1506        match meta_table.get(WAL_SEQUENCE_KEY)? {
1507            Some(entry) => {
1508                let bytes: [u8; 8] = entry
1509                    .value()
1510                    .try_into()
1511                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1512                Ok(u64::from_be_bytes(bytes))
1513            }
1514            None => Ok(0),
1515        }
1516    }
1517
1518    fn poll_watch_events(
1519        &self,
1520        since_seq: u64,
1521        limit: usize,
1522    ) -> Result<(Vec<WatchEventRecord>, u64)> {
1523        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1524        let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1525
1526        let start_key = (since_seq + 1).to_be_bytes();
1527        let end_key = u64::MAX.to_be_bytes();
1528        let mut events = Vec::new();
1529        let mut max_seq = since_seq;
1530
1531        for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1532            let (key, value) = entry.map_err(StorageError::from)?;
1533            let seq = u64::from_be_bytes(*key.value());
1534            let record: WatchEventRecord = bincode::deserialize(value.value())
1535                .map_err(|e| StorageError::serialization(e.to_string()))?;
1536            events.push(record);
1537            max_seq = seq;
1538            if events.len() >= limit {
1539                break;
1540            }
1541        }
1542
1543        Ok((events, max_seq))
1544    }
1545
1546    // =========================================================================
1547    // Sync Operations (feature: sync)
1548    // =========================================================================
1549
1550    #[cfg(feature = "sync")]
1551    fn poll_sync_events(
1552        &self,
1553        since_seq: u64,
1554        limit: usize,
1555    ) -> Result<Vec<(u64, WatchEventRecord)>> {
1556        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1557        let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1558
1559        let start_key = (since_seq + 1).to_be_bytes();
1560        let end_key = u64::MAX.to_be_bytes();
1561        let mut events = Vec::new();
1562
1563        for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1564            let (key, value) = entry.map_err(StorageError::from)?;
1565            let seq = u64::from_be_bytes(*key.value());
1566            let record: WatchEventRecord = bincode::deserialize(value.value())
1567                .map_err(|e| StorageError::serialization(e.to_string()))?;
1568            events.push((seq, record));
1569            if events.len() >= limit {
1570                break;
1571            }
1572        }
1573
1574        Ok(events)
1575    }
1576
1577    #[cfg(feature = "sync")]
1578    fn instance_id(&self) -> crate::sync::InstanceId {
1579        self.instance_id
1580    }
1581
1582    #[cfg(feature = "sync")]
1583    fn save_sync_cursor(&self, cursor: &crate::sync::SyncCursor) -> Result<()> {
1584        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1585        {
1586            let mut table = write_txn.open_table(SYNC_CURSORS_TABLE)?;
1587            let bytes = bincode::serialize(cursor)
1588                .map_err(|e| StorageError::serialization(e.to_string()))?;
1589            table.insert(cursor.instance_id.as_bytes(), bytes.as_slice())?;
1590        }
1591        write_txn.commit().map_err(StorageError::from)?;
1592        debug!(
1593            peer = %cursor.instance_id,
1594            last_sequence = cursor.last_sequence,
1595            "Saved sync cursor"
1596        );
1597        Ok(())
1598    }
1599
1600    #[cfg(feature = "sync")]
1601    fn load_sync_cursor(
1602        &self,
1603        instance_id: &crate::sync::InstanceId,
1604    ) -> Result<Option<crate::sync::SyncCursor>> {
1605        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1606        let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1607        match table.get(instance_id.as_bytes())? {
1608            Some(entry) => {
1609                let cursor: crate::sync::SyncCursor = bincode::deserialize(entry.value())
1610                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1611                Ok(Some(cursor))
1612            }
1613            None => Ok(None),
1614        }
1615    }
1616
1617    #[cfg(feature = "sync")]
1618    fn list_sync_cursors(&self) -> Result<Vec<crate::sync::SyncCursor>> {
1619        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1620        let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1621        let mut cursors = Vec::new();
1622        for entry in table.iter()? {
1623            let (_, value) = entry.map_err(StorageError::from)?;
1624            let cursor: crate::sync::SyncCursor = bincode::deserialize(value.value())
1625                .map_err(|e| StorageError::serialization(e.to_string()))?;
1626            cursors.push(cursor);
1627        }
1628        Ok(cursors)
1629    }
1630
1631    #[cfg(feature = "sync")]
1632    fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64> {
1633        if up_to_seq == 0 {
1634            return Ok(0);
1635        }
1636
1637        // Collect keys to delete in a read pass
1638        let keys_to_delete: Vec<[u8; 8]> = {
1639            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1640            let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1641
1642            let start_key = 1u64.to_be_bytes();
1643            let end_key = up_to_seq.to_be_bytes();
1644            let mut keys = Vec::new();
1645
1646            for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1647                let (key, _) = entry.map_err(StorageError::from)?;
1648                keys.push(*key.value());
1649            }
1650            keys
1651        };
1652
1653        if keys_to_delete.is_empty() {
1654            return Ok(0);
1655        }
1656
1657        let count = keys_to_delete.len() as u64;
1658
1659        // Delete in a write transaction
1660        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1661        {
1662            let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
1663            for key in &keys_to_delete {
1664                events_table.remove(key)?;
1665            }
1666        }
1667        write_txn.commit().map_err(StorageError::from)?;
1668
1669        debug!(count, up_to_seq, "Compacted WAL events");
1670        Ok(count)
1671    }
1672}
1673
1674// ============================================================================
1675// Embedding byte conversion helpers
1676// ============================================================================
1677
/// Encodes each `f32` in `data` as 4 little-endian bytes, concatenated in
/// input order.
#[inline]
fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(data.len() * 4);
    encoded.extend(data.iter().flat_map(|value| value.to_le_bytes()));
    encoded
}
1687
/// Decodes consecutive 4-byte little-endian groups of `data` into `f32`
/// values.
///
/// Trailing bytes that do not form a complete 4-byte group are ignored.
#[inline]
fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
    let mut values = Vec::with_capacity(data.len() / 4);
    for chunk in data.chunks_exact(4) {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(chunk);
        values.push(f32::from_le_bytes(raw));
    }
    values
}
1695
1696// RedbStorage is auto Send + Sync: Database, DatabaseMetadata, and PathBuf
1697// are all Send + Sync.
1698
1699#[cfg(test)]
1700mod tests {
1701    use super::*;
1702    use tempfile::tempdir;
1703
1704    fn default_config() -> Config {
1705        Config::default()
1706    }
1707
1708    #[test]
1709    fn test_open_creates_new_database() {
1710        let dir = tempdir().unwrap();
1711        let path = dir.path().join("test.db");
1712
1713        assert!(!path.exists());
1714
1715        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1716
1717        assert!(path.exists());
1718        assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
1719        assert_eq!(
1720            storage.metadata().embedding_dimension,
1721            EmbeddingDimension::D384
1722        );
1723
1724        Box::new(storage).close().unwrap();
1725    }
1726
1727    #[test]
1728    fn test_open_existing_database() {
1729        let dir = tempdir().unwrap();
1730        let path = dir.path().join("test.db");
1731
1732        // Create database
1733        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1734        let created_at = storage.metadata().created_at;
1735        Box::new(storage).close().unwrap();
1736
1737        // Reopen
1738        std::thread::sleep(std::time::Duration::from_millis(10));
1739        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1740
1741        // created_at should be preserved
1742        assert_eq!(storage.metadata().created_at, created_at);
1743        // last_opened_at should be updated
1744        assert!(storage.metadata().last_opened_at > created_at);
1745
1746        Box::new(storage).close().unwrap();
1747    }
1748
1749    #[test]
1750    fn test_dimension_mismatch_returns_error() {
1751        let dir = tempdir().unwrap();
1752        let path = dir.path().join("test.db");
1753
1754        // Create with D384
1755        let config_384 = Config {
1756            embedding_dimension: EmbeddingDimension::D384,
1757            ..Default::default()
1758        };
1759        let storage = RedbStorage::open(&path, &config_384).unwrap();
1760        Box::new(storage).close().unwrap();
1761
1762        // Try to reopen with D768
1763        let config_768 = Config {
1764            embedding_dimension: EmbeddingDimension::D768,
1765            ..Default::default()
1766        };
1767        let result = RedbStorage::open(&path, &config_768);
1768
1769        assert!(result.is_err());
1770        let err = result.unwrap_err();
1771        assert!(matches!(
1772            err,
1773            PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
1774        ));
1775    }
1776
1777    #[test]
1778    fn test_database_files_created() {
1779        let dir = tempdir().unwrap();
1780        let path = dir.path().join("pulse.db");
1781
1782        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1783
1784        // Main database file should exist
1785        assert!(path.exists());
1786        assert!(storage.path().is_some());
1787        assert_eq!(storage.path().unwrap(), path);
1788
1789        Box::new(storage).close().unwrap();
1790    }
1791
1792    #[test]
1793    fn test_metadata_preserved_across_opens() {
1794        let dir = tempdir().unwrap();
1795        let path = dir.path().join("test.db");
1796
1797        let config = Config {
1798            embedding_dimension: EmbeddingDimension::Custom(512),
1799            ..Default::default()
1800        };
1801
1802        // Create
1803        let storage = RedbStorage::open(&path, &config).unwrap();
1804        assert_eq!(
1805            storage.metadata().embedding_dimension,
1806            EmbeddingDimension::Custom(512)
1807        );
1808        Box::new(storage).close().unwrap();
1809
1810        // Reopen
1811        let storage = RedbStorage::open(&path, &config).unwrap();
1812        assert_eq!(
1813            storage.metadata().embedding_dimension,
1814            EmbeddingDimension::Custom(512)
1815        );
1816        Box::new(storage).close().unwrap();
1817    }
1818
1819    #[test]
1820    fn test_embedding_dimension_accessor() {
1821        let dir = tempdir().unwrap();
1822        let path = dir.path().join("test.db");
1823
1824        let config = Config {
1825            embedding_dimension: EmbeddingDimension::D768,
1826            ..Default::default()
1827        };
1828
1829        let storage = RedbStorage::open(&path, &config).unwrap();
1830        assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);
1831
1832        Box::new(storage).close().unwrap();
1833    }
1834
1835    #[test]
1836    fn test_all_six_tables_created() {
1837        let dir = tempdir().unwrap();
1838        let path = dir.path().join("test.db");
1839
1840        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1841
1842        // Verify all 6 tables exist by opening each in a read transaction.
1843        // If any table wasn't created during initialize_new(), this would
1844        // return a TableDoesNotExist error.
1845        let read_txn = storage.database().begin_read().unwrap();
1846
1847        read_txn.open_table(METADATA_TABLE).unwrap();
1848        read_txn.open_table(COLLECTIVES_TABLE).unwrap();
1849        read_txn.open_table(EXPERIENCES_TABLE).unwrap();
1850        read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
1851        read_txn
1852            .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
1853            .unwrap();
1854        read_txn
1855            .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
1856            .unwrap();
1857
1858        Box::new(storage).close().unwrap();
1859    }
1860
1861    // ====================================================================
1862    // Collective CRUD tests
1863    // ====================================================================
1864
1865    #[test]
1866    fn test_save_and_get_collective() {
1867        let dir = tempdir().unwrap();
1868        let path = dir.path().join("test.db");
1869        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1870
1871        let collective = Collective::new("test-project", 384);
1872        let id = collective.id;
1873
1874        storage.save_collective(&collective).unwrap();
1875
1876        let retrieved = storage.get_collective(id).unwrap().unwrap();
1877        assert_eq!(retrieved.id, id);
1878        assert_eq!(retrieved.name, "test-project");
1879        assert_eq!(retrieved.embedding_dimension, 384);
1880        assert!(retrieved.owner_id.is_none());
1881
1882        Box::new(storage).close().unwrap();
1883    }
1884
1885    #[test]
1886    fn test_get_nonexistent_collective_returns_none() {
1887        let dir = tempdir().unwrap();
1888        let path = dir.path().join("test.db");
1889        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1890
1891        let result = storage.get_collective(CollectiveId::new()).unwrap();
1892        assert!(result.is_none());
1893
1894        Box::new(storage).close().unwrap();
1895    }
1896
1897    #[test]
1898    fn test_save_collective_overwrites_existing() {
1899        let dir = tempdir().unwrap();
1900        let path = dir.path().join("test.db");
1901        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1902
1903        let mut collective = Collective::new("original-name", 384);
1904        let id = collective.id;
1905        storage.save_collective(&collective).unwrap();
1906
1907        // Overwrite with updated name
1908        collective.name = "updated-name".to_string();
1909        storage.save_collective(&collective).unwrap();
1910
1911        let retrieved = storage.get_collective(id).unwrap().unwrap();
1912        assert_eq!(retrieved.name, "updated-name");
1913
1914        Box::new(storage).close().unwrap();
1915    }
1916
1917    #[test]
1918    fn test_list_collectives_empty() {
1919        let dir = tempdir().unwrap();
1920        let path = dir.path().join("test.db");
1921        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1922
1923        let collectives = storage.list_collectives().unwrap();
1924        assert!(collectives.is_empty());
1925
1926        Box::new(storage).close().unwrap();
1927    }
1928
1929    #[test]
1930    fn test_list_collectives_returns_all() {
1931        let dir = tempdir().unwrap();
1932        let path = dir.path().join("test.db");
1933        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1934
1935        let c1 = Collective::new("project-alpha", 384);
1936        let c2 = Collective::new("project-beta", 384);
1937        let c3 = Collective::new("project-gamma", 384);
1938
1939        storage.save_collective(&c1).unwrap();
1940        storage.save_collective(&c2).unwrap();
1941        storage.save_collective(&c3).unwrap();
1942
1943        let collectives = storage.list_collectives().unwrap();
1944        assert_eq!(collectives.len(), 3);
1945
1946        // Verify all IDs are present
1947        let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
1948        assert!(ids.contains(&c1.id));
1949        assert!(ids.contains(&c2.id));
1950        assert!(ids.contains(&c3.id));
1951
1952        Box::new(storage).close().unwrap();
1953    }
1954
1955    #[test]
1956    fn test_delete_collective_existing() {
1957        let dir = tempdir().unwrap();
1958        let path = dir.path().join("test.db");
1959        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1960
1961        let collective = Collective::new("to-delete", 384);
1962        let id = collective.id;
1963        storage.save_collective(&collective).unwrap();
1964
1965        // Delete it
1966        let deleted = storage.delete_collective(id).unwrap();
1967        assert!(deleted);
1968
1969        // Verify it's gone
1970        assert!(storage.get_collective(id).unwrap().is_none());
1971
1972        Box::new(storage).close().unwrap();
1973    }
1974
1975    #[test]
1976    fn test_delete_collective_nonexistent() {
1977        let dir = tempdir().unwrap();
1978        let path = dir.path().join("test.db");
1979        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1980
1981        let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
1982        assert!(!deleted);
1983
1984        Box::new(storage).close().unwrap();
1985    }
1986
1987    // ====================================================================
1988    // ACID Guarantee Tests
1989    // ====================================================================
1990
1991    #[test]
1992    fn test_uncommitted_transaction_is_invisible() {
1993        // ATOMICITY: If we don't commit a write transaction, the data
1994        // must not be visible to subsequent reads.
1995        let dir = tempdir().unwrap();
1996        let path = dir.path().join("test.db");
1997        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1998
1999        let collective = Collective::new("phantom", 384);
2000        let id = collective.id;
2001        let bytes = bincode::serialize(&collective).unwrap();
2002
2003        // Open a write transaction, insert data, but DON'T commit -- just drop
2004        {
2005            let write_txn = storage.database().begin_write().unwrap();
2006            {
2007                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
2008                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
2009            }
2010            // write_txn is dropped here without commit() -- rolled back
2011        }
2012
2013        // The collective should NOT be visible
2014        let result = storage.get_collective(id).unwrap();
2015        assert!(result.is_none(), "Uncommitted data must not be visible");
2016
2017        Box::new(storage).close().unwrap();
2018    }
2019
2020    #[test]
2021    fn test_committed_transaction_is_visible() {
2022        // DURABILITY (within session): committed data must be immediately
2023        // visible to subsequent reads.
2024        let dir = tempdir().unwrap();
2025        let path = dir.path().join("test.db");
2026        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2027
2028        let collective = Collective::new("committed", 384);
2029        let id = collective.id;
2030
2031        storage.save_collective(&collective).unwrap();
2032
2033        let result = storage.get_collective(id).unwrap();
2034        assert!(result.is_some(), "Committed data must be visible");
2035
2036        Box::new(storage).close().unwrap();
2037    }
2038
2039    #[test]
2040    fn test_multi_table_atomicity() {
2041        // ATOMICITY: A single transaction writing to multiple tables
2042        // is all-or-nothing. Here we write to both COLLECTIVES and METADATA
2043        // in one transaction and verify both are visible after commit.
2044        let dir = tempdir().unwrap();
2045        let path = dir.path().join("test.db");
2046        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2047
2048        let collective = Collective::new("multi-table", 384);
2049        let id = collective.id;
2050        let collective_bytes = bincode::serialize(&collective).unwrap();
2051
2052        // Write to TWO tables in a single transaction
2053        let write_txn = storage.database().begin_write().unwrap();
2054        {
2055            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
2056            coll_table
2057                .insert(id.as_bytes(), collective_bytes.as_slice())
2058                .unwrap();
2059        }
2060        {
2061            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
2062            meta_table
2063                .insert("test_marker", b"multi_table_test".as_slice())
2064                .unwrap();
2065        }
2066        write_txn.commit().unwrap();
2067
2068        // Verify BOTH writes are visible
2069        let coll = storage.get_collective(id).unwrap();
2070        assert!(coll.is_some(), "Collective from multi-table txn must exist");
2071
2072        let read_txn = storage.database().begin_read().unwrap();
2073        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
2074        let marker = meta_table.get("test_marker").unwrap();
2075        assert!(marker.is_some(), "Metadata from multi-table txn must exist");
2076
2077        Box::new(storage).close().unwrap();
2078    }
2079
2080    #[test]
2081    fn test_mvcc_read_consistency() {
2082        // ISOLATION (MVCC): A single read transaction sees a consistent
2083        // snapshot reflecting all committed writes up to the moment the
2084        // read was opened, and none of the uncommitted or subsequent ones.
2085        //
2086        // We write across multiple separate transactions, then verify a
2087        // read sees the expected consistent state. Combined with
2088        // test_uncommitted_transaction_is_invisible (atomicity), this
2089        // covers the key ACID isolation properties.
2090        //
2091        // Note: redb 2.6.3 has a page allocation constraint that prevents
2092        // holding a read transaction open while a write commits on the
2093        // same Database handle. redb guarantees MVCC isolation internally
2094        // via shadow paging; this test verifies our usage is correct.
2095        let dir = tempdir().unwrap();
2096        let path = dir.path().join("test.db");
2097        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2098
2099        // Write 3 collectives across separate transactions
2100        let c1 = Collective::new("alpha", 384);
2101        let c2 = Collective::new("beta", 384);
2102        let c3 = Collective::new("gamma", 384);
2103
2104        storage.save_collective(&c1).unwrap();
2105        storage.save_collective(&c2).unwrap();
2106        storage.save_collective(&c3).unwrap();
2107
2108        // Delete c2 (another transaction)
2109        storage.delete_collective(c2.id).unwrap();
2110
2111        // A read transaction must see the consistent state:
2112        // c1 and c3 present, c2 absent
2113        let read_txn = storage.database().begin_read().unwrap();
2114        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();
2115
2116        assert!(
2117            table.get(c1.id.as_bytes()).unwrap().is_some(),
2118            "c1 must be visible (committed)"
2119        );
2120        assert!(
2121            table.get(c2.id.as_bytes()).unwrap().is_none(),
2122            "c2 must be absent (deleted)"
2123        );
2124        assert!(
2125            table.get(c3.id.as_bytes()).unwrap().is_some(),
2126            "c3 must be visible (committed)"
2127        );
2128
2129        // Count should be exactly 2
2130        let count = table.iter().unwrap().count();
2131        // +1 for the metadata entry? No -- COLLECTIVES_TABLE is separate.
2132        assert_eq!(count, 2, "Exactly 2 collectives should exist");
2133
2134        drop(table);
2135        drop(read_txn);
2136
2137        Box::new(storage).close().unwrap();
2138    }
2139
2140    // ====================================================================
2141    // Corruption Detection Tests
2142    // ====================================================================
2143
2144    #[test]
2145    fn test_corruption_detection_invalid_metadata_bytes() {
2146        // Opening a database whose metadata contains garbage bytes
2147        // must return a Corrupted error, not a panic or deserialization UB.
2148        let dir = tempdir().unwrap();
2149        let path = dir.path().join("corrupt.db");
2150
2151        // Create a valid database, then corrupt the metadata
2152        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2153        let write_txn = storage.database().begin_write().unwrap();
2154        {
2155            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
2156            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
2157                .unwrap();
2158        }
2159        write_txn.commit().unwrap();
2160        Box::new(storage).close().unwrap();
2161
2162        // Reopen must detect the corruption
2163        let result = RedbStorage::open(&path, &default_config());
2164        assert!(result.is_err(), "Corrupted metadata must be rejected");
2165        let err = result.unwrap_err();
2166        match err {
2167            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2168                assert!(
2169                    msg.contains("Invalid metadata format"),
2170                    "Error should mention invalid format, got: {}",
2171                    msg
2172                );
2173            }
2174            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2175        }
2176    }
2177
2178    #[test]
2179    fn test_corruption_detection_missing_metadata_key() {
2180        // If the metadata table exists but the "db_metadata" key is absent,
2181        // open_existing must return a Corrupted error.
2182        let dir = tempdir().unwrap();
2183        let path = dir.path().join("no_key.db");
2184
2185        // Create a valid database, then delete the metadata key
2186        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2187        let write_txn = storage.database().begin_write().unwrap();
2188        {
2189            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
2190            meta.remove(METADATA_KEY).unwrap();
2191        }
2192        write_txn.commit().unwrap();
2193        Box::new(storage).close().unwrap();
2194
2195        // Reopen must detect the missing key
2196        let result = RedbStorage::open(&path, &default_config());
2197        assert!(result.is_err(), "Missing metadata key must be rejected");
2198        let err = result.unwrap_err();
2199        match err {
2200            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2201                assert!(
2202                    msg.contains("Missing database metadata"),
2203                    "Error should mention missing metadata, got: {}",
2204                    msg
2205                );
2206            }
2207            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2208        }
2209    }
2210
2211    #[test]
2212    fn test_corruption_detection_missing_metadata_table() {
2213        // If the metadata table doesn't exist at all, open_existing must
2214        // return a Corrupted error. We simulate this by creating a raw
2215        // redb database without our schema tables.
2216        let dir = tempdir().unwrap();
2217        let path = dir.path().join("no_table.db");
2218
2219        // Create a raw redb database with a dummy table (not our schema)
2220        {
2221            let db = ::redb::Database::create(&path).unwrap();
2222            let write_txn = db.begin_write().unwrap();
2223            {
2224                let dummy: ::redb::TableDefinition<&str, &str> =
2225                    ::redb::TableDefinition::new("dummy");
2226                let mut table = write_txn.open_table(dummy).unwrap();
2227                table.insert("key", "value").unwrap();
2228            }
2229            write_txn.commit().unwrap();
2230        }
2231
2232        // Opening this as a PulseDB must detect the missing metadata table
2233        let result = RedbStorage::open(&path, &default_config());
2234        assert!(result.is_err(), "Missing metadata table must be rejected");
2235        let err = result.unwrap_err();
2236        match err {
2237            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
2238                assert!(
2239                    msg.contains("Cannot open metadata table"),
2240                    "Error should mention metadata table, got: {}",
2241                    msg
2242                );
2243            }
2244            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
2245        }
2246    }
2247
2248    // ====================================================================
2249    // Experience CRUD tests
2250    // ====================================================================
2251
2252    use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
2253    use crate::types::{AgentId, ExperienceId, Timestamp};
2254
2255    /// Creates a test experience with a given collective_id and embedding dimension.
2256    fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
2257        Experience {
2258            id: ExperienceId::new(),
2259            collective_id,
2260            content: "Test experience content".into(),
2261            embedding: vec![0.42; dim],
2262            experience_type: ExperienceType::Fact {
2263                statement: "redb uses shadow paging".into(),
2264                source: "docs".into(),
2265            },
2266            importance: 0.8,
2267            confidence: 0.7,
2268            applications: 0,
2269            domain: vec!["rust".into(), "databases".into()],
2270            related_files: vec!["src/storage/redb.rs".into()],
2271            source_agent: AgentId::new("test-agent"),
2272            source_task: None,
2273            timestamp: Timestamp::now(),
2274            archived: false,
2275        }
2276    }
2277
2278    #[test]
2279    fn test_save_and_get_experience() {
2280        let dir = tempdir().unwrap();
2281        let path = dir.path().join("test.db");
2282        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2283
2284        let collective = Collective::new("test", 384);
2285        storage.save_collective(&collective).unwrap();
2286
2287        let exp = test_experience(collective.id, 384);
2288        let exp_id = exp.id;
2289
2290        storage.save_experience(&exp).unwrap();
2291
2292        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2293        assert_eq!(retrieved.id, exp_id);
2294        assert_eq!(retrieved.collective_id, collective.id);
2295        assert_eq!(retrieved.content, "Test experience content");
2296        assert_eq!(retrieved.importance, 0.8);
2297        assert_eq!(retrieved.confidence, 0.7);
2298        assert_eq!(retrieved.applications, 0);
2299        assert_eq!(retrieved.domain, vec!["rust", "databases"]);
2300        assert!(!retrieved.archived);
2301        // Embedding should be reconstituted from EMBEDDINGS_TABLE
2302        assert_eq!(retrieved.embedding.len(), 384);
2303        assert_eq!(retrieved.embedding[0], 0.42);
2304
2305        Box::new(storage).close().unwrap();
2306    }
2307
2308    #[test]
2309    fn test_get_nonexistent_experience_returns_none() {
2310        let dir = tempdir().unwrap();
2311        let path = dir.path().join("test.db");
2312        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2313
2314        let result = storage.get_experience(ExperienceId::new()).unwrap();
2315        assert!(result.is_none());
2316
2317        Box::new(storage).close().unwrap();
2318    }
2319
2320    #[test]
2321    fn test_update_experience_fields() {
2322        let dir = tempdir().unwrap();
2323        let path = dir.path().join("test.db");
2324        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2325
2326        let collective = Collective::new("test", 384);
2327        storage.save_collective(&collective).unwrap();
2328
2329        let exp = test_experience(collective.id, 384);
2330        let exp_id = exp.id;
2331        storage.save_experience(&exp).unwrap();
2332
2333        // Update importance and domain
2334        let update = ExperienceUpdate {
2335            importance: Some(0.95),
2336            domain: Some(vec!["updated-tag".into()]),
2337            ..Default::default()
2338        };
2339        let updated = storage.update_experience(exp_id, &update).unwrap();
2340        assert!(updated);
2341
2342        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2343        assert_eq!(retrieved.importance, 0.95);
2344        assert_eq!(retrieved.domain, vec!["updated-tag"]);
2345        // Unchanged fields
2346        assert_eq!(retrieved.confidence, 0.7);
2347
2348        Box::new(storage).close().unwrap();
2349    }
2350
2351    #[test]
2352    fn test_update_nonexistent_experience_returns_false() {
2353        let dir = tempdir().unwrap();
2354        let path = dir.path().join("test.db");
2355        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2356
2357        let update = ExperienceUpdate {
2358            importance: Some(0.5),
2359            ..Default::default()
2360        };
2361        let result = storage
2362            .update_experience(ExperienceId::new(), &update)
2363            .unwrap();
2364        assert!(!result);
2365
2366        Box::new(storage).close().unwrap();
2367    }
2368
2369    #[test]
2370    fn test_delete_experience() {
2371        let dir = tempdir().unwrap();
2372        let path = dir.path().join("test.db");
2373        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2374
2375        let collective = Collective::new("test", 384);
2376        storage.save_collective(&collective).unwrap();
2377
2378        let exp = test_experience(collective.id, 384);
2379        let exp_id = exp.id;
2380        storage.save_experience(&exp).unwrap();
2381
2382        // Verify exists
2383        assert!(storage.get_experience(exp_id).unwrap().is_some());
2384
2385        // Delete
2386        let deleted = storage.delete_experience(exp_id).unwrap();
2387        assert!(deleted);
2388
2389        // Verify gone
2390        assert!(storage.get_experience(exp_id).unwrap().is_none());
2391        assert!(storage.get_embedding(exp_id).unwrap().is_none());
2392
2393        // Verify index cleaned up
2394        assert_eq!(
2395            storage
2396                .count_experiences_in_collective(collective.id)
2397                .unwrap(),
2398            0
2399        );
2400
2401        Box::new(storage).close().unwrap();
2402    }
2403
2404    #[test]
2405    fn test_delete_nonexistent_experience_returns_false() {
2406        let dir = tempdir().unwrap();
2407        let path = dir.path().join("test.db");
2408        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2409
2410        let result = storage.delete_experience(ExperienceId::new()).unwrap();
2411        assert!(!result);
2412
2413        Box::new(storage).close().unwrap();
2414    }
2415
2416    #[test]
2417    fn test_save_and_get_embedding() {
2418        let dir = tempdir().unwrap();
2419        let path = dir.path().join("test.db");
2420        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2421
2422        let id = ExperienceId::new();
2423        let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2424
2425        storage.save_embedding(id, &embedding).unwrap();
2426
2427        let retrieved = storage.get_embedding(id).unwrap().unwrap();
2428        assert_eq!(retrieved, embedding);
2429
2430        Box::new(storage).close().unwrap();
2431    }
2432
2433    #[test]
2434    fn test_experience_by_collective_index() {
2435        let dir = tempdir().unwrap();
2436        let path = dir.path().join("test.db");
2437        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2438
2439        let collective = Collective::new("test", 384);
2440        storage.save_collective(&collective).unwrap();
2441
2442        // Add 3 experiences
2443        for _ in 0..3 {
2444            let exp = test_experience(collective.id, 384);
2445            storage.save_experience(&exp).unwrap();
2446        }
2447
2448        // Count should be 3
2449        assert_eq!(
2450            storage
2451                .count_experiences_in_collective(collective.id)
2452                .unwrap(),
2453            3
2454        );
2455
2456        Box::new(storage).close().unwrap();
2457    }
2458
2459    #[test]
2460    fn test_cascade_delete_includes_experiences() {
2461        let dir = tempdir().unwrap();
2462        let path = dir.path().join("test.db");
2463        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2464
2465        let collective = Collective::new("test", 384);
2466        storage.save_collective(&collective).unwrap();
2467
2468        let exp1 = test_experience(collective.id, 384);
2469        let exp2 = test_experience(collective.id, 384);
2470        let id1 = exp1.id;
2471        let id2 = exp2.id;
2472        storage.save_experience(&exp1).unwrap();
2473        storage.save_experience(&exp2).unwrap();
2474
2475        // Cascade delete
2476        let count = storage
2477            .delete_experiences_by_collective(collective.id)
2478            .unwrap();
2479        assert_eq!(count, 2);
2480
2481        // Verify experiences are gone
2482        assert!(storage.get_experience(id1).unwrap().is_none());
2483        assert!(storage.get_experience(id2).unwrap().is_none());
2484        assert!(storage.get_embedding(id1).unwrap().is_none());
2485        assert!(storage.get_embedding(id2).unwrap().is_none());
2486
2487        Box::new(storage).close().unwrap();
2488    }
2489
2490    #[test]
2491    fn test_update_experience_archived_flag() {
2492        let dir = tempdir().unwrap();
2493        let path = dir.path().join("test.db");
2494        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2495
2496        let collective = Collective::new("test", 384);
2497        storage.save_collective(&collective).unwrap();
2498
2499        let exp = test_experience(collective.id, 384);
2500        let exp_id = exp.id;
2501        storage.save_experience(&exp).unwrap();
2502
2503        // Archive
2504        let update = ExperienceUpdate {
2505            archived: Some(true),
2506            ..Default::default()
2507        };
2508        storage.update_experience(exp_id, &update).unwrap();
2509
2510        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2511        assert!(retrieved.archived);
2512
2513        // Unarchive
2514        let update = ExperienceUpdate {
2515            archived: Some(false),
2516            ..Default::default()
2517        };
2518        storage.update_experience(exp_id, &update).unwrap();
2519
2520        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2521        assert!(!retrieved.archived);
2522
2523        Box::new(storage).close().unwrap();
2524    }
2525
2526    #[test]
2527    fn test_f32_byte_conversion_roundtrip() {
2528        let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2529        let bytes = f32_slice_to_bytes(&original);
2530        assert_eq!(bytes.len(), original.len() * 4);
2531
2532        let restored = bytes_to_f32_vec(&bytes);
2533        assert_eq!(original, restored);
2534    }
2535
2536    #[test]
2537    fn test_experience_with_all_type_variants() {
2538        let dir = tempdir().unwrap();
2539        let path = dir.path().join("test.db");
2540        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2541
2542        let collective = Collective::new("test", 384);
2543        storage.save_collective(&collective).unwrap();
2544
2545        // Save one experience per type variant
2546        let types = vec![
2547            ExperienceType::Difficulty {
2548                description: "test".into(),
2549                severity: Severity::High,
2550            },
2551            ExperienceType::Solution {
2552                problem_ref: None,
2553                approach: "test".into(),
2554                worked: true,
2555            },
2556            ExperienceType::ErrorPattern {
2557                signature: "E0308".into(),
2558                fix: "check types".into(),
2559                prevention: "use clippy".into(),
2560            },
2561            ExperienceType::SuccessPattern {
2562                task_type: "refactor".into(),
2563                approach: "extract method".into(),
2564                quality: 0.9,
2565            },
2566            ExperienceType::UserPreference {
2567                category: "style".into(),
2568                preference: "snake_case".into(),
2569                strength: 1.0,
2570            },
2571            ExperienceType::ArchitecturalDecision {
2572                decision: "use redb".into(),
2573                rationale: "pure Rust".into(),
2574            },
2575            ExperienceType::TechInsight {
2576                technology: "tokio".into(),
2577                insight: "spawn_blocking".into(),
2578            },
2579            ExperienceType::Fact {
2580                statement: "Rust is safe".into(),
2581                source: "docs".into(),
2582            },
2583            ExperienceType::Generic { category: None },
2584        ];
2585
2586        for experience_type in types {
2587            let mut exp = test_experience(collective.id, 384);
2588            exp.experience_type = experience_type;
2589            storage.save_experience(&exp).unwrap();
2590
2591            // Verify roundtrip
2592            let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2593            assert_eq!(
2594                retrieved.experience_type.type_tag(),
2595                exp.experience_type.type_tag()
2596            );
2597        }
2598
2599        assert_eq!(
2600            storage
2601                .count_experiences_in_collective(collective.id)
2602                .unwrap(),
2603            9
2604        );
2605
2606        Box::new(storage).close().unwrap();
2607    }
2608
2609    #[test]
2610    fn test_reinforce_experience_atomic() {
2611        let dir = tempdir().unwrap();
2612        let path = dir.path().join("test.db");
2613        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2614
2615        let collective = Collective::new("test", 384);
2616        storage.save_collective(&collective).unwrap();
2617
2618        let exp = test_experience(collective.id, 384);
2619        let exp_id = exp.id;
2620        storage.save_experience(&exp).unwrap();
2621
2622        // Reinforce 3 times
2623        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(1));
2624        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(2));
2625        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(3));
2626
2627        // Verify the stored value
2628        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2629        assert_eq!(retrieved.applications, 3);
2630
2631        // Verify embedding was NOT re-written (still intact)
2632        let emb = storage.get_embedding(exp_id).unwrap().unwrap();
2633        assert_eq!(emb.len(), 384);
2634
2635        Box::new(storage).close().unwrap();
2636    }
2637
2638    #[test]
2639    fn test_reinforce_experience_nonexistent() {
2640        let dir = tempdir().unwrap();
2641        let path = dir.path().join("test.db");
2642        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2643
2644        let result = storage.reinforce_experience(ExperienceId::new()).unwrap();
2645        assert!(result.is_none());
2646
2647        Box::new(storage).close().unwrap();
2648    }
2649
2650    // ====================================================================
2651    // WAL Sequence Tracking Tests (E4-S02)
2652    // ====================================================================
2653
2654    #[test]
2655    fn test_wal_sequence_starts_at_zero() {
2656        let dir = tempdir().unwrap();
2657        let path = dir.path().join("test.db");
2658        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2659
2660        assert_eq!(storage.get_wal_sequence().unwrap(), 0);
2661
2662        Box::new(storage).close().unwrap();
2663    }
2664
2665    #[test]
2666    fn test_save_experience_increments_wal_sequence() {
2667        let dir = tempdir().unwrap();
2668        let path = dir.path().join("test.db");
2669        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2670
2671        let collective = Collective::new("test", 384);
2672        storage.save_collective(&collective).unwrap();
2673        // save_collective now records WAL event #1
2674        assert_eq!(storage.get_wal_sequence().unwrap(), 1);
2675
2676        let exp1 = test_experience(collective.id, 384);
2677        storage.save_experience(&exp1).unwrap();
2678        assert_eq!(storage.get_wal_sequence().unwrap(), 2);
2679
2680        let exp2 = test_experience(collective.id, 384);
2681        storage.save_experience(&exp2).unwrap();
2682        assert_eq!(storage.get_wal_sequence().unwrap(), 3);
2683
2684        Box::new(storage).close().unwrap();
2685    }
2686
2687    #[test]
2688    fn test_poll_watch_events_returns_correct_events() {
2689        let dir = tempdir().unwrap();
2690        let path = dir.path().join("test.db");
2691        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2692
2693        let collective = Collective::new("test", 384);
2694        storage.save_collective(&collective).unwrap();
2695        // Collective creates WAL event #1
2696
2697        let exp1 = test_experience(collective.id, 384);
2698        let exp2 = test_experience(collective.id, 384);
2699        let exp3 = test_experience(collective.id, 384);
2700        storage.save_experience(&exp1).unwrap();
2701        storage.save_experience(&exp2).unwrap();
2702        storage.save_experience(&exp3).unwrap();
2703
2704        // Poll all events (collective + 3 experiences = 4 total)
2705        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2706        assert_eq!(events.len(), 4);
2707        assert_eq!(max_seq, 4);
2708        // First event is collective creation, rest are experience creations
2709        assert!(events
2710            .iter()
2711            .all(|e| e.event_type == WatchEventTypeTag::Created));
2712        // Skip collective event (index 0), experience IDs should match
2713        assert_eq!(events[1].entity_id, *exp1.id.as_bytes());
2714        assert_eq!(events[2].entity_id, *exp2.id.as_bytes());
2715        assert_eq!(events[3].entity_id, *exp3.id.as_bytes());
2716
2717        Box::new(storage).close().unwrap();
2718    }
2719
2720    #[test]
2721    fn test_poll_watch_events_since_midpoint() {
2722        let dir = tempdir().unwrap();
2723        let path = dir.path().join("test.db");
2724        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2725
2726        let collective = Collective::new("test", 384);
2727        storage.save_collective(&collective).unwrap();
2728
2729        // Create 5 experiences
2730        for _ in 0..5 {
2731            let exp = test_experience(collective.id, 384);
2732            storage.save_experience(&exp).unwrap();
2733        }
2734
2735        // Collective = seq 1, 5 experiences = seq 2-6. Total = 6.
2736        // Poll from seq 4 — should get 2 events (seq 5 and 6)
2737        let (events, max_seq) = storage.poll_watch_events(4, 100).unwrap();
2738        assert_eq!(events.len(), 2);
2739        assert_eq!(max_seq, 6);
2740
2741        Box::new(storage).close().unwrap();
2742    }
2743
2744    #[test]
2745    fn test_poll_watch_events_empty_when_caught_up() {
2746        let dir = tempdir().unwrap();
2747        let path = dir.path().join("test.db");
2748        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2749
2750        let collective = Collective::new("test", 384);
2751        storage.save_collective(&collective).unwrap();
2752
2753        let exp = test_experience(collective.id, 384);
2754        storage.save_experience(&exp).unwrap();
2755
2756        // Poll everything (collective + experience = 2 events)
2757        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2758        assert_eq!(events.len(), 2);
2759        assert_eq!(max_seq, 2);
2760
2761        // Poll again from same position — empty
2762        let (events, max_seq) = storage.poll_watch_events(2, 100).unwrap();
2763        assert_eq!(events.len(), 0);
2764        assert_eq!(max_seq, 2); // stays the same
2765
2766        Box::new(storage).close().unwrap();
2767    }
2768
2769    #[test]
2770    fn test_delete_records_watch_event() {
2771        let dir = tempdir().unwrap();
2772        let path = dir.path().join("test.db");
2773        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2774
2775        let collective = Collective::new("test", 384);
2776        storage.save_collective(&collective).unwrap();
2777
2778        let exp = test_experience(collective.id, 384);
2779        storage.save_experience(&exp).unwrap();
2780        storage.delete_experience(exp.id).unwrap();
2781
2782        // Collective(1) + Created(2) + Deleted(3) = 3 events
2783        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2784        assert_eq!(events.len(), 3);
2785        assert_eq!(max_seq, 3);
2786        assert_eq!(events[0].event_type, WatchEventTypeTag::Created); // collective
2787        assert_eq!(events[1].event_type, WatchEventTypeTag::Created); // experience
2788        assert_eq!(events[2].event_type, WatchEventTypeTag::Deleted); // experience deleted
2789        assert_eq!(events[2].entity_id, *exp.id.as_bytes());
2790
2791        Box::new(storage).close().unwrap();
2792    }
2793
2794    #[test]
2795    fn test_update_records_watch_event() {
2796        let dir = tempdir().unwrap();
2797        let path = dir.path().join("test.db");
2798        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2799
2800        let collective = Collective::new("test", 384);
2801        storage.save_collective(&collective).unwrap();
2802
2803        let exp = test_experience(collective.id, 384);
2804        storage.save_experience(&exp).unwrap();
2805
2806        let update = ExperienceUpdate {
2807            importance: Some(0.99),
2808            ..Default::default()
2809        };
2810        storage.update_experience(exp.id, &update).unwrap();
2811
2812        // Collective(1) + Created(2) + Updated(3) = 3 events
2813        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2814        assert_eq!(events.len(), 3);
2815        assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2816
2817        Box::new(storage).close().unwrap();
2818    }
2819
2820    #[test]
2821    fn test_reinforce_records_watch_event() {
2822        let dir = tempdir().unwrap();
2823        let path = dir.path().join("test.db");
2824        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2825
2826        let collective = Collective::new("test", 384);
2827        storage.save_collective(&collective).unwrap();
2828
2829        let exp = test_experience(collective.id, 384);
2830        storage.save_experience(&exp).unwrap();
2831        storage.reinforce_experience(exp.id).unwrap();
2832
2833        // Collective(1) + Created(2) + Updated(3) = 3 events
2834        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2835        assert_eq!(events.len(), 3);
2836        assert_eq!(events[1].event_type, WatchEventTypeTag::Created);
2837        assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2838
2839        Box::new(storage).close().unwrap();
2840    }
2841
2842    #[test]
2843    fn test_archive_records_archived_event() {
2844        let dir = tempdir().unwrap();
2845        let path = dir.path().join("test.db");
2846        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2847
2848        let collective = Collective::new("test", 384);
2849        storage.save_collective(&collective).unwrap();
2850
2851        let exp = test_experience(collective.id, 384);
2852        storage.save_experience(&exp).unwrap();
2853
2854        let update = ExperienceUpdate {
2855            archived: Some(true),
2856            ..Default::default()
2857        };
2858        storage.update_experience(exp.id, &update).unwrap();
2859
2860        // Collective(1) + Created(2) + Archived(3) = 3 events
2861        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2862        assert_eq!(events.len(), 3);
2863        assert_eq!(events[2].event_type, WatchEventTypeTag::Archived);
2864
2865        Box::new(storage).close().unwrap();
2866    }
2867
2868    #[test]
2869    fn test_poll_watch_events_batch_limit() {
2870        let dir = tempdir().unwrap();
2871        let path = dir.path().join("test.db");
2872        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2873
2874        let collective = Collective::new("test", 384);
2875        storage.save_collective(&collective).unwrap();
2876
2877        // Create 10 experiences
2878        for _ in 0..10 {
2879            let exp = test_experience(collective.id, 384);
2880            storage.save_experience(&exp).unwrap();
2881        }
2882
2883        // Poll with limit of 3
2884        let (events, max_seq) = storage.poll_watch_events(0, 3).unwrap();
2885        assert_eq!(events.len(), 3);
2886        assert_eq!(max_seq, 3);
2887
2888        // Continue from where we left off
2889        let (events, max_seq) = storage.poll_watch_events(3, 3).unwrap();
2890        assert_eq!(events.len(), 3);
2891        assert_eq!(max_seq, 6);
2892
2893        Box::new(storage).close().unwrap();
2894    }
2895}