1use std::path::{Path, PathBuf};
20
21use ::redb::{Database, ReadableTable};
22use tracing::{debug, info, instrument, warn};
23
24use crate::activity::Activity;
25use crate::collective::Collective;
26use crate::experience::{Experience, ExperienceUpdate};
27use crate::insight::DerivedInsight;
28use crate::relation::{ExperienceRelation, RelationType};
29use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
30
31use super::schema::{
32 decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
33 DatabaseMetadata, EntityTypeTag, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag,
34 ACTIVITIES_TABLE, COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
35 EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
36 METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
37 SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
38};
39#[cfg(feature = "sync")]
40use super::schema::{INSTANCE_ID_KEY, SYNC_CURSORS_TABLE};
41use super::StorageEngine;
42use crate::config::{Config, EmbeddingDimension};
43use crate::error::{PulseDBError, Result, StorageError, ValidationError};
44
45const METADATA_KEY: &str = "db_metadata";
47
#[derive(Debug)]
pub struct RedbStorage {
    /// Underlying redb key-value database handle.
    db: Database,

    /// Metadata loaded from (or written to) `METADATA_TABLE` at open time;
    /// carries the schema version and embedding dimension.
    metadata: DatabaseMetadata,

    /// Filesystem path the database file was opened at.
    path: PathBuf,

    /// Stable identifier for this database instance, persisted under
    /// `INSTANCE_ID_KEY` in the metadata table; used by the sync subsystem.
    #[cfg(feature = "sync")]
    instance_id: crate::sync::InstanceId,
}
72
73impl RedbStorage {
74 #[instrument(skip(config), fields(path = %path.as_ref().display()))]
105 pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
106 let path = path.as_ref();
107 let db_exists = path.exists();
108
109 debug!(db_exists = db_exists, "Opening storage engine");
110
111 let db = Self::create_database(path, config)?;
113
114 if db_exists {
115 Self::open_existing(db, path.to_path_buf(), config)
117 } else {
118 Self::initialize_new(db, path.to_path_buf(), config)
120 }
121 }
122
    /// Creates (or opens, if the file already exists) the redb database at `path`.
    ///
    /// `_config` is currently unused; it stays in the signature so builder
    /// options can later be derived from configuration without changing callers.
    fn create_database(path: &Path, _config: &Config) -> Result<Database> {
        let builder = Database::builder();

        let db = builder.create(path).map_err(|e| {
            // NOTE(review): lock detection relies on substring-matching the
            // error message, which is fragile across redb versions — consider
            // matching the concrete redb error variant instead.
            if e.to_string().contains("locked") {
                StorageError::DatabaseLocked
            } else {
                StorageError::Redb(e.to_string())
            }
        })?;

        debug!("Database file opened successfully");
        Ok(db)
    }
144
    /// Creates all tables and writes initial metadata for a brand-new database.
    ///
    /// Everything happens inside a single write transaction so a crash during
    /// initialization leaves no partially initialized file.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Initializing new database");

        let metadata = DatabaseMetadata::new(config.embedding_dimension);

        let write_txn = db.begin_write().map_err(StorageError::from)?;

        {
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Opening a table creates it; pre-create every table so later
            // readers never hit a missing-table error.
            let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
            let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
            let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let _ = write_txn.open_table(RELATIONS_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            let _ = write_txn.open_table(INSIGHTS_TABLE)?;
            let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            #[cfg(feature = "sync")]
            {
                // Persist a freshly generated instance id alongside the metadata.
                let instance_id = crate::sync::InstanceId::new();
                meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }

        write_txn.commit().map_err(StorageError::from)?;

        // Read the instance id back after commit; the local generated inside
        // the cfg block above is scoped to the write transaction.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id after init"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = SCHEMA_VERSION,
            dimension = config.embedding_dimension.size(),
            "Database initialized"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
216
    /// Validates and opens a database that already exists on disk.
    ///
    /// Checks the stored schema version (accepting v1 for in-place migration)
    /// and the embedding dimension against `config`, migrates v1 WAL records
    /// when needed, and refreshes the persisted metadata.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Opening existing database");

        let read_txn = db.begin_read().map_err(StorageError::from)?;

        let metadata = {
            let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
                StorageError::corrupted(format!("Cannot open metadata table: {}", e))
            })?;

            let metadata_bytes = meta_table
                .get(METADATA_KEY)
                .map_err(StorageError::from)?
                .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;

            bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
                .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
        };

        // Release the read transaction before opening the write transaction below.
        drop(read_txn);

        // Version 1 databases are migratable in place; any other non-current
        // version is rejected.
        if metadata.schema_version != SCHEMA_VERSION && metadata.schema_version != 1 {
            warn!(
                expected = SCHEMA_VERSION,
                found = metadata.schema_version,
                "Schema version mismatch"
            );
            return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
                expected: SCHEMA_VERSION,
                found: metadata.schema_version,
            }));
        }
        let needs_v2_migration = metadata.schema_version == 1;

        // Embeddings are fixed-dimension per database; refusing a mismatch
        // here prevents silently mixing vector sizes.
        if metadata.embedding_dimension != config.embedding_dimension {
            warn!(
                expected = config.embedding_dimension.size(),
                found = metadata.embedding_dimension.size(),
                "Embedding dimension mismatch"
            );
            return Err(PulseDBError::Validation(
                ValidationError::DimensionMismatch {
                    expected: config.embedding_dimension.size(),
                    got: metadata.embedding_dimension.size(),
                },
            ));
        }

        // Refresh the metadata (and bump the schema version when migrating)
        // before persisting it back.
        let mut metadata = metadata;
        metadata.touch();
        if needs_v2_migration {
            metadata.schema_version = SCHEMA_VERSION;
        }

        let write_txn = db.begin_write().map_err(StorageError::from)?;
        {
            // Ensure the watch-events table exists (older databases may lack it).
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            if needs_v2_migration {
                Self::migrate_wal_v1_to_v2(&write_txn)?;
                info!("Migrated WAL records from schema v1 to v2");
            }

            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            #[cfg(feature = "sync")]
            {
                // Backfill an instance id for databases created before sync existed.
                if meta_table.get(INSTANCE_ID_KEY)?.is_none() {
                    let instance_id = crate::sync::InstanceId::new();
                    meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                    debug!("Generated new instance_id for existing database");
                }
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        // Re-read the (possibly just backfilled) instance id after commit.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = metadata.schema_version,
            dimension = metadata.embedding_dimension.size(),
            "Database opened successfully"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
336
    /// Direct access to the underlying redb [`Database`] for crate-internal callers.
    #[inline]
    #[allow(dead_code)] pub(crate) fn database(&self) -> &Database {
        &self.db
    }
345
346 fn increment_wal_and_record(
361 &self,
362 write_txn: &::redb::WriteTransaction,
363 entity_id: &[u8; 16],
364 collective_id: CollectiveId,
365 entity_type: EntityTypeTag,
366 event_type: WatchEventTypeTag,
367 timestamp: Timestamp,
368 ) -> Result<u64> {
369 #[cfg(feature = "sync")]
371 if crate::sync::guard::is_sync_applying() {
372 return Ok(0);
373 }
374
375 let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
377 let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
378 Some(entry) => {
379 let bytes: [u8; 8] = entry
380 .value()
381 .try_into()
382 .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
383 u64::from_be_bytes(bytes)
384 }
385 None => 0,
386 };
387 let new_seq = current_seq + 1;
388
389 let seq_bytes = new_seq.to_be_bytes();
391 meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;
392
393 let record = WatchEventRecord {
395 entity_id: *entity_id,
396 collective_id: *collective_id.as_bytes(),
397 event_type,
398 timestamp_ms: timestamp.as_millis(),
399 entity_type,
400 };
401 let record_bytes =
402 bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;
403
404 let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
405 events_table.insert(&seq_bytes, record_bytes.as_slice())?;
406
407 Ok(new_seq)
408 }
409
    /// Rewrites every WAL record from the v1 layout to v2 in place.
    ///
    /// v1 records only carried an experience id; v2 adds an entity-type tag,
    /// so every migrated record is tagged `EntityTypeTag::Experience`.
    /// Re-inserting under the same sequence key overwrites the v1 bytes.
    fn migrate_wal_v1_to_v2(write_txn: &::redb::WriteTransaction) -> Result<()> {
        use super::schema::WatchEventRecordV1;

        let events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;

        // Buffer all v1 records first: the table cannot be iterated and
        // mutated at the same time.
        let mut entries: Vec<([u8; 8], WatchEventRecordV1)> = Vec::new();
        for entry in events_table.iter()? {
            let (key, value) = entry.map_err(StorageError::from)?;
            let seq_bytes: [u8; 8] = *key.value();
            let v1_record: WatchEventRecordV1 = bincode::deserialize(value.value())
                .map_err(|e| StorageError::serialization(format!("v1 WAL record: {}", e)))?;
            entries.push((seq_bytes, v1_record));
        }
        drop(events_table);

        // Reopen the table mutably and overwrite each record with its v2 form.
        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        for (seq_bytes, v1) in &entries {
            let v2 = WatchEventRecord {
                entity_id: v1.experience_id,
                collective_id: v1.collective_id,
                event_type: v1.event_type,
                timestamp_ms: v1.timestamp_ms,
                entity_type: EntityTypeTag::Experience,
            };
            let v2_bytes =
                bincode::serialize(&v2).map_err(|e| StorageError::serialization(e.to_string()))?;
            events_table.insert(seq_bytes, v2_bytes.as_slice())?;
        }

        debug!(count = entries.len(), "Migrated WAL records to v2");
        Ok(())
    }
