Skip to main content

pulsedb/storage/
redb.rs

1//! redb storage engine implementation.
2//!
3//! This module provides the primary storage backend for PulseDB using
4//! [redb](https://docs.rs/redb), a pure Rust embedded key-value store.
5//!
6//! # Features
7//!
8//! - ACID transactions with MVCC
9//! - Single-writer, multiple-reader concurrency
10//! - Automatic crash recovery
11//! - Zero external dependencies (pure Rust)
12//!
13//! # File Layout
14//!
15//! When you open a database at `./pulse.db`, redb creates:
16//! - `./pulse.db` - Main database file
17//! - `./pulse.db.lock` - Lock file for writer coordination (may not be visible)
18
19use std::path::{Path, PathBuf};
20
21use ::redb::{Database, ReadableTable};
22use tracing::{debug, info, instrument, warn};
23
24use crate::activity::Activity;
25use crate::collective::Collective;
26use crate::experience::{Experience, ExperienceUpdate};
27use crate::insight::DerivedInsight;
28use crate::relation::{ExperienceRelation, RelationType};
29use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
30
31use super::schema::{
32    decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
33    DatabaseMetadata, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag, ACTIVITIES_TABLE,
34    COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
35    EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
36    METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
37    SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
38};
39use super::StorageEngine;
40use crate::config::{Config, EmbeddingDimension};
41use crate::error::{PulseDBError, Result, StorageError, ValidationError};
42
/// Key under which the serialized [`DatabaseMetadata`] record is stored
/// in the metadata table ([`METADATA_TABLE`]).
const METADATA_KEY: &str = "db_metadata";
45
/// redb storage engine wrapper.
///
/// This struct holds the redb database handle and cached metadata.
/// It implements [`StorageEngine`] for use with PulseDB.
///
/// # Thread Safety
///
/// `RedbStorage` is `Send + Sync`. redb handles internal synchronization
/// using MVCC for readers and exclusive locking for writers.
#[derive(Debug)]
pub struct RedbStorage {
    /// The redb database handle.
    db: Database,

    /// Database metadata read (and validated) once at open time and cached
    /// for the lifetime of this handle.
    metadata: DatabaseMetadata,

    /// Path to the database file, as passed to [`RedbStorage::open`].
    path: PathBuf,
}
66
impl RedbStorage {
    /// Opens or creates a database at the given path.
    ///
    /// If the database doesn't exist, it will be created and initialized
    /// with the configuration settings. If it exists, the configuration
    /// will be validated against the stored metadata.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file
    /// * `config` - Database configuration
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The database file is corrupted
    /// - The database is locked by another process
    /// - Schema version doesn't match
    /// - Embedding dimension doesn't match (for existing databases)
    ///
    /// # Example
    ///
    /// ```rust
    /// # fn main() -> pulsedb::Result<()> {
    /// # let dir = tempfile::tempdir().unwrap();
    /// use pulsedb::{Config, storage::RedbStorage};
    ///
    /// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(config), fields(path = %path.as_ref().display()))]
    pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
        let path = path.as_ref();
        // NOTE(review): existence is probed before `create_database` opens the
        // file, so a file created by another process in that window would be
        // treated as brand new and re-initialized (TOCTOU) — confirm whether a
        // concurrent first-open is a supported scenario.
        let db_exists = path.exists();

        debug!(db_exists = db_exists, "Opening storage engine");

        // Create or open the database
        let db = Self::create_database(path, config)?;

        if db_exists {
            // Validate existing database
            Self::open_existing(db, path.to_path_buf(), config)
        } else {
            // Initialize new database
            Self::initialize_new(db, path.to_path_buf(), config)
        }
    }

    /// Creates the redb database with appropriate settings.
    ///
    /// `_config` is currently unused: redb 2.x manages its cache internally,
    /// so the cache-size setting is reserved for future use (see note below).
    fn create_database(path: &Path, _config: &Config) -> Result<Database> {
        let builder = Database::builder();

        // Note: redb 2.x doesn't have set_cache_size, it manages memory internally
        // The cache_size_mb config will be used for future optimizations

        // Note: redb doesn't expose a typed error variant for lock conflicts,
        // so we detect them via error message string matching. This may need
        // updating if redb changes its error messages in a future version.
        let db = builder.create(path).map_err(|e| {
            if e.to_string().contains("locked") {
                StorageError::DatabaseLocked
            } else {
                StorageError::Redb(e.to_string())
            }
        })?;

        debug!("Database file opened successfully");
        Ok(db)
    }

    /// Initializes a new database with tables and metadata.
    ///
    /// All tables are created and the metadata record is written inside one
    /// write transaction, so a crash mid-initialization leaves no partially
    /// initialized database behind.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Initializing new database");

        let metadata = DatabaseMetadata::new(config.embedding_dimension);

        // Create all tables and write metadata in a single transaction
        let write_txn = db.begin_write().map_err(StorageError::from)?;

        {
            // Create the metadata table and write metadata
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Create other tables. redb creates tables on first access, so
            // merely opening each one here forces creation up front.
            let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
            let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
            let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let _ = write_txn.open_table(RELATIONS_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            let _ = write_txn.open_table(INSIGHTS_TABLE)?;
            let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        }

        write_txn.commit().map_err(StorageError::from)?;

        info!(
            schema_version = SCHEMA_VERSION,
            dimension = config.embedding_dimension.size(),
            "Database initialized"
        );

        Ok(Self { db, metadata, path })
    }

    /// Opens and validates an existing database.
    ///
    /// Validation order: schema version first, then embedding dimension.
    /// On success, `last_opened_at` is refreshed and persisted, and the
    /// `watch_events` table is created if missing (lazy migration).
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Opening existing database");

        // Read metadata from the database
        let read_txn = db.begin_read().map_err(StorageError::from)?;

        let metadata = {
            let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
                StorageError::corrupted(format!("Cannot open metadata table: {}", e))
            })?;

            let metadata_bytes = meta_table
                .get(METADATA_KEY)
                .map_err(StorageError::from)?
                .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;

            bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
                .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
        };

        // Release the read transaction before starting the write below.
        drop(read_txn);

        // Validate schema version
        if metadata.schema_version != SCHEMA_VERSION {
            warn!(
                expected = SCHEMA_VERSION,
                found = metadata.schema_version,
                "Schema version mismatch"
            );
            return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
                expected: SCHEMA_VERSION,
                found: metadata.schema_version,
            }));
        }

        // Validate embedding dimension
        if metadata.embedding_dimension != config.embedding_dimension {
            warn!(
                expected = config.embedding_dimension.size(),
                found = metadata.embedding_dimension.size(),
                "Embedding dimension mismatch"
            );
            return Err(PulseDBError::Validation(
                ValidationError::DimensionMismatch {
                    expected: config.embedding_dimension.size(),
                    got: metadata.embedding_dimension.size(),
                },
            ));
        }

        // Update last_opened_at timestamp
        let mut metadata = metadata;
        metadata.touch();

        let write_txn = db.begin_write().map_err(StorageError::from)?;
        {
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Ensure watch_events table exists (migration for pre-E4-S02 databases)
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        }
        write_txn.commit().map_err(StorageError::from)?;

        info!(
            schema_version = metadata.schema_version,
            dimension = metadata.embedding_dimension.size(),
            "Database opened successfully"
        );

        Ok(Self { db, metadata, path })
    }

    /// Returns a reference to the underlying redb database.
    ///
    /// This is for internal use by other PulseDB modules.
    #[inline]
    #[allow(dead_code)] // Used by Collective CRUD (E1-S02) and Experience CRUD (E1-S03)
    pub(crate) fn database(&self) -> &Database {
        &self.db
    }

    /// Increments the WAL sequence and records a watch event within an existing write transaction.
    ///
    /// This is the core of cross-process change detection. By executing within the caller's
    /// transaction, the sequence increment and event record are atomic with the data mutation:
    /// if the transaction commits, both are durable; if it rolls back, neither is visible.
    ///
    /// # Arguments
    ///
    /// * `write_txn` - The caller's open write transaction
    /// * `experience_id` - The experience that changed
    /// * `collective_id` - The collective it belongs to
    /// * `event_type` - What kind of change occurred
    /// * `timestamp` - When the change occurred
    fn increment_wal_and_record(
        &self,
        write_txn: &::redb::WriteTransaction,
        experience_id: ExperienceId,
        collective_id: CollectiveId,
        event_type: WatchEventTypeTag,
        timestamp: Timestamp,
    ) -> Result<u64> {
        // Read current sequence (0 if key doesn't exist yet)
        let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
        let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
            Some(entry) => {
                // Stored as 8 big-endian bytes; anything else is corruption.
                let bytes: [u8; 8] = entry
                    .value()
                    .try_into()
                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
                u64::from_be_bytes(bytes)
            }
            None => 0,
        };
        // Plain `+ 1`: u64 overflow is not handled (wrapping would require
        // ~1.8e19 events, which is not a practical concern here).
        let new_seq = current_seq + 1;

        // Write new sequence number (big-endian so byte order == numeric order,
        // which keeps the watch_events table keyed in sequence order).
        let seq_bytes = new_seq.to_be_bytes();
        meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;

        // Record the event
        let record = WatchEventRecord {
            experience_id: *experience_id.as_bytes(),
            collective_id: *collective_id.as_bytes(),
            event_type,
            timestamp_ms: timestamp.as_millis(),
        };
        let record_bytes =
            bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;

        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        events_table.insert(&seq_bytes, record_bytes.as_slice())?;

        Ok(new_seq)
    }

    /// Returns the embedding dimension configured for this database.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
}
329
330impl StorageEngine for RedbStorage {
331    // =========================================================================
332    // Lifecycle
333    // =========================================================================
334
    /// Returns the metadata snapshot cached at open time (not re-read from disk).
    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }
338
    /// Consumes the engine and releases the database handle (and with it,
    /// redb's writer lock on the file).
    #[instrument(skip(self))]
    fn close(self: Box<Self>) -> Result<()> {
        info!("Closing storage engine");

        // redb flushes all data durably on drop. Since `Database::drop` is
        // infallible, this method currently always returns Ok(()). The Result
        // return type is retained for API forward-compatibility if a future
        // storage backend can report flush errors.
        drop(self.db);

        info!("Storage engine closed");
        Ok(())
    }
352
    /// Returns the database file path. Always `Some` for this file-backed engine.
    fn path(&self) -> Option<&Path> {
        Some(&self.path)
    }
356
357    // =========================================================================
358    // Collective Storage Operations
359    // =========================================================================
360
361    fn save_collective(&self, collective: &Collective) -> Result<()> {
362        let bytes = bincode::serialize(collective)
363            .map_err(|e| StorageError::serialization(e.to_string()))?;
364
365        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
366        {
367            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
368            table.insert(collective.id.as_bytes(), bytes.as_slice())?;
369        }
370        write_txn.commit().map_err(StorageError::from)?;
371
372        debug!(id = %collective.id, name = %collective.name, "Collective saved");
373        Ok(())
374    }
375
376    fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
377        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
378        let table = read_txn.open_table(COLLECTIVES_TABLE)?;
379
380        match table.get(id.as_bytes())? {
381            Some(value) => {
382                let collective: Collective = bincode::deserialize(value.value())
383                    .map_err(|e| StorageError::serialization(e.to_string()))?;
384                Ok(Some(collective))
385            }
386            None => Ok(None),
387        }
388    }
389
390    fn list_collectives(&self) -> Result<Vec<Collective>> {
391        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
392        let table = read_txn.open_table(COLLECTIVES_TABLE)?;
393
394        let mut collectives = Vec::new();
395        for result in table.iter()? {
396            let (_, value) = result.map_err(StorageError::from)?;
397            let collective: Collective = bincode::deserialize(value.value())
398                .map_err(|e| StorageError::serialization(e.to_string()))?;
399            collectives.push(collective);
400        }
401
402        Ok(collectives)
403    }
404
405    fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
406        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
407        let existed;
408        {
409            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
410            existed = table.remove(id.as_bytes())?.is_some();
411        }
412        write_txn.commit().map_err(StorageError::from)?;
413
414        if existed {
415            debug!(id = %id, "Collective deleted");
416        }
417        Ok(existed)
418    }
419
420    // =========================================================================
421    // Experience Index Operations
422    // =========================================================================
423
424    fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
425        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
426        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
427
428        let count = table.get(id.as_bytes())?.count() as u64;
429
430        Ok(count)
431    }
432
    /// Cascade-deletes every experience in a collective, plus its embeddings,
    /// secondary-index entries, and any relations touching those experiences.
    ///
    /// Runs in two phases: a read transaction collects the affected
    /// experience and relation IDs, then a single write transaction removes
    /// everything atomically. Because the phases are separate transactions,
    /// experiences inserted between them will survive this call.
    ///
    /// NOTE(review): unlike `delete_experience`, no WAL/watch events are
    /// recorded for these deletions — confirm cross-process watchers don't
    /// need per-experience events for bulk deletes.
    ///
    /// NOTE(review): for a relation whose other endpoint lives in a different
    /// collective, the relation record is deleted here but the
    /// RELATIONS_BY_SOURCE/TARGET entry keyed by that *external* experience is
    /// not removed, leaving a dangling index entry — confirm readers tolerate
    /// relation IDs that resolve to no record.
    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
        // Phase 1: Read — collect experience IDs and relation IDs to delete
        let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

            let mut ids = Vec::new();
            for result in table.get(id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                let entry = value.value();
                // Entry is [timestamp: 8 bytes][experience_id: 16 bytes]
                let mut exp_id = [0u8; 16];
                exp_id.copy_from_slice(&entry[8..24]);
                ids.push(exp_id);
            }

            // Collect all relation IDs for these experiences (deduplicated —
            // a relation between two in-collective experiences appears in
            // both the source and target indexes)
            let mut rel_ids = std::collections::HashSet::new();
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for exp_id in &ids {
                for result in source_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
                for result in target_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
            }

            (ids, rel_ids.into_iter().collect())
        };

        let count = exp_ids.len() as u64;
        if count == 0 {
            // Nothing indexed under this collective; skip the write entirely.
            return Ok(0);
        }

        // Phase 2: Write — delete from all tables in a single transaction
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            // Delete experience records
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            for exp_id in &exp_ids {
                exp_table.remove(exp_id)?;
            }
        }
        {
            // Delete embedding vectors
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            for exp_id in &exp_ids {
                emb_table.remove(exp_id)?;
            }
        }
        {
            // Clear the by-collective index for this collective
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            idx_table.remove_all(id.as_bytes())?;
        }
        {
            // Clear the by-type index for all type variants of this collective
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            for tag in ExperienceTypeTag::all() {
                let key = encode_type_index_key(id.as_bytes(), *tag);
                type_table.remove_all(&key)?;
            }
        }
        {
            // Delete relations and their index entries
            if !relation_ids.is_empty() {
                let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
                let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
                let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

                // Clear relation indexes for all affected experiences
                for exp_id in &exp_ids {
                    source_idx.remove_all(exp_id)?;
                    target_idx.remove_all(exp_id)?;
                }
                // Delete the relation records themselves
                for rel_id in &relation_ids {
                    rel_table.remove(rel_id)?;
                }

                debug!(
                    count = relation_ids.len(),
                    "Cascade-deleted relations for collective"
                );
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
        Ok(count)
    }
529
530    fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
531        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
532        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
533
534        let mut ids = Vec::new();
535        for result in table.get(id.as_bytes())? {
536            let value = result.map_err(StorageError::from)?;
537            let entry = value.value();
538            // Entry is [timestamp: 8 bytes][experience_id: 16 bytes]
539            let mut exp_bytes = [0u8; 16];
540            exp_bytes.copy_from_slice(&entry[8..24]);
541            ids.push(ExperienceId::from_bytes(exp_bytes));
542        }
543
544        Ok(ids)
545    }
546
547    fn get_recent_experience_ids(
548        &self,
549        collective_id: CollectiveId,
550        limit: usize,
551    ) -> Result<Vec<(ExperienceId, Timestamp)>> {
552        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
553        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
554
555        // Collect all (ExperienceId, Timestamp) pairs for this collective.
556        // Multimap values are sorted ascending by [timestamp_be][exp_id],
557        // so we collect all and then take from the end for newest-first.
558        let mut entries = Vec::new();
559        for result in table.get(collective_id.as_bytes())? {
560            let value = result.map_err(StorageError::from)?;
561            let entry = value.value();
562            // Entry layout: [timestamp_be: 8 bytes][experience_id: 16 bytes]
563            let mut ts_bytes = [0u8; 8];
564            ts_bytes.copy_from_slice(&entry[..8]);
565            let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));
566
567            let mut exp_bytes = [0u8; 16];
568            exp_bytes.copy_from_slice(&entry[8..24]);
569            entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
570        }
571
572        // Take the last `limit` entries (newest) and reverse to get descending order
573        let start = entries.len().saturating_sub(limit);
574        let mut recent = entries.split_off(start);
575        recent.reverse();
576
577        Ok(recent)
578    }
579
580    // =========================================================================
581    // Experience Storage Operations
582    // =========================================================================
583
    /// Persists an experience across all four tables in one atomic write
    /// transaction — main record, embedding, by-collective index, by-type
    /// index — then records a WAL watch event in the same transaction.
    ///
    /// NOTE(review): the by-collective index uses a multimap insert without
    /// removing any prior entry, so re-saving an existing experience with a
    /// changed timestamp would leave a stale index entry — confirm callers
    /// save each experience once (field updates go through `update_experience`).
    fn save_experience(&self, experience: &Experience) -> Result<()> {
        // Serialize experience (embedding is #[serde(skip)], excluded automatically)
        let exp_bytes = bincode::serialize(experience)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        // Convert embedding to raw little-endian bytes
        let emb_bytes = f32_slice_to_bytes(&experience.embedding);

        // Build index keys
        let type_key = encode_type_index_key(
            experience.collective_id.as_bytes(),
            experience.experience_type.type_tag(),
        );

        // Write to all 4 tables in a single atomic transaction
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            // Main experience record
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
        }
        {
            // Embedding vector (stored separately for compactness)
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
        }
        {
            // By-collective index: key=collective_id, value=timestamp+experience_id
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            // Value is [timestamp_be: 8 bytes][experience_id: 16 bytes] = 24 bytes.
            // Big-endian timestamp keeps multimap values sorted chronologically.
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
            value[8..24].copy_from_slice(experience.id.as_bytes());
            idx_table.insert(experience.collective_id.as_bytes(), &value)?;
        }
        {
            // By-type index: key=collective_id+type_tag, value=experience_id
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            type_table.insert(&type_key, experience.id.as_bytes())?;
        }
        // Record WAL event for cross-process change detection — same
        // transaction, so the event is atomic with the data writes.
        self.increment_wal_and_record(
            &write_txn,
            experience.id,
            experience.collective_id,
            WatchEventTypeTag::Created,
            experience.timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            id = %experience.id,
            collective_id = %experience.collective_id,
            "Experience saved"
        );
        Ok(())
    }