448
    /// The embedding dimension this database was initialized with.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
454}
455
456impl StorageEngine for RedbStorage {
    /// Metadata captured when the database was opened (schema version,
    /// embedding dimension, timestamps).
    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }
464
465 #[instrument(skip(self))]
466 fn close(self: Box<Self>) -> Result<()> {
467 info!("Closing storage engine");
468
469 drop(self.db);
474
475 info!("Storage engine closed");
476 Ok(())
477 }
478
479 fn path(&self) -> Option<&Path> {
480 Some(&self.path)
481 }
482
    /// Inserts or overwrites a collective and records a `Created` watch event
    /// in the same transaction.
    ///
    /// NOTE(review): re-saving an existing collective also emits `Created`
    /// (not `Updated`) — confirm watchers tolerate duplicate Created events.
    fn save_collective(&self, collective: &Collective) -> Result<()> {
        let bytes = bincode::serialize(collective)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
            table.insert(collective.id.as_bytes(), bytes.as_slice())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            collective.id.as_bytes(),
            collective.id,
            EntityTypeTag::Collective,
            WatchEventTypeTag::Created,
            collective.created_at,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %collective.id, name = %collective.name, "Collective saved");
        Ok(())
    }
509
510 fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
511 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
512 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
513
514 match table.get(id.as_bytes())? {
515 Some(value) => {
516 let collective: Collective = bincode::deserialize(value.value())
517 .map_err(|e| StorageError::serialization(e.to_string()))?;
518 Ok(Some(collective))
519 }
520 None => Ok(None),
521 }
522 }
523
524 fn list_collectives(&self) -> Result<Vec<Collective>> {
525 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
526 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
527
528 let mut collectives = Vec::new();
529 for result in table.iter()? {
530 let (_, value) = result.map_err(StorageError::from)?;
531 let collective: Collective = bincode::deserialize(value.value())
532 .map_err(|e| StorageError::serialization(e.to_string()))?;
533 collectives.push(collective);
534 }
535
536 Ok(collectives)
537 }
538
539 fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
540 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
541 let existed;
542 {
543 let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
544 existed = table.remove(id.as_bytes())?.is_some();
545 }
546 write_txn.commit().map_err(StorageError::from)?;
547
548 if existed {
549 debug!(id = %id, "Collective deleted");
550 }
551 Ok(existed)
552 }
553
554 fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
559 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
560 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
561
562 let count = table.get(id.as_bytes())?.count() as u64;
563
564 Ok(count)
565 }
566
567 fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
568 let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
570 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
571 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
572
573 let mut ids = Vec::new();
574 for result in table.get(id.as_bytes())? {
575 let value = result.map_err(StorageError::from)?;
576 let entry = value.value();
577 let mut exp_id = [0u8; 16];
579 exp_id.copy_from_slice(&entry[8..24]);
580 ids.push(exp_id);
581 }
582
583 let mut rel_ids = std::collections::HashSet::new();
585 let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
586 let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
587 for exp_id in &ids {
588 for result in source_table.get(exp_id)? {
589 let value = result.map_err(StorageError::from)?;
590 rel_ids.insert(*value.value());
591 }
592 for result in target_table.get(exp_id)? {
593 let value = result.map_err(StorageError::from)?;
594 rel_ids.insert(*value.value());
595 }
596 }
597
598 (ids, rel_ids.into_iter().collect())
599 };
600
601 let count = exp_ids.len() as u64;
602 if count == 0 {
603 return Ok(0);
604 }
605
606 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
608 {
609 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
611 for exp_id in &exp_ids {
612 exp_table.remove(exp_id)?;
613 }
614 }
615 {
616 let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
618 for exp_id in &exp_ids {
619 emb_table.remove(exp_id)?;
620 }
621 }
622 {
623 let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
625 idx_table.remove_all(id.as_bytes())?;
626 }
627 {
628 let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
630 for tag in ExperienceTypeTag::all() {
631 let key = encode_type_index_key(id.as_bytes(), *tag);
632 type_table.remove_all(&key)?;
633 }
634 }
635 {
636 if !relation_ids.is_empty() {
638 let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
639 let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
640 let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
641
642 for exp_id in &exp_ids {
644 source_idx.remove_all(exp_id)?;
645 target_idx.remove_all(exp_id)?;
646 }
647 for rel_id in &relation_ids {
649 rel_table.remove(rel_id)?;
650 }
651
652 debug!(
653 count = relation_ids.len(),
654 "Cascade-deleted relations for collective"
655 );
656 }
657 }
658 write_txn.commit().map_err(StorageError::from)?;
659
660 debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
661 Ok(count)
662 }
663
664 fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
665 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
666 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
667
668 let mut ids = Vec::new();
669 for result in table.get(id.as_bytes())? {
670 let value = result.map_err(StorageError::from)?;
671 let entry = value.value();
672 let mut exp_bytes = [0u8; 16];
674 exp_bytes.copy_from_slice(&entry[8..24]);
675 ids.push(ExperienceId::from_bytes(exp_bytes));
676 }
677
678 Ok(ids)
679 }
680
    /// Returns up to `limit` `(experience_id, timestamp)` pairs for the most
    /// recent experiences in a collective, newest first.
    ///
    /// Index values are `[timestamp_be: 8][experience_id: 16]`; because redb
    /// multimap values iterate in ascending byte order and the timestamp
    /// prefix is big-endian, iteration order is ascending timestamp order —
    /// the newest entries sit at the tail.
    fn get_recent_experience_ids(
        &self,
        collective_id: CollectiveId,
        limit: usize,
    ) -> Result<Vec<(ExperienceId, Timestamp)>> {
        let read_txn = self.db.begin_read().map_err(StorageError::from)?;
        let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

        let mut entries = Vec::new();
        for result in table.get(collective_id.as_bytes())? {
            let value = result.map_err(StorageError::from)?;
            let entry = value.value();
            // First 8 bytes: big-endian millisecond timestamp.
            let mut ts_bytes = [0u8; 8];
            ts_bytes.copy_from_slice(&entry[..8]);
            let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));

            // Remaining 16 bytes: the experience id.
            let mut exp_bytes = [0u8; 16];
            exp_bytes.copy_from_slice(&entry[8..24]);
            entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
        }

        // Keep the last `limit` entries (the newest) and flip to newest-first.
        let start = entries.len().saturating_sub(limit);
        let mut recent = entries.split_off(start);
        recent.reverse();

        Ok(recent)
    }
713
    /// Persists an experience plus its embedding and both secondary indices,
    /// then records a `Created` watch event — all in one transaction.
    fn save_experience(&self, experience: &Experience) -> Result<()> {
        let exp_bytes = bincode::serialize(experience)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        // Embeddings live in their own table, serialized as raw bytes.
        let emb_bytes = f32_slice_to_bytes(&experience.embedding);

        // Secondary index key: (collective id, experience-type tag).
        let type_key = encode_type_index_key(
            experience.collective_id.as_bytes(),
            experience.experience_type.type_tag(),
        );

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
        }
        {
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            // Index value layout: [timestamp_be: 8][experience_id: 16] — the
            // big-endian timestamp prefix makes byte order chronological.
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
            value[8..24].copy_from_slice(experience.id.as_bytes());
            idx_table.insert(experience.collective_id.as_bytes(), &value)?;
        }
        {
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            type_table.insert(&type_key, experience.id.as_bytes())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            experience.id.as_bytes(),
            experience.collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Created,
            experience.timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            id = %experience.id,
            collective_id = %experience.collective_id,
            "Experience saved"
        );
        Ok(())
    }
776
777 fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
778 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
779
780 let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
782 let exp_entry = match exp_table.get(id.as_bytes())? {
783 Some(v) => v,
784 None => return Ok(None),
785 };
786
787 let mut experience: Experience = bincode::deserialize(exp_entry.value())
788 .map_err(|e| StorageError::serialization(e.to_string()))?;
789
790 let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
792 if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
793 experience.embedding = bytes_to_f32_vec(emb_entry.value());
794 }
795
796 Ok(Some(experience))
797 }
798
    /// Applies a partial update to an experience and records an `Updated`
    /// (or `Archived`, when the update sets `archived = true`) watch event.
    ///
    /// Returns `Ok(false)` when the experience does not exist; the open write
    /// transaction is then dropped, which aborts it.
    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let collective_id;
        let timestamp;
        let is_archive;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                None => return Ok(false),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;

            // The read guard borrows the table; release it before the insert below.
            drop(entry);

            collective_id = experience.collective_id;
            timestamp = experience.timestamp;
            is_archive = update.archived == Some(true);

            // Apply only the fields present in the update.
            if let Some(importance) = update.importance {
                experience.importance = importance;
            }
            if let Some(confidence) = update.confidence {
                experience.confidence = confidence;
            }
            if let Some(ref domain) = update.domain {
                experience.domain = domain.clone();
            }
            if let Some(ref related_files) = update.related_files {
                experience.related_files = related_files.clone();
            }
            if let Some(archived) = update.archived {
                experience.archived = archived;
            }

            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
        }
        let event_type = if is_archive {
            WatchEventTypeTag::Archived
        } else {
            WatchEventTypeTag::Updated
        };
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            event_type,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience updated");
        Ok(true)
    }
865
    /// Deletes an experience together with its embedding and both index
    /// entries, and records a `Deleted` watch event.
    ///
    /// The collective id, timestamp and type tag are read first (in a separate
    /// read transaction) because they are needed to locate the index entries.
    fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
        let (collective_id, timestamp, type_tag) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;

            match exp_table.get(id.as_bytes())? {
                Some(entry) => {
                    let exp: Experience = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    (
                        exp.collective_id,
                        exp.timestamp,
                        exp.experience_type.type_tag(),
                    )
                }
                None => return Ok(false),
            }
        };

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.remove(id.as_bytes())?;
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.remove(id.as_bytes())?;
        }
        {
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            // Rebuild the exact 24-byte index value ([timestamp_be][id]) that
            // save_experience inserted so the multimap entry can be removed.
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&timestamp.to_be_bytes());
            value[8..24].copy_from_slice(id.as_bytes());
            idx_table.remove(collective_id.as_bytes(), &value)?;
        }
        {
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
            type_table.remove(&type_key, id.as_bytes())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Deleted,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience deleted");
        Ok(true)
    }
925
    /// Increments an experience's application counter (saturating) and records
    /// an `Updated` watch event. Returns the new count, or `None` when the
    /// experience does not exist (the open write transaction is then dropped,
    /// which aborts it).
    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let (new_count, collective_id, timestamp) = {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                None => return Ok(None),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            // The read guard borrows the table; release it before the insert below.
            drop(entry);

            // Saturating: the counter pins at the maximum instead of wrapping.
            experience.applications = experience.applications.saturating_add(1);
            let new_count = experience.applications;
            let collective_id = experience.collective_id;
            let timestamp = experience.timestamp;

            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
            (new_count, collective_id, timestamp)
        };
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Updated,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, applications = new_count, "Experience reinforced");
        Ok(Some(new_count))
    }
964
965 fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
966 let bytes = f32_slice_to_bytes(embedding);
967
968 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
969 {
970 let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
971 table.insert(id.as_bytes(), bytes.as_slice())?;
972 }
973 write_txn.commit().map_err(StorageError::from)?;
974
975 debug!(id = %id, dim = embedding.len(), "Embedding saved");
976 Ok(())
977 }
978
979 fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
980 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
981 let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
982
983 match table.get(id.as_bytes())? {
984 Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
985 None => Ok(None),
986 }
987 }
988
    /// Persists a relation and both direction indices, then records a
    /// `Created` watch event attributed to the source experience's collective.
    ///
    /// Fails with a corruption error if the source experience is missing,
    /// since the WAL record needs its collective id.
    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
        let bytes =
            bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
            table.insert(relation.id.as_bytes(), bytes.as_slice())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
        }
        // Relations carry no collective of their own; borrow the source
        // experience's collective for the watch event.
        let collective_id = {
            let exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            let entry = exp_table
                .get(relation.source_id.as_bytes())?
                .ok_or_else(|| {
                    StorageError::corrupted("relation source experience not found for WAL record")
                })?;
            let exp: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp.collective_id
        };
        self.increment_wal_and_record(
            &write_txn,
            relation.id.as_bytes(),
            collective_id,
            EntityTypeTag::Relation,
            WatchEventTypeTag::Created,
            relation.created_at,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %relation.id, "Relation saved");
        Ok(())
    }