641
642    fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
643        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
644
645        // Read main experience record
646        let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
647        let exp_entry = match exp_table.get(id.as_bytes())? {
648            Some(v) => v,
649            None => return Ok(None),
650        };
651
652        let mut experience: Experience = bincode::deserialize(exp_entry.value())
653            .map_err(|e| StorageError::serialization(e.to_string()))?;
654
655        // Read embedding from separate table and reconstitute
656        let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
657        if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
658            experience.embedding = bytes_to_f32_vec(emb_entry.value());
659        }
660
661        Ok(Some(experience))
662    }
663
    /// Applies a partial update (read-modify-write) to an experience inside
    /// a single write transaction, then records a WAL watch event.
    ///
    /// Returns `Ok(false)` when the experience does not exist; the open
    /// write transaction is dropped at that point, which aborts it without
    /// committing anything.
    ///
    /// Only fields that are `Some` in `update` are changed; the embedding is
    /// not updatable through this path.
    ///
    /// NOTE(review): the WAL event carries the experience's original
    /// creation timestamp, not the time of this update — confirm watchers
    /// expect that.
    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
        // Read-modify-write: read the current record, apply updates, write back
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let collective_id;
        let timestamp;
        let is_archive;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                // Dropping write_txn here aborts the (empty) transaction.
                None => return Ok(false),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;

            // Drop the borrow on entry before mutating the table
            drop(entry);

            // Capture metadata for WAL event before applying updates
            collective_id = experience.collective_id;
            timestamp = experience.timestamp;
            // Only an explicit `archived: Some(true)` counts as an archive event.
            is_archive = update.archived == Some(true);

            // Apply updates (only Some fields)
            if let Some(importance) = update.importance {
                experience.importance = importance;
            }
            if let Some(confidence) = update.confidence {
                experience.confidence = confidence;
            }
            if let Some(ref domain) = update.domain {
                experience.domain = domain.clone();
            }
            if let Some(ref related_files) = update.related_files {
                experience.related_files = related_files.clone();
            }
            if let Some(archived) = update.archived {
                experience.archived = archived;
            }

            // Re-serialize and write back
            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
        }
        // Record WAL event for cross-process change detection
        let event_type = if is_archive {
            WatchEventTypeTag::Archived
        } else {
            WatchEventTypeTag::Updated
        };
        self.increment_wal_and_record(&write_txn, id, collective_id, event_type, timestamp)?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience updated");
        Ok(true)
    }
723
    /// Deletes an experience from all four tables, then records a WAL watch
    /// event in the same write transaction.
    ///
    /// A preliminary read transaction fetches the collective ID, timestamp,
    /// and type tag needed to reconstruct the secondary-index keys; the
    /// removals then run in a single write transaction. The exact-entry
    /// multimap removals rely on the timestamp being immutable (nothing in
    /// this file mutates `Experience::timestamp` after creation).
    ///
    /// NOTE(review): the read and write happen in different transactions, so
    /// a concurrent writer could act on the experience in between — confirm
    /// single-writer usage or that this race is benign.
    fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
        // First read the experience to get collective_id, timestamp, and type_tag
        // (needed for cleaning up secondary indices and WAL event)
        let (collective_id, timestamp, type_tag) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;

            match exp_table.get(id.as_bytes())? {
                Some(entry) => {
                    let exp: Experience = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    (
                        exp.collective_id,
                        exp.timestamp,
                        exp.experience_type.type_tag(),
                    )
                }
                None => return Ok(false),
            }
        };

        // Delete from all 4 tables in a single transaction
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.remove(id.as_bytes())?;
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.remove(id.as_bytes())?;
        }
        {
            // Remove specific entry from by-collective multimap: the value must
            // match the [timestamp_be: 8][experience_id: 16] layout written by
            // save_experience exactly.
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&timestamp.to_be_bytes());
            value[8..24].copy_from_slice(id.as_bytes());
            idx_table.remove(collective_id.as_bytes(), &value)?;
        }
        {
            // Remove specific entry from by-type multimap
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
            type_table.remove(&type_key, id.as_bytes())?;
        }
        // Record WAL event for cross-process change detection
        self.increment_wal_and_record(
            &write_txn,
            id,
            collective_id,
            WatchEventTypeTag::Deleted,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience deleted");
        Ok(true)
    }
782
    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
        // Increments the `applications` counter of an experience and returns
        // the new count, or Ok(None) when the id is unknown. Unlike the
        // delete path, the read-modify-write here happens entirely inside a
        // single write transaction.
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let (new_count, collective_id, timestamp) = {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                // Early return drops `write_txn` uncommitted; nothing has
                // been modified at this point.
                None => return Ok(None),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            // Release the access guard's borrow of the table so the insert
            // below can borrow it mutably again.
            drop(entry);

            // saturating_add pins the counter at u32::MAX instead of wrapping.
            experience.applications = experience.applications.saturating_add(1);
            let new_count = experience.applications;
            let collective_id = experience.collective_id;
            let timestamp = experience.timestamp;

            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
            (new_count, collective_id, timestamp)
        };
        // Record WAL event for cross-process change detection
        self.increment_wal_and_record(
            &write_txn,
            id,
            collective_id,
            WatchEventTypeTag::Updated,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, applications = new_count, "Experience reinforced");
        Ok(Some(new_count))
    }
820
821    fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
822        let bytes = f32_slice_to_bytes(embedding);
823
824        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
825        {
826            let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
827            table.insert(id.as_bytes(), bytes.as_slice())?;
828        }
829        write_txn.commit().map_err(StorageError::from)?;
830
831        debug!(id = %id, dim = embedding.len(), "Embedding saved");
832        Ok(())
833    }
834
835    fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
836        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
837        let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
838
839        match table.get(id.as_bytes())? {
840            Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
841            None => Ok(None),
842        }
843    }
844
845    // =========================================================================
846    // Relation Storage Operations (E3-S01)
847    // =========================================================================
848
849    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
850        let bytes =
851            bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;
852
853        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
854        {
855            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
856            table.insert(relation.id.as_bytes(), bytes.as_slice())?;
857        }
858        {
859            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
860            table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
861        }
862        {
863            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
864            table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
865        }
866        write_txn.commit().map_err(StorageError::from)?;
867
868        debug!(id = %relation.id, "Relation saved");
869        Ok(())
870    }
871
872    fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
873        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
874        let table = read_txn.open_table(RELATIONS_TABLE)?;
875
876        match table.get(id.as_bytes())? {
877            Some(value) => {
878                let relation: ExperienceRelation = bincode::deserialize(value.value())
879                    .map_err(|e| StorageError::serialization(e.to_string()))?;
880                Ok(Some(relation))
881            }
882            None => Ok(None),
883        }
884    }
885
    fn delete_relation(&self, id: RelationId) -> Result<bool> {
        // Deletes a relation plus its entries in both directional indexes.
        // Returns Ok(false) if the relation does not exist.
        //
        // Read the relation first to get source/target IDs for index cleanup
        //
        // NOTE(review): the read runs in its own transaction, separate from
        // the write below; only an issue if multiple threads mutate the same
        // relation id concurrently — confirm callers serialize that.
        let (source_id, target_id) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(RELATIONS_TABLE)?;

            match table.get(id.as_bytes())? {
                Some(entry) => {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    (rel.source_id, rel.target_id)
                }
                None => return Ok(false),
            }
        };

        // Delete from all 3 tables atomically
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
            table.remove(id.as_bytes())?;
        }
        {
            // by-source index: key = source experience id, value = relation id
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            table.remove(source_id.as_bytes(), id.as_bytes())?;
        }
        {
            // by-target index: key = target experience id, value = relation id
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            table.remove(target_id.as_bytes(), id.as_bytes())?;
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Relation deleted");
        Ok(true)
    }