1035
1036 fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
1037 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1038 let table = read_txn.open_table(RELATIONS_TABLE)?;
1039
1040 match table.get(id.as_bytes())? {
1041 Some(value) => {
1042 let relation: ExperienceRelation = bincode::deserialize(value.value())
1043 .map_err(|e| StorageError::serialization(e.to_string()))?;
1044 Ok(Some(relation))
1045 }
1046 None => Ok(None),
1047 }
1048 }
1049
    /// Deletes a relation and its entries in both direction indices, and
    /// records a `Deleted` watch event.
    ///
    /// Unlike `save_relation`, a missing source experience is tolerated here
    /// (the event falls back to a nil collective id) so relations whose
    /// endpoint was already deleted can still be cleaned up.
    fn delete_relation(&self, id: RelationId) -> Result<bool> {
        let (source_id, target_id, collective_id) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let rel_table = read_txn.open_table(RELATIONS_TABLE)?;

            match rel_table.get(id.as_bytes())? {
                Some(entry) => {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    // Resolve the collective via the source experience for the
                    // watch event below.
                    let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
                    let cid = match exp_table.get(rel.source_id.as_bytes())? {
                        Some(exp_entry) => {
                            let exp: Experience = bincode::deserialize(exp_entry.value())
                                .map_err(|e| StorageError::serialization(e.to_string()))?;
                            exp.collective_id
                        }
                        None => CollectiveId::nil(),
                    };
                    (rel.source_id, rel.target_id, cid)
                }
                None => return Ok(false),
            }
        };

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
            table.remove(id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            table.remove(source_id.as_bytes(), id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            table.remove(target_id.as_bytes(), id.as_bytes())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Relation,
            WatchEventTypeTag::Deleted,
            Timestamp::now(),
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Relation deleted");
        Ok(true)
    }
1105
1106 fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1107 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1108 let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1109
1110 let mut ids = Vec::new();
1111 for result in table.get(experience_id.as_bytes())? {
1112 let value = result.map_err(StorageError::from)?;
1113 let bytes = value.value();
1114 ids.push(RelationId::from_bytes(*bytes));
1115 }
1116
1117 Ok(ids)
1118 }
1119
1120 fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1121 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1122 let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1123
1124 let mut ids = Vec::new();
1125 for result in table.get(experience_id.as_bytes())? {
1126 let value = result.map_err(StorageError::from)?;
1127 let bytes = value.value();
1128 ids.push(RelationId::from_bytes(*bytes));
1129 }
1130
1131 Ok(ids)
1132 }
1133
    /// Cascade-deletes every relation where the given experience appears as
    /// source or target. Returns the number of relation ids found.
    ///
    /// Runs in three phases: collect relation ids (read txn), load the full
    /// records (second read txn), then delete records and index entries
    /// (write txn). NOTE(review): the phases are separate transactions, so a
    /// concurrent writer could add a relation between them; also no watch
    /// events are emitted for the removed relations (unlike `delete_relation`)
    /// — confirm both are acceptable.
    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
        let relation_ids: Vec<RelationId> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

            // HashSet dedupes self-relations that appear in both indices.
            let mut ids = std::collections::HashSet::new();

            for result in source_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            for result in target_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            ids.into_iter().collect()
        };

        let count = relation_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Load the full records so index entries can be removed at BOTH
        // endpoints (the opposite endpoint may be a different experience).
        let relations: Vec<ExperienceRelation> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(RELATIONS_TABLE)?;

            let mut rels = Vec::with_capacity(relation_ids.len());
            for rel_id in &relation_ids {
                if let Some(entry) = table.get(rel_id.as_bytes())? {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    rels.push(rel);
                }
            }
            rels
        };

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
            for rel in &relations {
                rel_table.remove(rel.id.as_bytes())?;
            }
        }
        {
            let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            for rel in &relations {
                source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        {
            let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for rel in &relations {
                target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            experience_id = %experience_id,
            count = count,
            "Cascade-deleted relations for experience"
        );
        Ok(count)
    }
1208
1209 fn relation_exists(
1210 &self,
1211 source_id: ExperienceId,
1212 target_id: ExperienceId,
1213 relation_type: RelationType,
1214 ) -> Result<bool> {
1215 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1216 let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1217 let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1218
1219 for result in index_table.get(source_id.as_bytes())? {
1221 let value = result.map_err(StorageError::from)?;
1222 let rel_id = RelationId::from_bytes(*value.value());
1223
1224 if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1225 let rel: ExperienceRelation = bincode::deserialize(entry.value())
1226 .map_err(|e| StorageError::serialization(e.to_string()))?;
1227 if rel.target_id == target_id && rel.relation_type == relation_type {
1228 return Ok(true);
1229 }
1230 }
1231 }
1232
1233 Ok(false)
1234 }
1235
1236 fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1241 let bytes =
1242 bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1243
1244 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1245 {
1246 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1247 table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1248 }
1249 {
1250 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1251 table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1252 }
1253 self.increment_wal_and_record(
1254 &write_txn,
1255 insight.id.as_bytes(),
1256 insight.collective_id,
1257 EntityTypeTag::Insight,
1258 WatchEventTypeTag::Created,
1259 insight.created_at,
1260 )?;
1261 write_txn.commit().map_err(StorageError::from)?;
1262
1263 debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1264 Ok(())
1265 }
1266
1267 fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1268 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1269 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1270
1271 match table.get(id.as_bytes())? {
1272 Some(value) => {
1273 let insight: DerivedInsight = bincode::deserialize(value.value())
1274 .map_err(|e| StorageError::serialization(e.to_string()))?;
1275 Ok(Some(insight))
1276 }
1277 None => Ok(None),
1278 }
1279 }
1280
1281 fn delete_insight(&self, id: InsightId) -> Result<bool> {
1282 let collective_id = {
1284 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1285 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1286
1287 match table.get(id.as_bytes())? {
1288 Some(entry) => {
1289 let insight: DerivedInsight = bincode::deserialize(entry.value())
1290 .map_err(|e| StorageError::serialization(e.to_string()))?;
1291 insight.collective_id
1292 }
1293 None => return Ok(false),
1294 }
1295 };
1296
1297 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1299 {
1300 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1301 table.remove(id.as_bytes())?;
1302 }
1303 {
1304 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1305 table.remove(collective_id.as_bytes(), id.as_bytes())?;
1306 }
1307 self.increment_wal_and_record(
1308 &write_txn,
1309 id.as_bytes(),
1310 collective_id,
1311 EntityTypeTag::Insight,
1312 WatchEventTypeTag::Deleted,
1313 Timestamp::now(),
1314 )?;
1315 write_txn.commit().map_err(StorageError::from)?;
1316
1317 debug!(id = %id, "Insight deleted");
1318 Ok(true)
1319 }
1320
1321 fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1322 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1323 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1324
1325 let mut ids = Vec::new();
1326 for result in table.get(id.as_bytes())? {
1327 let value = result.map_err(StorageError::from)?;
1328 ids.push(InsightId::from_bytes(*value.value()));
1329 }
1330
1331 Ok(ids)
1332 }
1333
1334 fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
1335 let insight_ids: Vec<[u8; 16]> = {
1337 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1338 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1339
1340 let mut ids = Vec::new();
1341 for result in table.get(id.as_bytes())? {
1342 let value = result.map_err(StorageError::from)?;
1343 ids.push(*value.value());
1344 }
1345 ids
1346 };
1347
1348 let count = insight_ids.len() as u64;
1349 if count == 0 {
1350 return Ok(0);
1351 }
1352
1353 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1355 {
1356 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1357 for insight_id in &insight_ids {
1358 table.remove(insight_id)?;
1359 }
1360 }
1361 {
1362 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1363 table.remove_all(id.as_bytes())?;
1364 }
1365 write_txn.commit().map_err(StorageError::from)?;
1366
1367 debug!(id = %id, count = count, "Cascade-deleted insights for collective");
1368 Ok(count)
1369 }
1370
1371 fn save_activity(&self, activity: &Activity) -> Result<()> {
1376 let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1377 let bytes =
1378 bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1379
1380 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1381 {
1382 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1383 table.insert(key.as_slice(), bytes.as_slice())?;
1384 }
1385 write_txn.commit().map_err(StorageError::from)?;
1386
1387 debug!(
1388 agent_id = %activity.agent_id,
1389 collective_id = %activity.collective_id,
1390 "Activity saved"
1391 );
1392 Ok(())
1393 }
1394
1395 fn get_activity(
1396 &self,
1397 agent_id: &str,
1398 collective_id: CollectiveId,
1399 ) -> Result<Option<Activity>> {
1400 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1401
1402 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1403 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1404
1405 match table.get(key.as_slice())? {
1406 Some(value) => {
1407 let activity: Activity = bincode::deserialize(value.value())
1408 .map_err(|e| StorageError::serialization(e.to_string()))?;
1409 Ok(Some(activity))
1410 }
1411 None => Ok(None),
1412 }
1413 }
1414
1415 fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1416 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1417
1418 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1419 let existed = {
1420 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1421 let removed = table.remove(key.as_slice())?;
1422 removed.is_some()
1423 };
1424 write_txn.commit().map_err(StorageError::from)?;
1425
1426 if existed {
1427 debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1428 }
1429 Ok(existed)
1430 }
1431
1432 fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1433 let prefix = collective_id.as_bytes();
1434
1435 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1436 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1437
1438 let mut activities = Vec::new();
1439 for result in table.iter()? {
1440 let (key, value) = result.map_err(StorageError::from)?;
1441 let key_bytes = key.value();
1442
1443 if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1445 let activity: Activity = bincode::deserialize(value.value())
1446 .map_err(|e| StorageError::serialization(e.to_string()))?;
1447 activities.push(activity);
1448 }
1449 }
1450
1451 Ok(activities)
1452 }
1453
1454 fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
1455 let prefix = collective_id.as_bytes();
1456
1457 let keys_to_delete: Vec<Vec<u8>> = {
1459 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1460 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1461
1462 let mut keys = Vec::new();
1463 for result in table.iter()? {
1464 let (key, _) = result.map_err(StorageError::from)?;
1465 let key_bytes = key.value();
1466
1467 if key_bytes.len() >= 16
1468 && decode_collective_from_activity_key(key_bytes) == *prefix
1469 {
1470 keys.push(key_bytes.to_vec());
1471 }
1472 }
1473 keys
1474 };
1475
1476 let count = keys_to_delete.len() as u64;
1477 if count == 0 {
1478 return Ok(0);
1479 }
1480
1481 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1483 {
1484 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1485 for key in &keys_to_delete {
1486 table.remove(key.as_slice())?;
1487 }
1488 }
1489 write_txn.commit().map_err(StorageError::from)?;
1490
1491 debug!(
1492 collective_id = %collective_id,
1493 count = count,
1494 "Cascade-deleted activities for collective"
1495 );
1496 Ok(count)
1497 }
1498
1499 fn get_wal_sequence(&self) -> Result<u64> {
1504 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1505 let meta_table = read_txn.open_table(METADATA_TABLE)?;
1506 match meta_table.get(WAL_SEQUENCE_KEY)? {
1507 Some(entry) => {
1508 let bytes: [u8; 8] = entry
1509 .value()
1510 .try_into()
1511 .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1512 Ok(u64::from_be_bytes(bytes))
1513 }
1514 None => Ok(0),
1515 }
1516 }
1517
1518 fn poll_watch_events(
1519 &self,
1520 since_seq: u64,
1521 limit: usize,
1522 ) -> Result<(Vec<WatchEventRecord>, u64)> {
1523 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1524 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1525
1526 let start_key = (since_seq + 1).to_be_bytes();
1527 let end_key = u64::MAX.to_be_bytes();
1528 let mut events = Vec::new();
1529 let mut max_seq = since_seq;
1530
1531 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1532 let (key, value) = entry.map_err(StorageError::from)?;
1533 let seq = u64::from_be_bytes(*key.value());
1534 let record: WatchEventRecord = bincode::deserialize(value.value())
1535 .map_err(|e| StorageError::serialization(e.to_string()))?;
1536 events.push(record);
1537 max_seq = seq;
1538 if events.len() >= limit {
1539 break;
1540 }
1541 }
1542
1543 Ok((events, max_seq))
1544 }
1545
1546 #[cfg(feature = "sync")]
1551 fn poll_sync_events(
1552 &self,
1553 since_seq: u64,
1554 limit: usize,
1555 ) -> Result<Vec<(u64, WatchEventRecord)>> {
1556 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1557 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1558
1559 let start_key = (since_seq + 1).to_be_bytes();
1560 let end_key = u64::MAX.to_be_bytes();
1561 let mut events = Vec::new();
1562
1563 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1564 let (key, value) = entry.map_err(StorageError::from)?;
1565 let seq = u64::from_be_bytes(*key.value());
1566 let record: WatchEventRecord = bincode::deserialize(value.value())
1567 .map_err(|e| StorageError::serialization(e.to_string()))?;
1568 events.push((seq, record));
1569 if events.len() >= limit {
1570 break;
1571 }
1572 }
1573
1574 Ok(events)
1575 }
1576
    #[cfg(feature = "sync")]
    /// Returns this database instance's sync identity, captured when the
    /// storage engine was opened (`InstanceId` is returned by value).
    fn instance_id(&self) -> crate::sync::InstanceId {
        self.instance_id
    }
1581
1582 #[cfg(feature = "sync")]
1583 fn save_sync_cursor(&self, cursor: &crate::sync::SyncCursor) -> Result<()> {
1584 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1585 {
1586 let mut table = write_txn.open_table(SYNC_CURSORS_TABLE)?;
1587 let bytes = bincode::serialize(cursor)
1588 .map_err(|e| StorageError::serialization(e.to_string()))?;
1589 table.insert(cursor.instance_id.as_bytes(), bytes.as_slice())?;
1590 }
1591 write_txn.commit().map_err(StorageError::from)?;
1592 debug!(
1593 peer = %cursor.instance_id,
1594 last_sequence = cursor.last_sequence,
1595 "Saved sync cursor"
1596 );
1597 Ok(())
1598 }
1599
1600 #[cfg(feature = "sync")]
1601 fn load_sync_cursor(
1602 &self,
1603 instance_id: &crate::sync::InstanceId,
1604 ) -> Result<Option<crate::sync::SyncCursor>> {
1605 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1606 let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1607 match table.get(instance_id.as_bytes())? {
1608 Some(entry) => {
1609 let cursor: crate::sync::SyncCursor = bincode::deserialize(entry.value())
1610 .map_err(|e| StorageError::serialization(e.to_string()))?;
1611 Ok(Some(cursor))
1612 }
1613 None => Ok(None),
1614 }
1615 }
1616
1617 #[cfg(feature = "sync")]
1618 fn list_sync_cursors(&self) -> Result<Vec<crate::sync::SyncCursor>> {
1619 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1620 let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1621 let mut cursors = Vec::new();
1622 for entry in table.iter()? {
1623 let (_, value) = entry.map_err(StorageError::from)?;
1624 let cursor: crate::sync::SyncCursor = bincode::deserialize(value.value())
1625 .map_err(|e| StorageError::serialization(e.to_string()))?;
1626 cursors.push(cursor);
1627 }
1628 Ok(cursors)
1629 }
1630
1631 #[cfg(feature = "sync")]
1632 fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64> {
1633 if up_to_seq == 0 {
1634 return Ok(0);
1635 }
1636
1637 let keys_to_delete: Vec<[u8; 8]> = {
1639 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1640 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1641
1642 let start_key = 1u64.to_be_bytes();
1643 let end_key = up_to_seq.to_be_bytes();
1644 let mut keys = Vec::new();
1645
1646 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1647 let (key, _) = entry.map_err(StorageError::from)?;
1648 keys.push(*key.value());
1649 }
1650 keys
1651 };
1652
1653 if keys_to_delete.is_empty() {
1654 return Ok(0);
1655 }
1656
1657 let count = keys_to_delete.len() as u64;
1658
1659 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1661 {
1662 let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
1663 for key in &keys_to_delete {
1664 events_table.remove(key)?;
1665 }
1666 }
1667 write_txn.commit().map_err(StorageError::from)?;
1668
1669 debug!(count, up_to_seq, "Compacted WAL events");
1670 Ok(count)
1671 }
1672}
1673
/// Serializes a slice of `f32` values into a little-endian byte buffer.
///
/// Each value contributes exactly 4 bytes, so the output length is
/// `data.len() * 4`.
#[inline]
fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
    // Preallocate the exact output size to avoid regrowth.
    let mut bytes = Vec::with_capacity(data.len() * 4);
    data.iter()
        .for_each(|val| bytes.extend_from_slice(&val.to_le_bytes()));
    bytes
}
1687
/// Deserializes a little-endian byte buffer back into `f32` values.
///
/// Trailing bytes that do not form a complete 4-byte chunk are ignored.
#[inline]
fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
    let mut values = Vec::with_capacity(data.len() / 4);
    for chunk in data.chunks_exact(4) {
        let mut le = [0u8; 4];
        le.copy_from_slice(chunk);
        values.push(f32::from_le_bytes(le));
    }
    values
}
1695
1696#[cfg(test)]
1700mod tests {
1701 use super::*;
1702 use tempfile::tempdir;
1703
    // Shared fixture: the stock configuration (384-dim embeddings per the
    // assertions below) used by most tests in this module.
    fn default_config() -> Config {
        Config::default()
    }
1707
1708 #[test]
1709 fn test_open_creates_new_database() {
1710 let dir = tempdir().unwrap();
1711 let path = dir.path().join("test.db");
1712
1713 assert!(!path.exists());
1714
1715 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1716
1717 assert!(path.exists());
1718 assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
1719 assert_eq!(
1720 storage.metadata().embedding_dimension,
1721 EmbeddingDimension::D384
1722 );
1723
1724 Box::new(storage).close().unwrap();
1725 }
1726
    #[test]
    fn test_open_existing_database() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        // First open creates the database; remember its creation timestamp.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let created_at = storage.metadata().created_at;
        Box::new(storage).close().unwrap();

        // Sleep so the reopen happens at a measurably later timestamp.
        std::thread::sleep(std::time::Duration::from_millis(10));
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        // Reopening preserves created_at but refreshes last_opened_at.
        assert_eq!(storage.metadata().created_at, created_at);
        assert!(storage.metadata().last_opened_at > created_at);

        Box::new(storage).close().unwrap();
    }
1748
1749 #[test]
1750 fn test_dimension_mismatch_returns_error() {
1751 let dir = tempdir().unwrap();
1752 let path = dir.path().join("test.db");
1753
1754 let config_384 = Config {
1756 embedding_dimension: EmbeddingDimension::D384,
1757 ..Default::default()
1758 };
1759 let storage = RedbStorage::open(&path, &config_384).unwrap();
1760 Box::new(storage).close().unwrap();
1761
1762 let config_768 = Config {
1764 embedding_dimension: EmbeddingDimension::D768,
1765 ..Default::default()
1766 };
1767 let result = RedbStorage::open(&path, &config_768);
1768
1769 assert!(result.is_err());
1770 let err = result.unwrap_err();
1771 assert!(matches!(
1772 err,
1773 PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
1774 ));
1775 }
1776
1777 #[test]
1778 fn test_database_files_created() {
1779 let dir = tempdir().unwrap();
1780 let path = dir.path().join("pulse.db");
1781
1782 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1783
1784 assert!(path.exists());
1786 assert!(storage.path().is_some());
1787 assert_eq!(storage.path().unwrap(), path);
1788
1789 Box::new(storage).close().unwrap();
1790 }
1791
1792 #[test]
1793 fn test_metadata_preserved_across_opens() {
1794 let dir = tempdir().unwrap();
1795 let path = dir.path().join("test.db");
1796
1797 let config = Config {
1798 embedding_dimension: EmbeddingDimension::Custom(512),
1799 ..Default::default()
1800 };
1801
1802 let storage = RedbStorage::open(&path, &config).unwrap();
1804 assert_eq!(
1805 storage.metadata().embedding_dimension,
1806 EmbeddingDimension::Custom(512)
1807 );
1808 Box::new(storage).close().unwrap();
1809
1810 let storage = RedbStorage::open(&path, &config).unwrap();
1812 assert_eq!(
1813 storage.metadata().embedding_dimension,
1814 EmbeddingDimension::Custom(512)
1815 );
1816 Box::new(storage).close().unwrap();
1817 }
1818
1819 #[test]
1820 fn test_embedding_dimension_accessor() {
1821 let dir = tempdir().unwrap();
1822 let path = dir.path().join("test.db");
1823
1824 let config = Config {
1825 embedding_dimension: EmbeddingDimension::D768,
1826 ..Default::default()
1827 };
1828
1829 let storage = RedbStorage::open(&path, &config).unwrap();
1830 assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);
1831
1832 Box::new(storage).close().unwrap();
1833 }
1834
    #[test]
    fn test_all_six_tables_created() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let read_txn = storage.database().begin_read().unwrap();

        // Each `open_*(...).unwrap()` doubles as an existence assertion:
        // four plain tables plus two multimap indexes, six in total.
        read_txn.open_table(METADATA_TABLE).unwrap();
        read_txn.open_table(COLLECTIVES_TABLE).unwrap();
        read_txn.open_table(EXPERIENCES_TABLE).unwrap();
        read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
        read_txn
            .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
            .unwrap();
        read_txn
            .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
            .unwrap();

        Box::new(storage).close().unwrap();
    }