921
922    fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
923        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
924        let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
925
926        let mut ids = Vec::new();
927        for result in table.get(experience_id.as_bytes())? {
928            let value = result.map_err(StorageError::from)?;
929            let bytes = value.value();
930            ids.push(RelationId::from_bytes(*bytes));
931        }
932
933        Ok(ids)
934    }
935
936    fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
937        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
938        let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
939
940        let mut ids = Vec::new();
941        for result in table.get(experience_id.as_bytes())? {
942            let value = result.map_err(StorageError::from)?;
943            let bytes = value.value();
944            ids.push(RelationId::from_bytes(*bytes));
945        }
946
947        Ok(ids)
948    }
949
    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
        // Cascade-delete: removes every relation in which this experience
        // participates as source or target. Returns the number of relation
        // ids found in the indexes.
        //
        // NOTE(review): phases 1-2 run in read transactions separate from the
        // phase-3 write, so relations created concurrently in between are not
        // seen; confirm callers serialize writes around this call.
        //
        // Phase 1: Read — collect all relation IDs from both indexes
        let relation_ids: Vec<RelationId> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

            // HashSet dedupes ids that appear in both indexes (e.g. a
            // self-referential relation).
            let mut ids = std::collections::HashSet::new();

            // Outgoing relations (this experience is source)
            for result in source_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            // Incoming relations (this experience is target)
            for result in target_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            ids.into_iter().collect()
        };

        let count = relation_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2: Read each relation to get source/target IDs for index cleanup
        // Ids with no primary row are silently skipped here.
        //
        // NOTE(review): `count` reflects phase-1 index entries, not rows
        // actually removed — the two can differ if a primary row is missing.
        let relations: Vec<ExperienceRelation> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(RELATIONS_TABLE)?;

            let mut rels = Vec::with_capacity(relation_ids.len());
            for rel_id in &relation_ids {
                if let Some(entry) = table.get(rel_id.as_bytes())? {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    rels.push(rel);
                }
            }
            rels
        };

        // Phase 3: Write — delete from all 3 tables atomically
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
            for rel in &relations {
                rel_table.remove(rel.id.as_bytes())?;
            }
        }
        {
            let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            for rel in &relations {
                source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        {
            let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for rel in &relations {
                target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            experience_id = %experience_id,
            count = count,
            "Cascade-deleted relations for experience"
        );
        Ok(count)
    }
1024
1025    fn relation_exists(
1026        &self,
1027        source_id: ExperienceId,
1028        target_id: ExperienceId,
1029        relation_type: RelationType,
1030    ) -> Result<bool> {
1031        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1032        let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1033        let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1034
1035        // Scan all relations for this source and check each
1036        for result in index_table.get(source_id.as_bytes())? {
1037            let value = result.map_err(StorageError::from)?;
1038            let rel_id = RelationId::from_bytes(*value.value());
1039
1040            if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1041                let rel: ExperienceRelation = bincode::deserialize(entry.value())
1042                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1043                if rel.target_id == target_id && rel.relation_type == relation_type {
1044                    return Ok(true);
1045                }
1046            }
1047        }
1048
1049        Ok(false)
1050    }
1051
1052    // =========================================================================
1053    // Insight Storage Operations (E3-S02)
1054    // =========================================================================
1055
1056    fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1057        let bytes =
1058            bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1059
1060        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1061        {
1062            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1063            table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1064        }
1065        {
1066            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1067            table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1068        }
1069        write_txn.commit().map_err(StorageError::from)?;
1070
1071        debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1072        Ok(())
1073    }
1074
1075    fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1076        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1077        let table = read_txn.open_table(INSIGHTS_TABLE)?;
1078
1079        match table.get(id.as_bytes())? {
1080            Some(value) => {
1081                let insight: DerivedInsight = bincode::deserialize(value.value())
1082                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1083                Ok(Some(insight))
1084            }
1085            None => Ok(None),
1086        }
1087    }
1088
    fn delete_insight(&self, id: InsightId) -> Result<bool> {
        // Deletes an insight and its by-collective index entry.
        // Returns Ok(false) if the id is unknown.
        //
        // Read the insight first to get collective_id for index cleanup
        //
        // NOTE(review): same read-then-write pattern as delete_relation; the
        // row could change between the two transactions if callers mutate the
        // same id concurrently.
        let collective_id = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(INSIGHTS_TABLE)?;

            match table.get(id.as_bytes())? {
                Some(entry) => {
                    let insight: DerivedInsight = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    insight.collective_id
                }
                None => return Ok(false),
            }
        };

        // Delete from both tables atomically
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
            table.remove(id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            table.remove(collective_id.as_bytes(), id.as_bytes())?;
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Insight deleted");
        Ok(true)
    }
1120
1121    fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1122        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1123        let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1124
1125        let mut ids = Vec::new();
1126        for result in table.get(id.as_bytes())? {
1127            let value = result.map_err(StorageError::from)?;
1128            ids.push(InsightId::from_bytes(*value.value()));
1129        }
1130
1131        Ok(ids)
1132    }
1133
    fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
        // Cascade-delete of every insight belonging to a collective.
        // Returns the number of index entries found in phase 1.
        //
        // Phase 1: Read — collect insight IDs
        let insight_ids: Vec<[u8; 16]> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;

            let mut ids = Vec::new();
            for result in table.get(id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.push(*value.value());
            }
            ids
        };

        let count = insight_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2: Write — delete from both tables atomically
        //
        // NOTE(review): an insight saved between phase 1 and phase 2 keeps
        // its primary row but loses its index entry via remove_all below —
        // confirm no concurrent save_insight can run during this call.
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
            for insight_id in &insight_ids {
                table.remove(insight_id)?;
            }
        }
        {
            // remove_all drops every index entry for this collective at once.
            let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            table.remove_all(id.as_bytes())?;
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, count = count, "Cascade-deleted insights for collective");
        Ok(count)
    }
1170
1171    // =========================================================================
1172    // Activity Storage Operations (E3-S03)
1173    // =========================================================================
1174
1175    fn save_activity(&self, activity: &Activity) -> Result<()> {
1176        let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1177        let bytes =
1178            bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1179
1180        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1181        {
1182            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1183            table.insert(key.as_slice(), bytes.as_slice())?;
1184        }
1185        write_txn.commit().map_err(StorageError::from)?;
1186
1187        debug!(
1188            agent_id = %activity.agent_id,
1189            collective_id = %activity.collective_id,
1190            "Activity saved"
1191        );
1192        Ok(())
1193    }
1194
1195    fn get_activity(
1196        &self,
1197        agent_id: &str,
1198        collective_id: CollectiveId,
1199    ) -> Result<Option<Activity>> {
1200        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1201
1202        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1203        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1204
1205        match table.get(key.as_slice())? {
1206            Some(value) => {
1207                let activity: Activity = bincode::deserialize(value.value())
1208                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1209                Ok(Some(activity))
1210            }
1211            None => Ok(None),
1212        }
1213    }
1214
1215    fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1216        let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1217
1218        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1219        let existed = {
1220            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1221            let removed = table.remove(key.as_slice())?;
1222            removed.is_some()
1223        };
1224        write_txn.commit().map_err(StorageError::from)?;
1225
1226        if existed {
1227            debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1228        }
1229        Ok(existed)
1230    }
1231
1232    fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1233        let prefix = collective_id.as_bytes();
1234
1235        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1236        let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1237
1238        let mut activities = Vec::new();
1239        for result in table.iter()? {
1240            let (key, value) = result.map_err(StorageError::from)?;
1241            let key_bytes = key.value();
1242
1243            // Check if this key belongs to the requested collective (16-byte prefix)
1244            if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1245                let activity: Activity = bincode::deserialize(value.value())
1246                    .map_err(|e| StorageError::serialization(e.to_string()))?;
1247                activities.push(activity);
1248            }
1249        }
1250
1251        Ok(activities)
1252    }
1253
    fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
        // Cascade-delete of every activity record belonging to a collective.
        // Returns the number of keys matched in phase 1.
        let prefix = collective_id.as_bytes();

        // Phase 1: Read — collect matching keys
        // Full table scan: matching is done by decoding the 16-byte
        // collective-id prefix of each key rather than by a range scan.
        let keys_to_delete: Vec<Vec<u8>> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(ACTIVITIES_TABLE)?;

            let mut keys = Vec::new();
            for result in table.iter()? {
                let (key, _) = result.map_err(StorageError::from)?;
                let key_bytes = key.value();

                if key_bytes.len() >= 16
                    && decode_collective_from_activity_key(key_bytes) == *prefix
                {
                    keys.push(key_bytes.to_vec());
                }
            }
            keys
        };

        let count = keys_to_delete.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2: Write — delete all collected keys
        // NOTE(review): activities saved between the two phases are missed;
        // confirm callers serialize writes around this call.
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
            for key in &keys_to_delete {
                table.remove(key.as_slice())?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            collective_id = %collective_id,
            count = count,
            "Cascade-deleted activities for collective"
        );
        Ok(count)
    }