1860
1861 #[test]
1866 fn test_save_and_get_collective() {
1867 let dir = tempdir().unwrap();
1868 let path = dir.path().join("test.db");
1869 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1870
1871 let collective = Collective::new("test-project", 384);
1872 let id = collective.id;
1873
1874 storage.save_collective(&collective).unwrap();
1875
1876 let retrieved = storage.get_collective(id).unwrap().unwrap();
1877 assert_eq!(retrieved.id, id);
1878 assert_eq!(retrieved.name, "test-project");
1879 assert_eq!(retrieved.embedding_dimension, 384);
1880 assert!(retrieved.owner_id.is_none());
1881
1882 Box::new(storage).close().unwrap();
1883 }
1884
1885 #[test]
1886 fn test_get_nonexistent_collective_returns_none() {
1887 let dir = tempdir().unwrap();
1888 let path = dir.path().join("test.db");
1889 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1890
1891 let result = storage.get_collective(CollectiveId::new()).unwrap();
1892 assert!(result.is_none());
1893
1894 Box::new(storage).close().unwrap();
1895 }
1896
1897 #[test]
1898 fn test_save_collective_overwrites_existing() {
1899 let dir = tempdir().unwrap();
1900 let path = dir.path().join("test.db");
1901 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1902
1903 let mut collective = Collective::new("original-name", 384);
1904 let id = collective.id;
1905 storage.save_collective(&collective).unwrap();
1906
1907 collective.name = "updated-name".to_string();
1909 storage.save_collective(&collective).unwrap();
1910
1911 let retrieved = storage.get_collective(id).unwrap().unwrap();
1912 assert_eq!(retrieved.name, "updated-name");
1913
1914 Box::new(storage).close().unwrap();
1915 }
1916
1917 #[test]
1918 fn test_list_collectives_empty() {
1919 let dir = tempdir().unwrap();
1920 let path = dir.path().join("test.db");
1921 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1922
1923 let collectives = storage.list_collectives().unwrap();
1924 assert!(collectives.is_empty());
1925
1926 Box::new(storage).close().unwrap();
1927 }
1928
1929 #[test]
1930 fn test_list_collectives_returns_all() {
1931 let dir = tempdir().unwrap();
1932 let path = dir.path().join("test.db");
1933 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1934
1935 let c1 = Collective::new("project-alpha", 384);
1936 let c2 = Collective::new("project-beta", 384);
1937 let c3 = Collective::new("project-gamma", 384);
1938
1939 storage.save_collective(&c1).unwrap();
1940 storage.save_collective(&c2).unwrap();
1941 storage.save_collective(&c3).unwrap();
1942
1943 let collectives = storage.list_collectives().unwrap();
1944 assert_eq!(collectives.len(), 3);
1945
1946 let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
1948 assert!(ids.contains(&c1.id));
1949 assert!(ids.contains(&c2.id));
1950 assert!(ids.contains(&c3.id));
1951
1952 Box::new(storage).close().unwrap();
1953 }
1954
1955 #[test]
1956 fn test_delete_collective_existing() {
1957 let dir = tempdir().unwrap();
1958 let path = dir.path().join("test.db");
1959 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1960
1961 let collective = Collective::new("to-delete", 384);
1962 let id = collective.id;
1963 storage.save_collective(&collective).unwrap();
1964
1965 let deleted = storage.delete_collective(id).unwrap();
1967 assert!(deleted);
1968
1969 assert!(storage.get_collective(id).unwrap().is_none());
1971
1972 Box::new(storage).close().unwrap();
1973 }
1974
1975 #[test]
1976 fn test_delete_collective_nonexistent() {
1977 let dir = tempdir().unwrap();
1978 let path = dir.path().join("test.db");
1979 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1980
1981 let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
1982 assert!(!deleted);
1983
1984 Box::new(storage).close().unwrap();
1985 }
1986
    #[test]
    fn test_uncommitted_transaction_is_invisible() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("phantom", 384);
        let id = collective.id;
        let bytes = bincode::serialize(&collective).unwrap();

        // Insert via a raw write transaction that goes out of scope WITHOUT
        // commit() — the write must be rolled back when the txn is dropped.
        {
            let write_txn = storage.database().begin_write().unwrap();
            {
                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
            }
        }

        // A fresh read must not observe the aborted insert.
        let result = storage.get_collective(id).unwrap();
        assert!(result.is_none(), "Uncommitted data must not be visible");

        Box::new(storage).close().unwrap();
    }
2019
2020 #[test]
2021 fn test_committed_transaction_is_visible() {
2022 let dir = tempdir().unwrap();
2025 let path = dir.path().join("test.db");
2026 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2027
2028 let collective = Collective::new("committed", 384);
2029 let id = collective.id;
2030
2031 storage.save_collective(&collective).unwrap();
2032
2033 let result = storage.get_collective(id).unwrap();
2034 assert!(result.is_some(), "Committed data must be visible");
2035
2036 Box::new(storage).close().unwrap();
2037 }
2038
    #[test]
    fn test_multi_table_atomicity() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("multi-table", 384);
        let id = collective.id;
        let collective_bytes = bincode::serialize(&collective).unwrap();

        // Write to TWO tables inside a single transaction, then commit once.
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
            coll_table
                .insert(id.as_bytes(), collective_bytes.as_slice())
                .unwrap();
        }
        {
            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
            meta_table
                .insert("test_marker", b"multi_table_test".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();

        // Both writes must land together after the single commit.
        let coll = storage.get_collective(id).unwrap();
        assert!(coll.is_some(), "Collective from multi-table txn must exist");

        let read_txn = storage.database().begin_read().unwrap();
        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
        let marker = meta_table.get("test_marker").unwrap();
        assert!(marker.is_some(), "Metadata from multi-table txn must exist");

        Box::new(storage).close().unwrap();
    }
2079
    #[test]
    fn test_mvcc_read_consistency() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        // Commit three collectives, then delete the middle one.
        let c1 = Collective::new("alpha", 384);
        let c2 = Collective::new("beta", 384);
        let c3 = Collective::new("gamma", 384);

        storage.save_collective(&c1).unwrap();
        storage.save_collective(&c2).unwrap();
        storage.save_collective(&c3).unwrap();

        storage.delete_collective(c2.id).unwrap();

        // A read transaction begun AFTER the delete must see exactly the
        // surviving committed state: c1 and c3, but not c2.
        let read_txn = storage.database().begin_read().unwrap();
        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();

        assert!(
            table.get(c1.id.as_bytes()).unwrap().is_some(),
            "c1 must be visible (committed)"
        );
        assert!(
            table.get(c2.id.as_bytes()).unwrap().is_none(),
            "c2 must be absent (deleted)"
        );
        assert!(
            table.get(c3.id.as_bytes()).unwrap().is_some(),
            "c3 must be visible (committed)"
        );

        let count = table.iter().unwrap().count();
        assert_eq!(count, 2, "Exactly 2 collectives should exist");

        // Release the read transaction before closing the storage handle.
        drop(table);
        drop(read_txn);

        Box::new(storage).close().unwrap();
    }
2139
    #[test]
    fn test_corruption_detection_invalid_metadata_bytes() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("corrupt.db");

        // Create a valid database, then overwrite its metadata record with
        // bytes that bincode cannot decode.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        // Reopening must fail with a Corrupted error naming the bad format.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Corrupted metadata must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Invalid metadata format"),
                    "Error should mention invalid format, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
2177
    #[test]
    fn test_corruption_detection_missing_metadata_key() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_key.db");

        // Create a valid database, then delete its metadata record while the
        // metadata table itself remains.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.remove(METADATA_KEY).unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        // Reopening must fail with a Corrupted error naming the missing key.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata key must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Missing database metadata"),
                    "Error should mention missing metadata, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
2210
    #[test]
    fn test_corruption_detection_missing_metadata_table() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_table.db");

        // Build a raw redb file that contains only a dummy table, so the
        // storage engine's metadata table is absent entirely.
        {
            let db = ::redb::Database::create(&path).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let dummy: ::redb::TableDefinition<&str, &str> =
                    ::redb::TableDefinition::new("dummy");
                let mut table = write_txn.open_table(dummy).unwrap();
                table.insert("key", "value").unwrap();
            }
            write_txn.commit().unwrap();
        }

        // Opening through RedbStorage must fail with a Corrupted error that
        // points at the unopenable metadata table.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata table must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Cannot open metadata table"),
                    "Error should mention metadata table, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
2247
2248 use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
2253 use crate::types::{AgentId, ExperienceId, Timestamp};
2254
    // Fixture: builds a fully-populated Experience in `collective_id` with a
    // `dim`-length constant embedding (every component 0.42), so round-trip
    // tests can assert on known field values.
    fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
        Experience {
            id: ExperienceId::new(),
            collective_id,
            content: "Test experience content".into(),
            embedding: vec![0.42; dim],
            experience_type: ExperienceType::Fact {
                statement: "redb uses shadow paging".into(),
                source: "docs".into(),
            },
            importance: 0.8,
            confidence: 0.7,
            applications: 0,
            domain: vec!["rust".into(), "databases".into()],
            related_files: vec!["src/storage/redb.rs".into()],
            source_agent: AgentId::new("test-agent"),
            source_task: None,
            timestamp: Timestamp::now(),
            archived: false,
        }
    }
2277
2278 #[test]
2279 fn test_save_and_get_experience() {
2280 let dir = tempdir().unwrap();
2281 let path = dir.path().join("test.db");
2282 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2283
2284 let collective = Collective::new("test", 384);
2285 storage.save_collective(&collective).unwrap();
2286
2287 let exp = test_experience(collective.id, 384);
2288 let exp_id = exp.id;
2289
2290 storage.save_experience(&exp).unwrap();
2291
2292 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2293 assert_eq!(retrieved.id, exp_id);
2294 assert_eq!(retrieved.collective_id, collective.id);
2295 assert_eq!(retrieved.content, "Test experience content");
2296 assert_eq!(retrieved.importance, 0.8);
2297 assert_eq!(retrieved.confidence, 0.7);
2298 assert_eq!(retrieved.applications, 0);
2299 assert_eq!(retrieved.domain, vec!["rust", "databases"]);
2300 assert!(!retrieved.archived);
2301 assert_eq!(retrieved.embedding.len(), 384);
2303 assert_eq!(retrieved.embedding[0], 0.42);
2304
2305 Box::new(storage).close().unwrap();
2306 }
2307
2308 #[test]
2309 fn test_get_nonexistent_experience_returns_none() {
2310 let dir = tempdir().unwrap();
2311 let path = dir.path().join("test.db");
2312 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2313
2314 let result = storage.get_experience(ExperienceId::new()).unwrap();
2315 assert!(result.is_none());
2316
2317 Box::new(storage).close().unwrap();
2318 }
2319
2320 #[test]
2321 fn test_update_experience_fields() {
2322 let dir = tempdir().unwrap();
2323 let path = dir.path().join("test.db");
2324 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2325
2326 let collective = Collective::new("test", 384);
2327 storage.save_collective(&collective).unwrap();
2328
2329 let exp = test_experience(collective.id, 384);
2330 let exp_id = exp.id;
2331 storage.save_experience(&exp).unwrap();
2332
2333 let update = ExperienceUpdate {
2335 importance: Some(0.95),
2336 domain: Some(vec!["updated-tag".into()]),
2337 ..Default::default()
2338 };
2339 let updated = storage.update_experience(exp_id, &update).unwrap();
2340 assert!(updated);
2341
2342 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2343 assert_eq!(retrieved.importance, 0.95);
2344 assert_eq!(retrieved.domain, vec!["updated-tag"]);
2345 assert_eq!(retrieved.confidence, 0.7);
2347
2348 Box::new(storage).close().unwrap();
2349 }
2350
2351 #[test]
2352 fn test_update_nonexistent_experience_returns_false() {
2353 let dir = tempdir().unwrap();
2354 let path = dir.path().join("test.db");
2355 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2356
2357 let update = ExperienceUpdate {
2358 importance: Some(0.5),
2359 ..Default::default()
2360 };
2361 let result = storage
2362 .update_experience(ExperienceId::new(), &update)
2363 .unwrap();
2364 assert!(!result);
2365
2366 Box::new(storage).close().unwrap();
2367 }
2368
2369 #[test]
2370 fn test_delete_experience() {
2371 let dir = tempdir().unwrap();
2372 let path = dir.path().join("test.db");
2373 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2374
2375 let collective = Collective::new("test", 384);
2376 storage.save_collective(&collective).unwrap();
2377
2378 let exp = test_experience(collective.id, 384);
2379 let exp_id = exp.id;
2380 storage.save_experience(&exp).unwrap();
2381
2382 assert!(storage.get_experience(exp_id).unwrap().is_some());
2384
2385 let deleted = storage.delete_experience(exp_id).unwrap();
2387 assert!(deleted);
2388
2389 assert!(storage.get_experience(exp_id).unwrap().is_none());
2391 assert!(storage.get_embedding(exp_id).unwrap().is_none());
2392
2393 assert_eq!(
2395 storage
2396 .count_experiences_in_collective(collective.id)
2397 .unwrap(),
2398 0
2399 );
2400
2401 Box::new(storage).close().unwrap();
2402 }
2403
2404 #[test]
2405 fn test_delete_nonexistent_experience_returns_false() {
2406 let dir = tempdir().unwrap();
2407 let path = dir.path().join("test.db");
2408 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2409
2410 let result = storage.delete_experience(ExperienceId::new()).unwrap();
2411 assert!(!result);
2412
2413 Box::new(storage).close().unwrap();
2414 }
2415
2416 #[test]
2417 fn test_save_and_get_embedding() {
2418 let dir = tempdir().unwrap();
2419 let path = dir.path().join("test.db");
2420 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2421
2422 let id = ExperienceId::new();
2423 let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2424
2425 storage.save_embedding(id, &embedding).unwrap();
2426
2427 let retrieved = storage.get_embedding(id).unwrap().unwrap();
2428 assert_eq!(retrieved, embedding);
2429
2430 Box::new(storage).close().unwrap();
2431 }
2432
2433 #[test]
2434 fn test_experience_by_collective_index() {
2435 let dir = tempdir().unwrap();
2436 let path = dir.path().join("test.db");
2437 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2438
2439 let collective = Collective::new("test", 384);
2440 storage.save_collective(&collective).unwrap();
2441
2442 for _ in 0..3 {
2444 let exp = test_experience(collective.id, 384);
2445 storage.save_experience(&exp).unwrap();
2446 }
2447
2448 assert_eq!(
2450 storage
2451 .count_experiences_in_collective(collective.id)
2452 .unwrap(),
2453 3
2454 );
2455
2456 Box::new(storage).close().unwrap();
2457 }
2458
2459 #[test]
2460 fn test_cascade_delete_includes_experiences() {
2461 let dir = tempdir().unwrap();
2462 let path = dir.path().join("test.db");
2463 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2464
2465 let collective = Collective::new("test", 384);
2466 storage.save_collective(&collective).unwrap();
2467
2468 let exp1 = test_experience(collective.id, 384);
2469 let exp2 = test_experience(collective.id, 384);
2470 let id1 = exp1.id;
2471 let id2 = exp2.id;
2472 storage.save_experience(&exp1).unwrap();
2473 storage.save_experience(&exp2).unwrap();
2474
2475 let count = storage
2477 .delete_experiences_by_collective(collective.id)
2478 .unwrap();
2479 assert_eq!(count, 2);
2480
2481 assert!(storage.get_experience(id1).unwrap().is_none());
2483 assert!(storage.get_experience(id2).unwrap().is_none());
2484 assert!(storage.get_embedding(id1).unwrap().is_none());
2485 assert!(storage.get_embedding(id2).unwrap().is_none());
2486
2487 Box::new(storage).close().unwrap();
2488 }
2489
2490 #[test]
2491 fn test_update_experience_archived_flag() {
2492 let dir = tempdir().unwrap();
2493 let path = dir.path().join("test.db");
2494 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2495
2496 let collective = Collective::new("test", 384);
2497 storage.save_collective(&collective).unwrap();
2498
2499 let exp = test_experience(collective.id, 384);
2500 let exp_id = exp.id;
2501 storage.save_experience(&exp).unwrap();
2502
2503 let update = ExperienceUpdate {
2505 archived: Some(true),
2506 ..Default::default()
2507 };
2508 storage.update_experience(exp_id, &update).unwrap();
2509
2510 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2511 assert!(retrieved.archived);
2512
2513 let update = ExperienceUpdate {
2515 archived: Some(false),
2516 ..Default::default()
2517 };
2518 storage.update_experience(exp_id, &update).unwrap();
2519
2520 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2521 assert!(!retrieved.archived);
2522
2523 Box::new(storage).close().unwrap();
2524 }
2525
2526 #[test]
2527 fn test_f32_byte_conversion_roundtrip() {
2528 let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2529 let bytes = f32_slice_to_bytes(&original);
2530 assert_eq!(bytes.len(), original.len() * 4);
2531
2532 let restored = bytes_to_f32_vec(&bytes);
2533 assert_eq!(original, restored);
2534 }
2535
2536 #[test]
2537 fn test_experience_with_all_type_variants() {
2538 let dir = tempdir().unwrap();
2539 let path = dir.path().join("test.db");
2540 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2541
2542 let collective = Collective::new("test", 384);
2543 storage.save_collective(&collective).unwrap();
2544
2545 let types = vec![
2547 ExperienceType::Difficulty {
2548 description: "test".into(),
2549 severity: Severity::High,
2550 },
2551 ExperienceType::Solution {
2552 problem_ref: None,
2553 approach: "test".into(),
2554 worked: true,
2555 },
2556 ExperienceType::ErrorPattern {
2557 signature: "E0308".into(),
2558 fix: "check types".into(),
2559 prevention: "use clippy".into(),
2560 },
2561 ExperienceType::SuccessPattern {
2562 task_type: "refactor".into(),
2563 approach: "extract method".into(),
2564 quality: 0.9,
2565 },
2566 ExperienceType::UserPreference {
2567 category: "style".into(),
2568 preference: "snake_case".into(),
2569 strength: 1.0,
2570 },
2571 ExperienceType::ArchitecturalDecision {
2572 decision: "use redb".into(),
2573 rationale: "pure Rust".into(),
2574 },
2575 ExperienceType::TechInsight {
2576 technology: "tokio".into(),
2577 insight: "spawn_blocking".into(),
2578 },
2579 ExperienceType::Fact {
2580 statement: "Rust is safe".into(),
2581 source: "docs".into(),
2582 },
2583 ExperienceType::Generic { category: None },
2584 ];
2585
2586 for experience_type in types {
2587 let mut exp = test_experience(collective.id, 384);
2588 exp.experience_type = experience_type;
2589 storage.save_experience(&exp).unwrap();
2590
2591 let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2593 assert_eq!(
2594 retrieved.experience_type.type_tag(),
2595 exp.experience_type.type_tag()
2596 );
2597 }
2598
2599 assert_eq!(
2600 storage
2601 .count_experiences_in_collective(collective.id)
2602 .unwrap(),
2603 9
2604 );
2605
2606 Box::new(storage).close().unwrap();
2607 }
2608
2609 #[test]
2610 fn test_reinforce_experience_atomic() {
2611 let dir = tempdir().unwrap();
2612 let path = dir.path().join("test.db");
2613 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2614
2615 let collective = Collective::new("test", 384);
2616 storage.save_collective(&collective).unwrap();
2617
2618 let exp = test_experience(collective.id, 384);
2619 let exp_id = exp.id;
2620 storage.save_experience(&exp).unwrap();
2621
2622 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(1));
2624 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(2));
2625 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(3));
2626
2627 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2629 assert_eq!(retrieved.applications, 3);
2630
2631 let emb = storage.get_embedding(exp_id).unwrap().unwrap();
2633 assert_eq!(emb.len(), 384);
2634
2635 Box::new(storage).close().unwrap();
2636 }
2637
2638 #[test]
2639 fn test_reinforce_experience_nonexistent() {
2640 let dir = tempdir().unwrap();
2641 let path = dir.path().join("test.db");
2642 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2643
2644 let result = storage.reinforce_experience(ExperienceId::new()).unwrap();
2645 assert!(result.is_none());
2646
2647 Box::new(storage).close().unwrap();
2648 }
2649
2650 #[test]
2655 fn test_wal_sequence_starts_at_zero() {
2656 let dir = tempdir().unwrap();
2657 let path = dir.path().join("test.db");
2658 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2659
2660 assert_eq!(storage.get_wal_sequence().unwrap(), 0);
2661
2662 Box::new(storage).close().unwrap();
2663 }
2664
2665 #[test]
2666 fn test_save_experience_increments_wal_sequence() {
2667 let dir = tempdir().unwrap();
2668 let path = dir.path().join("test.db");
2669 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2670
2671 let collective = Collective::new("test", 384);
2672 storage.save_collective(&collective).unwrap();
2673 assert_eq!(storage.get_wal_sequence().unwrap(), 1);
2675
2676 let exp1 = test_experience(collective.id, 384);
2677 storage.save_experience(&exp1).unwrap();
2678 assert_eq!(storage.get_wal_sequence().unwrap(), 2);
2679
2680 let exp2 = test_experience(collective.id, 384);
2681 storage.save_experience(&exp2).unwrap();
2682 assert_eq!(storage.get_wal_sequence().unwrap(), 3);
2683
2684 Box::new(storage).close().unwrap();
2685 }
2686
2687 #[test]
2688 fn test_poll_watch_events_returns_correct_events() {
2689 let dir = tempdir().unwrap();
2690 let path = dir.path().join("test.db");
2691 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2692
2693 let collective = Collective::new("test", 384);
2694 storage.save_collective(&collective).unwrap();
2695 let exp1 = test_experience(collective.id, 384);
2698 let exp2 = test_experience(collective.id, 384);
2699 let exp3 = test_experience(collective.id, 384);
2700 storage.save_experience(&exp1).unwrap();
2701 storage.save_experience(&exp2).unwrap();
2702 storage.save_experience(&exp3).unwrap();
2703
2704 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2706 assert_eq!(events.len(), 4);
2707 assert_eq!(max_seq, 4);
2708 assert!(events
2710 .iter()
2711 .all(|e| e.event_type == WatchEventTypeTag::Created));
2712 assert_eq!(events[1].entity_id, *exp1.id.as_bytes());
2714 assert_eq!(events[2].entity_id, *exp2.id.as_bytes());
2715 assert_eq!(events[3].entity_id, *exp3.id.as_bytes());
2716
2717 Box::new(storage).close().unwrap();
2718 }
2719
2720 #[test]
2721 fn test_poll_watch_events_since_midpoint() {
2722 let dir = tempdir().unwrap();
2723 let path = dir.path().join("test.db");
2724 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2725
2726 let collective = Collective::new("test", 384);
2727 storage.save_collective(&collective).unwrap();
2728
2729 for _ in 0..5 {
2731 let exp = test_experience(collective.id, 384);
2732 storage.save_experience(&exp).unwrap();
2733 }
2734
2735 let (events, max_seq) = storage.poll_watch_events(4, 100).unwrap();
2738 assert_eq!(events.len(), 2);
2739 assert_eq!(max_seq, 6);
2740
2741 Box::new(storage).close().unwrap();
2742 }
2743
2744 #[test]
2745 fn test_poll_watch_events_empty_when_caught_up() {
2746 let dir = tempdir().unwrap();
2747 let path = dir.path().join("test.db");
2748 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2749
2750 let collective = Collective::new("test", 384);
2751 storage.save_collective(&collective).unwrap();
2752
2753 let exp = test_experience(collective.id, 384);
2754 storage.save_experience(&exp).unwrap();
2755
2756 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2758 assert_eq!(events.len(), 2);
2759 assert_eq!(max_seq, 2);
2760
2761 let (events, max_seq) = storage.poll_watch_events(2, 100).unwrap();
2763 assert_eq!(events.len(), 0);
2764 assert_eq!(max_seq, 2); Box::new(storage).close().unwrap();
2767 }
2768
2769 #[test]
2770 fn test_delete_records_watch_event() {
2771 let dir = tempdir().unwrap();
2772 let path = dir.path().join("test.db");
2773 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2774
2775 let collective = Collective::new("test", 384);
2776 storage.save_collective(&collective).unwrap();
2777
2778 let exp = test_experience(collective.id, 384);
2779 storage.save_experience(&exp).unwrap();
2780 storage.delete_experience(exp.id).unwrap();
2781
2782 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2784 assert_eq!(events.len(), 3);
2785 assert_eq!(max_seq, 3);
2786 assert_eq!(events[0].event_type, WatchEventTypeTag::Created); assert_eq!(events[1].event_type, WatchEventTypeTag::Created); assert_eq!(events[2].event_type, WatchEventTypeTag::Deleted); assert_eq!(events[2].entity_id, *exp.id.as_bytes());
2790
2791 Box::new(storage).close().unwrap();
2792 }
2793
2794 #[test]
2795 fn test_update_records_watch_event() {
2796 let dir = tempdir().unwrap();
2797 let path = dir.path().join("test.db");
2798 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2799
2800 let collective = Collective::new("test", 384);
2801 storage.save_collective(&collective).unwrap();
2802
2803 let exp = test_experience(collective.id, 384);
2804 storage.save_experience(&exp).unwrap();
2805
2806 let update = ExperienceUpdate {
2807 importance: Some(0.99),
2808 ..Default::default()
2809 };
2810 storage.update_experience(exp.id, &update).unwrap();
2811
2812 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2814 assert_eq!(events.len(), 3);
2815 assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2816
2817 Box::new(storage).close().unwrap();
2818 }
2819
2820 #[test]
2821 fn test_reinforce_records_watch_event() {
2822 let dir = tempdir().unwrap();
2823 let path = dir.path().join("test.db");
2824 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2825
2826 let collective = Collective::new("test", 384);
2827 storage.save_collective(&collective).unwrap();
2828
2829 let exp = test_experience(collective.id, 384);
2830 storage.save_experience(&exp).unwrap();
2831 storage.reinforce_experience(exp.id).unwrap();
2832
2833 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2835 assert_eq!(events.len(), 3);
2836 assert_eq!(events[1].event_type, WatchEventTypeTag::Created);
2837 assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2838
2839 Box::new(storage).close().unwrap();
2840 }
2841
2842 #[test]
2843 fn test_archive_records_archived_event() {
2844 let dir = tempdir().unwrap();
2845 let path = dir.path().join("test.db");
2846 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2847
2848 let collective = Collective::new("test", 384);
2849 storage.save_collective(&collective).unwrap();
2850
2851 let exp = test_experience(collective.id, 384);
2852 storage.save_experience(&exp).unwrap();
2853
2854 let update = ExperienceUpdate {
2855 archived: Some(true),
2856 ..Default::default()
2857 };
2858 storage.update_experience(exp.id, &update).unwrap();
2859
2860 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2862 assert_eq!(events.len(), 3);
2863 assert_eq!(events[2].event_type, WatchEventTypeTag::Archived);
2864
2865 Box::new(storage).close().unwrap();
2866 }
2867
2868 #[test]
2869 fn test_poll_watch_events_batch_limit() {
2870 let dir = tempdir().unwrap();
2871 let path = dir.path().join("test.db");
2872 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2873
2874 let collective = Collective::new("test", 384);
2875 storage.save_collective(&collective).unwrap();
2876
2877 for _ in 0..10 {
2879 let exp = test_experience(collective.id, 384);
2880 storage.save_experience(&exp).unwrap();
2881 }
2882
2883 let (events, max_seq) = storage.poll_watch_events(0, 3).unwrap();
2885 assert_eq!(events.len(), 3);
2886 assert_eq!(max_seq, 3);
2887
2888 let (events, max_seq) = storage.poll_watch_events(3, 3).unwrap();
2890 assert_eq!(events.len(), 3);
2891 assert_eq!(max_seq, 6);
2892
2893 Box::new(storage).close().unwrap();
2894 }
2895}