1298
1299    // =========================================================================
1300    // Watch Event Operations (E4-S02)
1301    // =========================================================================
1302
1303    fn get_wal_sequence(&self) -> Result<u64> {
1304        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1305        let meta_table = read_txn.open_table(METADATA_TABLE)?;
1306        match meta_table.get(WAL_SEQUENCE_KEY)? {
1307            Some(entry) => {
1308                let bytes: [u8; 8] = entry
1309                    .value()
1310                    .try_into()
1311                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1312                Ok(u64::from_be_bytes(bytes))
1313            }
1314            None => Ok(0),
1315        }
1316    }
1317
1318    fn poll_watch_events(
1319        &self,
1320        since_seq: u64,
1321        limit: usize,
1322    ) -> Result<(Vec<WatchEventRecord>, u64)> {
1323        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1324        let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1325
1326        let start_key = (since_seq + 1).to_be_bytes();
1327        let end_key = u64::MAX.to_be_bytes();
1328        let mut events = Vec::new();
1329        let mut max_seq = since_seq;
1330
1331        for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1332            let (key, value) = entry.map_err(StorageError::from)?;
1333            let seq = u64::from_be_bytes(*key.value());
1334            let record: WatchEventRecord = bincode::deserialize(value.value())
1335                .map_err(|e| StorageError::serialization(e.to_string()))?;
1336            events.push(record);
1337            max_seq = seq;
1338            if events.len() >= limit {
1339                break;
1340            }
1341        }
1342
1343        Ok((events, max_seq))
1344    }
1345}
1346
1347// ============================================================================
1348// Embedding byte conversion helpers
1349// ============================================================================
1350
/// Converts a slice of f32 values to raw little-endian bytes.
///
/// Each element contributes exactly four bytes, so the output length is
/// `data.len() * 4`.
#[inline]
fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
    data.iter().flat_map(|v| v.to_le_bytes()).collect()
}
1360
/// Converts raw little-endian bytes back to a Vec<f32>.
///
/// Trailing bytes that do not form a complete 4-byte group are ignored.
#[inline]
fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
    let mut values = Vec::with_capacity(data.len() / 4);
    for chunk in data.chunks_exact(4) {
        let mut word = [0u8; 4];
        word.copy_from_slice(chunk);
        values.push(f32::from_le_bytes(word));
    }
    values
}
1368
1369// RedbStorage is auto Send + Sync: Database, DatabaseMetadata, and PathBuf
1370// are all Send + Sync.
1371
1372#[cfg(test)]
1373mod tests {
1374    use super::*;
1375    use tempfile::tempdir;
1376
    /// Shared test fixture: the crate's default [`Config`].
    fn default_config() -> Config {
        Config::default()
    }
1380
1381    #[test]
1382    fn test_open_creates_new_database() {
1383        let dir = tempdir().unwrap();
1384        let path = dir.path().join("test.db");
1385
1386        assert!(!path.exists());
1387
1388        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1389
1390        assert!(path.exists());
1391        assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
1392        assert_eq!(
1393            storage.metadata().embedding_dimension,
1394            EmbeddingDimension::D384
1395        );
1396
1397        Box::new(storage).close().unwrap();
1398    }
1399
1400    #[test]
1401    fn test_open_existing_database() {
1402        let dir = tempdir().unwrap();
1403        let path = dir.path().join("test.db");
1404
1405        // Create database
1406        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1407        let created_at = storage.metadata().created_at;
1408        Box::new(storage).close().unwrap();
1409
1410        // Reopen
1411        std::thread::sleep(std::time::Duration::from_millis(10));
1412        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1413
1414        // created_at should be preserved
1415        assert_eq!(storage.metadata().created_at, created_at);
1416        // last_opened_at should be updated
1417        assert!(storage.metadata().last_opened_at > created_at);
1418
1419        Box::new(storage).close().unwrap();
1420    }
1421
1422    #[test]
1423    fn test_dimension_mismatch_returns_error() {
1424        let dir = tempdir().unwrap();
1425        let path = dir.path().join("test.db");
1426
1427        // Create with D384
1428        let config_384 = Config {
1429            embedding_dimension: EmbeddingDimension::D384,
1430            ..Default::default()
1431        };
1432        let storage = RedbStorage::open(&path, &config_384).unwrap();
1433        Box::new(storage).close().unwrap();
1434
1435        // Try to reopen with D768
1436        let config_768 = Config {
1437            embedding_dimension: EmbeddingDimension::D768,
1438            ..Default::default()
1439        };
1440        let result = RedbStorage::open(&path, &config_768);
1441
1442        assert!(result.is_err());
1443        let err = result.unwrap_err();
1444        assert!(matches!(
1445            err,
1446            PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
1447        ));
1448    }
1449
1450    #[test]
1451    fn test_database_files_created() {
1452        let dir = tempdir().unwrap();
1453        let path = dir.path().join("pulse.db");
1454
1455        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1456
1457        // Main database file should exist
1458        assert!(path.exists());
1459        assert!(storage.path().is_some());
1460        assert_eq!(storage.path().unwrap(), path);
1461
1462        Box::new(storage).close().unwrap();
1463    }
1464
1465    #[test]
1466    fn test_metadata_preserved_across_opens() {
1467        let dir = tempdir().unwrap();
1468        let path = dir.path().join("test.db");
1469
1470        let config = Config {
1471            embedding_dimension: EmbeddingDimension::Custom(512),
1472            ..Default::default()
1473        };
1474
1475        // Create
1476        let storage = RedbStorage::open(&path, &config).unwrap();
1477        assert_eq!(
1478            storage.metadata().embedding_dimension,
1479            EmbeddingDimension::Custom(512)
1480        );
1481        Box::new(storage).close().unwrap();
1482
1483        // Reopen
1484        let storage = RedbStorage::open(&path, &config).unwrap();
1485        assert_eq!(
1486            storage.metadata().embedding_dimension,
1487            EmbeddingDimension::Custom(512)
1488        );
1489        Box::new(storage).close().unwrap();
1490    }
1491
1492    #[test]
1493    fn test_embedding_dimension_accessor() {
1494        let dir = tempdir().unwrap();
1495        let path = dir.path().join("test.db");
1496
1497        let config = Config {
1498            embedding_dimension: EmbeddingDimension::D768,
1499            ..Default::default()
1500        };
1501
1502        let storage = RedbStorage::open(&path, &config).unwrap();
1503        assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);
1504
1505        Box::new(storage).close().unwrap();
1506    }
1507
1508    #[test]
1509    fn test_all_six_tables_created() {
1510        let dir = tempdir().unwrap();
1511        let path = dir.path().join("test.db");
1512
1513        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1514
1515        // Verify all 6 tables exist by opening each in a read transaction.
1516        // If any table wasn't created during initialize_new(), this would
1517        // return a TableDoesNotExist error.
1518        let read_txn = storage.database().begin_read().unwrap();
1519
1520        read_txn.open_table(METADATA_TABLE).unwrap();
1521        read_txn.open_table(COLLECTIVES_TABLE).unwrap();
1522        read_txn.open_table(EXPERIENCES_TABLE).unwrap();
1523        read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
1524        read_txn
1525            .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
1526            .unwrap();
1527        read_txn
1528            .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
1529            .unwrap();
1530
1531        Box::new(storage).close().unwrap();
1532    }
1533
1534    // ====================================================================
1535    // Collective CRUD tests
1536    // ====================================================================
1537
1538    #[test]
1539    fn test_save_and_get_collective() {
1540        let dir = tempdir().unwrap();
1541        let path = dir.path().join("test.db");
1542        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1543
1544        let collective = Collective::new("test-project", 384);
1545        let id = collective.id;
1546
1547        storage.save_collective(&collective).unwrap();
1548
1549        let retrieved = storage.get_collective(id).unwrap().unwrap();
1550        assert_eq!(retrieved.id, id);
1551        assert_eq!(retrieved.name, "test-project");
1552        assert_eq!(retrieved.embedding_dimension, 384);
1553        assert!(retrieved.owner_id.is_none());
1554
1555        Box::new(storage).close().unwrap();
1556    }
1557
1558    #[test]
1559    fn test_get_nonexistent_collective_returns_none() {
1560        let dir = tempdir().unwrap();
1561        let path = dir.path().join("test.db");
1562        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1563
1564        let result = storage.get_collective(CollectiveId::new()).unwrap();
1565        assert!(result.is_none());
1566
1567        Box::new(storage).close().unwrap();
1568    }
1569
1570    #[test]
1571    fn test_save_collective_overwrites_existing() {
1572        let dir = tempdir().unwrap();
1573        let path = dir.path().join("test.db");
1574        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1575
1576        let mut collective = Collective::new("original-name", 384);
1577        let id = collective.id;
1578        storage.save_collective(&collective).unwrap();
1579
1580        // Overwrite with updated name
1581        collective.name = "updated-name".to_string();
1582        storage.save_collective(&collective).unwrap();
1583
1584        let retrieved = storage.get_collective(id).unwrap().unwrap();
1585        assert_eq!(retrieved.name, "updated-name");
1586
1587        Box::new(storage).close().unwrap();
1588    }
1589
1590    #[test]
1591    fn test_list_collectives_empty() {
1592        let dir = tempdir().unwrap();
1593        let path = dir.path().join("test.db");
1594        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1595
1596        let collectives = storage.list_collectives().unwrap();
1597        assert!(collectives.is_empty());
1598
1599        Box::new(storage).close().unwrap();
1600    }
1601
1602    #[test]
1603    fn test_list_collectives_returns_all() {
1604        let dir = tempdir().unwrap();
1605        let path = dir.path().join("test.db");
1606        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1607
1608        let c1 = Collective::new("project-alpha", 384);
1609        let c2 = Collective::new("project-beta", 384);
1610        let c3 = Collective::new("project-gamma", 384);
1611
1612        storage.save_collective(&c1).unwrap();
1613        storage.save_collective(&c2).unwrap();
1614        storage.save_collective(&c3).unwrap();
1615
1616        let collectives = storage.list_collectives().unwrap();
1617        assert_eq!(collectives.len(), 3);
1618
1619        // Verify all IDs are present
1620        let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
1621        assert!(ids.contains(&c1.id));
1622        assert!(ids.contains(&c2.id));
1623        assert!(ids.contains(&c3.id));
1624
1625        Box::new(storage).close().unwrap();
1626    }
1627
1628    #[test]
1629    fn test_delete_collective_existing() {
1630        let dir = tempdir().unwrap();
1631        let path = dir.path().join("test.db");
1632        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1633
1634        let collective = Collective::new("to-delete", 384);
1635        let id = collective.id;
1636        storage.save_collective(&collective).unwrap();
1637
1638        // Delete it
1639        let deleted = storage.delete_collective(id).unwrap();
1640        assert!(deleted);
1641
1642        // Verify it's gone
1643        assert!(storage.get_collective(id).unwrap().is_none());
1644
1645        Box::new(storage).close().unwrap();
1646    }
1647
1648    #[test]
1649    fn test_delete_collective_nonexistent() {
1650        let dir = tempdir().unwrap();
1651        let path = dir.path().join("test.db");
1652        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1653
1654        let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
1655        assert!(!deleted);
1656
1657        Box::new(storage).close().unwrap();
1658    }
1659
1660    // ====================================================================
1661    // ACID Guarantee Tests
1662    // ====================================================================
1663
1664    #[test]
1665    fn test_uncommitted_transaction_is_invisible() {
1666        // ATOMICITY: If we don't commit a write transaction, the data
1667        // must not be visible to subsequent reads.
1668        let dir = tempdir().unwrap();
1669        let path = dir.path().join("test.db");
1670        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1671
1672        let collective = Collective::new("phantom", 384);
1673        let id = collective.id;
1674        let bytes = bincode::serialize(&collective).unwrap();
1675
1676        // Open a write transaction, insert data, but DON'T commit -- just drop
1677        {
1678            let write_txn = storage.database().begin_write().unwrap();
1679            {
1680                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
1681                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
1682            }
1683            // write_txn is dropped here without commit() -- rolled back
1684        }
1685
1686        // The collective should NOT be visible
1687        let result = storage.get_collective(id).unwrap();
1688        assert!(result.is_none(), "Uncommitted data must not be visible");
1689
1690        Box::new(storage).close().unwrap();
1691    }
1692
1693    #[test]
1694    fn test_committed_transaction_is_visible() {
1695        // DURABILITY (within session): committed data must be immediately
1696        // visible to subsequent reads.
1697        let dir = tempdir().unwrap();
1698        let path = dir.path().join("test.db");
1699        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1700
1701        let collective = Collective::new("committed", 384);
1702        let id = collective.id;
1703
1704        storage.save_collective(&collective).unwrap();
1705
1706        let result = storage.get_collective(id).unwrap();
1707        assert!(result.is_some(), "Committed data must be visible");
1708
1709        Box::new(storage).close().unwrap();
1710    }
1711
1712    #[test]
1713    fn test_multi_table_atomicity() {
1714        // ATOMICITY: A single transaction writing to multiple tables
1715        // is all-or-nothing. Here we write to both COLLECTIVES and METADATA
1716        // in one transaction and verify both are visible after commit.
1717        let dir = tempdir().unwrap();
1718        let path = dir.path().join("test.db");
1719        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1720
1721        let collective = Collective::new("multi-table", 384);
1722        let id = collective.id;
1723        let collective_bytes = bincode::serialize(&collective).unwrap();
1724
1725        // Write to TWO tables in a single transaction
1726        let write_txn = storage.database().begin_write().unwrap();
1727        {
1728            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
1729            coll_table
1730                .insert(id.as_bytes(), collective_bytes.as_slice())
1731                .unwrap();
1732        }
1733        {
1734            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
1735            meta_table
1736                .insert("test_marker", b"multi_table_test".as_slice())
1737                .unwrap();
1738        }
1739        write_txn.commit().unwrap();
1740
1741        // Verify BOTH writes are visible
1742        let coll = storage.get_collective(id).unwrap();
1743        assert!(coll.is_some(), "Collective from multi-table txn must exist");
1744
1745        let read_txn = storage.database().begin_read().unwrap();
1746        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
1747        let marker = meta_table.get("test_marker").unwrap();
1748        assert!(marker.is_some(), "Metadata from multi-table txn must exist");
1749
1750        Box::new(storage).close().unwrap();
1751    }
1752
1753    #[test]
1754    fn test_mvcc_read_consistency() {
1755        // ISOLATION (MVCC): A single read transaction sees a consistent
1756        // snapshot reflecting all committed writes up to the moment the
1757        // read was opened, and none of the uncommitted or subsequent ones.
1758        //
1759        // We write across multiple separate transactions, then verify a
1760        // read sees the expected consistent state. Combined with
1761        // test_uncommitted_transaction_is_invisible (atomicity), this
1762        // covers the key ACID isolation properties.
1763        //
1764        // Note: redb 2.6.3 has a page allocation constraint that prevents
1765        // holding a read transaction open while a write commits on the
1766        // same Database handle. redb guarantees MVCC isolation internally
1767        // via shadow paging; this test verifies our usage is correct.
1768        let dir = tempdir().unwrap();
1769        let path = dir.path().join("test.db");
1770        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1771
1772        // Write 3 collectives across separate transactions
1773        let c1 = Collective::new("alpha", 384);
1774        let c2 = Collective::new("beta", 384);
1775        let c3 = Collective::new("gamma", 384);
1776
1777        storage.save_collective(&c1).unwrap();
1778        storage.save_collective(&c2).unwrap();
1779        storage.save_collective(&c3).unwrap();
1780
1781        // Delete c2 (another transaction)
1782        storage.delete_collective(c2.id).unwrap();
1783
1784        // A read transaction must see the consistent state:
1785        // c1 and c3 present, c2 absent
1786        let read_txn = storage.database().begin_read().unwrap();
1787        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();
1788
1789        assert!(
1790            table.get(c1.id.as_bytes()).unwrap().is_some(),
1791            "c1 must be visible (committed)"
1792        );
1793        assert!(
1794            table.get(c2.id.as_bytes()).unwrap().is_none(),
1795            "c2 must be absent (deleted)"
1796        );
1797        assert!(
1798            table.get(c3.id.as_bytes()).unwrap().is_some(),
1799            "c3 must be visible (committed)"
1800        );
1801
1802        // Count should be exactly 2
1803        let count = table.iter().unwrap().count();
1804        // +1 for the metadata entry? No -- COLLECTIVES_TABLE is separate.
1805        assert_eq!(count, 2, "Exactly 2 collectives should exist");
1806
1807        drop(table);
1808        drop(read_txn);
1809
1810        Box::new(storage).close().unwrap();
1811    }
1812
1813    // ====================================================================
1814    // Corruption Detection Tests
1815    // ====================================================================
1816
1817    #[test]
1818    fn test_corruption_detection_invalid_metadata_bytes() {
1819        // Opening a database whose metadata contains garbage bytes
1820        // must return a Corrupted error, not a panic or deserialization UB.
1821        let dir = tempdir().unwrap();
1822        let path = dir.path().join("corrupt.db");
1823
1824        // Create a valid database, then corrupt the metadata
1825        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1826        let write_txn = storage.database().begin_write().unwrap();
1827        {
1828            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
1829            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
1830                .unwrap();
1831        }
1832        write_txn.commit().unwrap();
1833        Box::new(storage).close().unwrap();
1834
1835        // Reopen must detect the corruption
1836        let result = RedbStorage::open(&path, &default_config());
1837        assert!(result.is_err(), "Corrupted metadata must be rejected");
1838        let err = result.unwrap_err();
1839        match err {
1840            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
1841                assert!(
1842                    msg.contains("Invalid metadata format"),
1843                    "Error should mention invalid format, got: {}",
1844                    msg
1845                );
1846            }
1847            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
1848        }
1849    }
1850
1851    #[test]
1852    fn test_corruption_detection_missing_metadata_key() {
1853        // If the metadata table exists but the "db_metadata" key is absent,
1854        // open_existing must return a Corrupted error.
1855        let dir = tempdir().unwrap();
1856        let path = dir.path().join("no_key.db");
1857
1858        // Create a valid database, then delete the metadata key
1859        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1860        let write_txn = storage.database().begin_write().unwrap();
1861        {
1862            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
1863            meta.remove(METADATA_KEY).unwrap();
1864        }
1865        write_txn.commit().unwrap();
1866        Box::new(storage).close().unwrap();
1867
1868        // Reopen must detect the missing key
1869        let result = RedbStorage::open(&path, &default_config());
1870        assert!(result.is_err(), "Missing metadata key must be rejected");
1871        let err = result.unwrap_err();
1872        match err {
1873            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
1874                assert!(
1875                    msg.contains("Missing database metadata"),
1876                    "Error should mention missing metadata, got: {}",
1877                    msg
1878                );
1879            }
1880            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
1881        }
1882    }
1883
1884    #[test]
1885    fn test_corruption_detection_missing_metadata_table() {
1886        // If the metadata table doesn't exist at all, open_existing must
1887        // return a Corrupted error. We simulate this by creating a raw
1888        // redb database without our schema tables.
1889        let dir = tempdir().unwrap();
1890        let path = dir.path().join("no_table.db");
1891
1892        // Create a raw redb database with a dummy table (not our schema)
1893        {
1894            let db = ::redb::Database::create(&path).unwrap();
1895            let write_txn = db.begin_write().unwrap();
1896            {
1897                let dummy: ::redb::TableDefinition<&str, &str> =
1898                    ::redb::TableDefinition::new("dummy");
1899                let mut table = write_txn.open_table(dummy).unwrap();
1900                table.insert("key", "value").unwrap();
1901            }
1902            write_txn.commit().unwrap();
1903        }
1904
1905        // Opening this as a PulseDB must detect the missing metadata table
1906        let result = RedbStorage::open(&path, &default_config());
1907        assert!(result.is_err(), "Missing metadata table must be rejected");
1908        let err = result.unwrap_err();
1909        match err {
1910            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
1911                assert!(
1912                    msg.contains("Cannot open metadata table"),
1913                    "Error should mention metadata table, got: {}",
1914                    msg
1915                );
1916            }
1917            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
1918        }
1919    }
1920
1921    // ====================================================================
1922    // Experience CRUD tests
1923    // ====================================================================
1924
1925    use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
1926    use crate::types::{AgentId, ExperienceId, Timestamp};
1927
1928    /// Creates a test experience with a given collective_id and embedding dimension.
1929    fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
1930        Experience {
1931            id: ExperienceId::new(),
1932            collective_id,
1933            content: "Test experience content".into(),
1934            embedding: vec![0.42; dim],
1935            experience_type: ExperienceType::Fact {
1936                statement: "redb uses shadow paging".into(),
1937                source: "docs".into(),
1938            },
1939            importance: 0.8,
1940            confidence: 0.7,
1941            applications: 0,
1942            domain: vec!["rust".into(), "databases".into()],
1943            related_files: vec!["src/storage/redb.rs".into()],
1944            source_agent: AgentId::new("test-agent"),
1945            source_task: None,
1946            timestamp: Timestamp::now(),
1947            archived: false,
1948        }
1949    }
1950
1951    #[test]
1952    fn test_save_and_get_experience() {
1953        let dir = tempdir().unwrap();
1954        let path = dir.path().join("test.db");
1955        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1956
1957        let collective = Collective::new("test", 384);
1958        storage.save_collective(&collective).unwrap();
1959
1960        let exp = test_experience(collective.id, 384);
1961        let exp_id = exp.id;
1962
1963        storage.save_experience(&exp).unwrap();
1964
1965        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
1966        assert_eq!(retrieved.id, exp_id);
1967        assert_eq!(retrieved.collective_id, collective.id);
1968        assert_eq!(retrieved.content, "Test experience content");
1969        assert_eq!(retrieved.importance, 0.8);
1970        assert_eq!(retrieved.confidence, 0.7);
1971        assert_eq!(retrieved.applications, 0);
1972        assert_eq!(retrieved.domain, vec!["rust", "databases"]);
1973        assert!(!retrieved.archived);
1974        // Embedding should be reconstituted from EMBEDDINGS_TABLE
1975        assert_eq!(retrieved.embedding.len(), 384);
1976        assert_eq!(retrieved.embedding[0], 0.42);
1977
1978        Box::new(storage).close().unwrap();
1979    }
1980
1981    #[test]
1982    fn test_get_nonexistent_experience_returns_none() {
1983        let dir = tempdir().unwrap();
1984        let path = dir.path().join("test.db");
1985        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1986
1987        let result = storage.get_experience(ExperienceId::new()).unwrap();
1988        assert!(result.is_none());
1989
1990        Box::new(storage).close().unwrap();
1991    }
1992
1993    #[test]
1994    fn test_update_experience_fields() {
1995        let dir = tempdir().unwrap();
1996        let path = dir.path().join("test.db");
1997        let storage = RedbStorage::open(&path, &default_config()).unwrap();
1998
1999        let collective = Collective::new("test", 384);
2000        storage.save_collective(&collective).unwrap();
2001
2002        let exp = test_experience(collective.id, 384);
2003        let exp_id = exp.id;
2004        storage.save_experience(&exp).unwrap();
2005
2006        // Update importance and domain
2007        let update = ExperienceUpdate {
2008            importance: Some(0.95),
2009            domain: Some(vec!["updated-tag".into()]),
2010            ..Default::default()
2011        };
2012        let updated = storage.update_experience(exp_id, &update).unwrap();
2013        assert!(updated);
2014
2015        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2016        assert_eq!(retrieved.importance, 0.95);
2017        assert_eq!(retrieved.domain, vec!["updated-tag"]);
2018        // Unchanged fields
2019        assert_eq!(retrieved.confidence, 0.7);
2020
2021        Box::new(storage).close().unwrap();
2022    }
2023
2024    #[test]
2025    fn test_update_nonexistent_experience_returns_false() {
2026        let dir = tempdir().unwrap();
2027        let path = dir.path().join("test.db");
2028        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2029
2030        let update = ExperienceUpdate {
2031            importance: Some(0.5),
2032            ..Default::default()
2033        };
2034        let result = storage
2035            .update_experience(ExperienceId::new(), &update)
2036            .unwrap();
2037        assert!(!result);
2038
2039        Box::new(storage).close().unwrap();
2040    }
2041
2042    #[test]
2043    fn test_delete_experience() {
2044        let dir = tempdir().unwrap();
2045        let path = dir.path().join("test.db");
2046        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2047
2048        let collective = Collective::new("test", 384);
2049        storage.save_collective(&collective).unwrap();
2050
2051        let exp = test_experience(collective.id, 384);
2052        let exp_id = exp.id;
2053        storage.save_experience(&exp).unwrap();
2054
2055        // Verify exists
2056        assert!(storage.get_experience(exp_id).unwrap().is_some());
2057
2058        // Delete
2059        let deleted = storage.delete_experience(exp_id).unwrap();
2060        assert!(deleted);
2061
2062        // Verify gone
2063        assert!(storage.get_experience(exp_id).unwrap().is_none());
2064        assert!(storage.get_embedding(exp_id).unwrap().is_none());
2065
2066        // Verify index cleaned up
2067        assert_eq!(
2068            storage
2069                .count_experiences_in_collective(collective.id)
2070                .unwrap(),
2071            0
2072        );
2073
2074        Box::new(storage).close().unwrap();
2075    }
2076
2077    #[test]
2078    fn test_delete_nonexistent_experience_returns_false() {
2079        let dir = tempdir().unwrap();
2080        let path = dir.path().join("test.db");
2081        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2082
2083        let result = storage.delete_experience(ExperienceId::new()).unwrap();
2084        assert!(!result);
2085
2086        Box::new(storage).close().unwrap();
2087    }
2088
2089    #[test]
2090    fn test_save_and_get_embedding() {
2091        let dir = tempdir().unwrap();
2092        let path = dir.path().join("test.db");
2093        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2094
2095        let id = ExperienceId::new();
2096        let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2097
2098        storage.save_embedding(id, &embedding).unwrap();
2099
2100        let retrieved = storage.get_embedding(id).unwrap().unwrap();
2101        assert_eq!(retrieved, embedding);
2102
2103        Box::new(storage).close().unwrap();
2104    }
2105
2106    #[test]
2107    fn test_experience_by_collective_index() {
2108        let dir = tempdir().unwrap();
2109        let path = dir.path().join("test.db");
2110        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2111
2112        let collective = Collective::new("test", 384);
2113        storage.save_collective(&collective).unwrap();
2114
2115        // Add 3 experiences
2116        for _ in 0..3 {
2117            let exp = test_experience(collective.id, 384);
2118            storage.save_experience(&exp).unwrap();
2119        }
2120
2121        // Count should be 3
2122        assert_eq!(
2123            storage
2124                .count_experiences_in_collective(collective.id)
2125                .unwrap(),
2126            3
2127        );
2128
2129        Box::new(storage).close().unwrap();
2130    }
2131
2132    #[test]
2133    fn test_cascade_delete_includes_experiences() {
2134        let dir = tempdir().unwrap();
2135        let path = dir.path().join("test.db");
2136        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2137
2138        let collective = Collective::new("test", 384);
2139        storage.save_collective(&collective).unwrap();
2140
2141        let exp1 = test_experience(collective.id, 384);
2142        let exp2 = test_experience(collective.id, 384);
2143        let id1 = exp1.id;
2144        let id2 = exp2.id;
2145        storage.save_experience(&exp1).unwrap();
2146        storage.save_experience(&exp2).unwrap();
2147
2148        // Cascade delete
2149        let count = storage
2150            .delete_experiences_by_collective(collective.id)
2151            .unwrap();
2152        assert_eq!(count, 2);
2153
2154        // Verify experiences are gone
2155        assert!(storage.get_experience(id1).unwrap().is_none());
2156        assert!(storage.get_experience(id2).unwrap().is_none());
2157        assert!(storage.get_embedding(id1).unwrap().is_none());
2158        assert!(storage.get_embedding(id2).unwrap().is_none());
2159
2160        Box::new(storage).close().unwrap();
2161    }
2162
2163    #[test]
2164    fn test_update_experience_archived_flag() {
2165        let dir = tempdir().unwrap();
2166        let path = dir.path().join("test.db");
2167        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2168
2169        let collective = Collective::new("test", 384);
2170        storage.save_collective(&collective).unwrap();
2171
2172        let exp = test_experience(collective.id, 384);
2173        let exp_id = exp.id;
2174        storage.save_experience(&exp).unwrap();
2175
2176        // Archive
2177        let update = ExperienceUpdate {
2178            archived: Some(true),
2179            ..Default::default()
2180        };
2181        storage.update_experience(exp_id, &update).unwrap();
2182
2183        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2184        assert!(retrieved.archived);
2185
2186        // Unarchive
2187        let update = ExperienceUpdate {
2188            archived: Some(false),
2189            ..Default::default()
2190        };
2191        storage.update_experience(exp_id, &update).unwrap();
2192
2193        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2194        assert!(!retrieved.archived);
2195
2196        Box::new(storage).close().unwrap();
2197    }
2198
2199    #[test]
2200    fn test_f32_byte_conversion_roundtrip() {
2201        let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2202        let bytes = f32_slice_to_bytes(&original);
2203        assert_eq!(bytes.len(), original.len() * 4);
2204
2205        let restored = bytes_to_f32_vec(&bytes);
2206        assert_eq!(original, restored);
2207    }
2208
2209    #[test]
2210    fn test_experience_with_all_type_variants() {
2211        let dir = tempdir().unwrap();
2212        let path = dir.path().join("test.db");
2213        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2214
2215        let collective = Collective::new("test", 384);
2216        storage.save_collective(&collective).unwrap();
2217
2218        // Save one experience per type variant
2219        let types = vec![
2220            ExperienceType::Difficulty {
2221                description: "test".into(),
2222                severity: Severity::High,
2223            },
2224            ExperienceType::Solution {
2225                problem_ref: None,
2226                approach: "test".into(),
2227                worked: true,
2228            },
2229            ExperienceType::ErrorPattern {
2230                signature: "E0308".into(),
2231                fix: "check types".into(),
2232                prevention: "use clippy".into(),
2233            },
2234            ExperienceType::SuccessPattern {
2235                task_type: "refactor".into(),
2236                approach: "extract method".into(),
2237                quality: 0.9,
2238            },
2239            ExperienceType::UserPreference {
2240                category: "style".into(),
2241                preference: "snake_case".into(),
2242                strength: 1.0,
2243            },
2244            ExperienceType::ArchitecturalDecision {
2245                decision: "use redb".into(),
2246                rationale: "pure Rust".into(),
2247            },
2248            ExperienceType::TechInsight {
2249                technology: "tokio".into(),
2250                insight: "spawn_blocking".into(),
2251            },
2252            ExperienceType::Fact {
2253                statement: "Rust is safe".into(),
2254                source: "docs".into(),
2255            },
2256            ExperienceType::Generic { category: None },
2257        ];
2258
2259        for experience_type in types {
2260            let mut exp = test_experience(collective.id, 384);
2261            exp.experience_type = experience_type;
2262            storage.save_experience(&exp).unwrap();
2263
2264            // Verify roundtrip
2265            let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2266            assert_eq!(
2267                retrieved.experience_type.type_tag(),
2268                exp.experience_type.type_tag()
2269            );
2270        }
2271
2272        assert_eq!(
2273            storage
2274                .count_experiences_in_collective(collective.id)
2275                .unwrap(),
2276            9
2277        );
2278
2279        Box::new(storage).close().unwrap();
2280    }
2281
2282    #[test]
2283    fn test_reinforce_experience_atomic() {
2284        let dir = tempdir().unwrap();
2285        let path = dir.path().join("test.db");
2286        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2287
2288        let collective = Collective::new("test", 384);
2289        storage.save_collective(&collective).unwrap();
2290
2291        let exp = test_experience(collective.id, 384);
2292        let exp_id = exp.id;
2293        storage.save_experience(&exp).unwrap();
2294
2295        // Reinforce 3 times
2296        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(1));
2297        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(2));
2298        assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(3));
2299
2300        // Verify the stored value
2301        let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2302        assert_eq!(retrieved.applications, 3);
2303
2304        // Verify embedding was NOT re-written (still intact)
2305        let emb = storage.get_embedding(exp_id).unwrap().unwrap();
2306        assert_eq!(emb.len(), 384);
2307
2308        Box::new(storage).close().unwrap();
2309    }
2310
2311    #[test]
2312    fn test_reinforce_experience_nonexistent() {
2313        let dir = tempdir().unwrap();
2314        let path = dir.path().join("test.db");
2315        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2316
2317        let result = storage.reinforce_experience(ExperienceId::new()).unwrap();
2318        assert!(result.is_none());
2319
2320        Box::new(storage).close().unwrap();
2321    }
2322
2323    // ====================================================================
2324    // WAL Sequence Tracking Tests (E4-S02)
2325    // ====================================================================
2326
2327    #[test]
2328    fn test_wal_sequence_starts_at_zero() {
2329        let dir = tempdir().unwrap();
2330        let path = dir.path().join("test.db");
2331        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2332
2333        assert_eq!(storage.get_wal_sequence().unwrap(), 0);
2334
2335        Box::new(storage).close().unwrap();
2336    }
2337
2338    #[test]
2339    fn test_save_experience_increments_wal_sequence() {
2340        let dir = tempdir().unwrap();
2341        let path = dir.path().join("test.db");
2342        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2343
2344        let collective = Collective::new("test", 384);
2345        storage.save_collective(&collective).unwrap();
2346
2347        let exp1 = test_experience(collective.id, 384);
2348        storage.save_experience(&exp1).unwrap();
2349        assert_eq!(storage.get_wal_sequence().unwrap(), 1);
2350
2351        let exp2 = test_experience(collective.id, 384);
2352        storage.save_experience(&exp2).unwrap();
2353        assert_eq!(storage.get_wal_sequence().unwrap(), 2);
2354
2355        Box::new(storage).close().unwrap();
2356    }
2357
2358    #[test]
2359    fn test_poll_watch_events_returns_correct_events() {
2360        let dir = tempdir().unwrap();
2361        let path = dir.path().join("test.db");
2362        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2363
2364        let collective = Collective::new("test", 384);
2365        storage.save_collective(&collective).unwrap();
2366
2367        let exp1 = test_experience(collective.id, 384);
2368        let exp2 = test_experience(collective.id, 384);
2369        let exp3 = test_experience(collective.id, 384);
2370        storage.save_experience(&exp1).unwrap();
2371        storage.save_experience(&exp2).unwrap();
2372        storage.save_experience(&exp3).unwrap();
2373
2374        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2375        assert_eq!(events.len(), 3);
2376        assert_eq!(max_seq, 3);
2377        // All should be Created events
2378        assert!(events
2379            .iter()
2380            .all(|e| e.event_type == WatchEventTypeTag::Created));
2381        // IDs should match in order
2382        assert_eq!(events[0].experience_id, *exp1.id.as_bytes());
2383        assert_eq!(events[1].experience_id, *exp2.id.as_bytes());
2384        assert_eq!(events[2].experience_id, *exp3.id.as_bytes());
2385
2386        Box::new(storage).close().unwrap();
2387    }
2388
2389    #[test]
2390    fn test_poll_watch_events_since_midpoint() {
2391        let dir = tempdir().unwrap();
2392        let path = dir.path().join("test.db");
2393        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2394
2395        let collective = Collective::new("test", 384);
2396        storage.save_collective(&collective).unwrap();
2397
2398        // Create 5 experiences
2399        for _ in 0..5 {
2400            let exp = test_experience(collective.id, 384);
2401            storage.save_experience(&exp).unwrap();
2402        }
2403
2404        // Poll from seq 3 — should get 2 events (seq 4 and 5)
2405        let (events, max_seq) = storage.poll_watch_events(3, 100).unwrap();
2406        assert_eq!(events.len(), 2);
2407        assert_eq!(max_seq, 5);
2408
2409        Box::new(storage).close().unwrap();
2410    }
2411
2412    #[test]
2413    fn test_poll_watch_events_empty_when_caught_up() {
2414        let dir = tempdir().unwrap();
2415        let path = dir.path().join("test.db");
2416        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2417
2418        let collective = Collective::new("test", 384);
2419        storage.save_collective(&collective).unwrap();
2420
2421        let exp = test_experience(collective.id, 384);
2422        storage.save_experience(&exp).unwrap();
2423
2424        // Poll everything
2425        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2426        assert_eq!(events.len(), 1);
2427        assert_eq!(max_seq, 1);
2428
2429        // Poll again from same position — empty
2430        let (events, max_seq) = storage.poll_watch_events(1, 100).unwrap();
2431        assert_eq!(events.len(), 0);
2432        assert_eq!(max_seq, 1); // stays the same
2433
2434        Box::new(storage).close().unwrap();
2435    }
2436
2437    #[test]
2438    fn test_delete_records_watch_event() {
2439        let dir = tempdir().unwrap();
2440        let path = dir.path().join("test.db");
2441        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2442
2443        let collective = Collective::new("test", 384);
2444        storage.save_collective(&collective).unwrap();
2445
2446        let exp = test_experience(collective.id, 384);
2447        storage.save_experience(&exp).unwrap();
2448        storage.delete_experience(exp.id).unwrap();
2449
2450        let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2451        assert_eq!(events.len(), 2);
2452        assert_eq!(max_seq, 2);
2453        assert_eq!(events[0].event_type, WatchEventTypeTag::Created);
2454        assert_eq!(events[1].event_type, WatchEventTypeTag::Deleted);
2455        assert_eq!(events[1].experience_id, *exp.id.as_bytes());
2456
2457        Box::new(storage).close().unwrap();
2458    }
2459
2460    #[test]
2461    fn test_update_records_watch_event() {
2462        let dir = tempdir().unwrap();
2463        let path = dir.path().join("test.db");
2464        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2465
2466        let collective = Collective::new("test", 384);
2467        storage.save_collective(&collective).unwrap();
2468
2469        let exp = test_experience(collective.id, 384);
2470        storage.save_experience(&exp).unwrap();
2471
2472        let update = ExperienceUpdate {
2473            importance: Some(0.99),
2474            ..Default::default()
2475        };
2476        storage.update_experience(exp.id, &update).unwrap();
2477
2478        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2479        assert_eq!(events.len(), 2);
2480        assert_eq!(events[1].event_type, WatchEventTypeTag::Updated);
2481
2482        Box::new(storage).close().unwrap();
2483    }
2484
2485    #[test]
2486    fn test_reinforce_records_watch_event() {
2487        let dir = tempdir().unwrap();
2488        let path = dir.path().join("test.db");
2489        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2490
2491        let collective = Collective::new("test", 384);
2492        storage.save_collective(&collective).unwrap();
2493
2494        let exp = test_experience(collective.id, 384);
2495        storage.save_experience(&exp).unwrap();
2496        storage.reinforce_experience(exp.id).unwrap();
2497
2498        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2499        assert_eq!(events.len(), 2);
2500        assert_eq!(events[0].event_type, WatchEventTypeTag::Created);
2501        assert_eq!(events[1].event_type, WatchEventTypeTag::Updated);
2502
2503        Box::new(storage).close().unwrap();
2504    }
2505
2506    #[test]
2507    fn test_archive_records_archived_event() {
2508        let dir = tempdir().unwrap();
2509        let path = dir.path().join("test.db");
2510        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2511
2512        let collective = Collective::new("test", 384);
2513        storage.save_collective(&collective).unwrap();
2514
2515        let exp = test_experience(collective.id, 384);
2516        storage.save_experience(&exp).unwrap();
2517
2518        let update = ExperienceUpdate {
2519            archived: Some(true),
2520            ..Default::default()
2521        };
2522        storage.update_experience(exp.id, &update).unwrap();
2523
2524        let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2525        assert_eq!(events.len(), 2);
2526        assert_eq!(events[1].event_type, WatchEventTypeTag::Archived);
2527
2528        Box::new(storage).close().unwrap();
2529    }
2530
2531    #[test]
2532    fn test_poll_watch_events_batch_limit() {
2533        let dir = tempdir().unwrap();
2534        let path = dir.path().join("test.db");
2535        let storage = RedbStorage::open(&path, &default_config()).unwrap();
2536
2537        let collective = Collective::new("test", 384);
2538        storage.save_collective(&collective).unwrap();
2539
2540        // Create 10 experiences
2541        for _ in 0..10 {
2542            let exp = test_experience(collective.id, 384);
2543            storage.save_experience(&exp).unwrap();
2544        }
2545
2546        // Poll with limit of 3
2547        let (events, max_seq) = storage.poll_watch_events(0, 3).unwrap();
2548        assert_eq!(events.len(), 3);
2549        assert_eq!(max_seq, 3);
2550
2551        // Continue from where we left off
2552        let (events, max_seq) = storage.poll_watch_events(3, 3).unwrap();
2553        assert_eq!(events.len(), 3);
2554        assert_eq!(max_seq, 6);
2555
2556        Box::new(storage).close().unwrap();
2557    }
2558}