1use std::path::{Path, PathBuf};
20
21use ::redb::{Database, ReadableTable};
22use tracing::{debug, info, instrument, warn};
23
24use crate::activity::Activity;
25use crate::collective::Collective;
26use crate::experience::{Experience, ExperienceUpdate};
27use crate::insight::DerivedInsight;
28use crate::relation::{ExperienceRelation, RelationType};
29use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
30
31use super::schema::{
32 decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
33 DatabaseMetadata, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag, ACTIVITIES_TABLE,
34 COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
35 EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
36 METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
37 SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
38};
39use super::StorageEngine;
40use crate::config::{Config, EmbeddingDimension};
41use crate::error::{PulseDBError, Result, StorageError, ValidationError};
42
/// Key under which the serialized [`DatabaseMetadata`] record is stored in
/// `METADATA_TABLE`.
const METADATA_KEY: &str = "db_metadata";
45
/// Storage engine backed by a single redb [`Database`].
///
/// Instances are created through `RedbStorage::open`, which either initializes a
/// new database or validates an existing one before handing back the engine.
#[derive(Debug)]
pub struct RedbStorage {
    /// Handle to the underlying redb database; every transaction starts from it.
    db: Database,

    /// In-memory copy of the metadata record that was written to `METADATA_TABLE`
    /// when the database was initialized or opened.
    metadata: DatabaseMetadata,

    /// Filesystem path the database was opened from.
    path: PathBuf,
}
66
67impl RedbStorage {
68 #[instrument(skip(config), fields(path = %path.as_ref().display()))]
99 pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
100 let path = path.as_ref();
101 let db_exists = path.exists();
102
103 debug!(db_exists = db_exists, "Opening storage engine");
104
105 let db = Self::create_database(path, config)?;
107
108 if db_exists {
109 Self::open_existing(db, path.to_path_buf(), config)
111 } else {
112 Self::initialize_new(db, path.to_path_buf(), config)
114 }
115 }
116
117 fn create_database(path: &Path, _config: &Config) -> Result<Database> {
119 let builder = Database::builder();
120
121 let db = builder.create(path).map_err(|e| {
128 if e.to_string().contains("locked") {
129 StorageError::DatabaseLocked
130 } else {
131 StorageError::Redb(e.to_string())
132 }
133 })?;
134
135 debug!("Database file opened successfully");
136 Ok(db)
137 }
138
139 #[instrument(skip(db, config), fields(path = %path.display()))]
141 fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
142 info!("Initializing new database");
143
144 let metadata = DatabaseMetadata::new(config.embedding_dimension);
145
146 let write_txn = db.begin_write().map_err(StorageError::from)?;
148
149 {
150 let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
152 let metadata_bytes = bincode::serialize(&metadata)
153 .map_err(|e| StorageError::serialization(e.to_string()))?;
154 meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;
155
156 let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
158 let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
159 let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
160 let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
161 let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
162 let _ = write_txn.open_table(RELATIONS_TABLE)?;
163 let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
164 let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
165 let _ = write_txn.open_table(INSIGHTS_TABLE)?;
166 let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
167 let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
168 let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;
169 }
170
171 write_txn.commit().map_err(StorageError::from)?;
172
173 info!(
174 schema_version = SCHEMA_VERSION,
175 dimension = config.embedding_dimension.size(),
176 "Database initialized"
177 );
178
179 Ok(Self { db, metadata, path })
180 }
181
182 #[instrument(skip(db, config), fields(path = %path.display()))]
184 fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
185 info!("Opening existing database");
186
187 let read_txn = db.begin_read().map_err(StorageError::from)?;
189
190 let metadata = {
191 let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
192 StorageError::corrupted(format!("Cannot open metadata table: {}", e))
193 })?;
194
195 let metadata_bytes = meta_table
196 .get(METADATA_KEY)
197 .map_err(StorageError::from)?
198 .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;
199
200 bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
201 .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
202 };
203
204 drop(read_txn);
205
206 if metadata.schema_version != SCHEMA_VERSION {
208 warn!(
209 expected = SCHEMA_VERSION,
210 found = metadata.schema_version,
211 "Schema version mismatch"
212 );
213 return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
214 expected: SCHEMA_VERSION,
215 found: metadata.schema_version,
216 }));
217 }
218
219 if metadata.embedding_dimension != config.embedding_dimension {
221 warn!(
222 expected = config.embedding_dimension.size(),
223 found = metadata.embedding_dimension.size(),
224 "Embedding dimension mismatch"
225 );
226 return Err(PulseDBError::Validation(
227 ValidationError::DimensionMismatch {
228 expected: config.embedding_dimension.size(),
229 got: metadata.embedding_dimension.size(),
230 },
231 ));
232 }
233
234 let mut metadata = metadata;
236 metadata.touch();
237
238 let write_txn = db.begin_write().map_err(StorageError::from)?;
239 {
240 let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
241 let metadata_bytes = bincode::serialize(&metadata)
242 .map_err(|e| StorageError::serialization(e.to_string()))?;
243 meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;
244
245 let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;
247 }
248 write_txn.commit().map_err(StorageError::from)?;
249
250 info!(
251 schema_version = metadata.schema_version,
252 dimension = metadata.embedding_dimension.size(),
253 "Database opened successfully"
254 );
255
256 Ok(Self { db, metadata, path })
257 }
258
    /// Returns a reference to the underlying redb [`Database`] handle.
    #[inline]
    // NOTE(review): `dead_code` is allowed, so this accessor presumably has no
    // caller in some build configurations — confirm before removing it.
    #[allow(dead_code)]
    pub(crate) fn database(&self) -> &Database {
        &self.db
    }
267
268 fn increment_wal_and_record(
282 &self,
283 write_txn: &::redb::WriteTransaction,
284 experience_id: ExperienceId,
285 collective_id: CollectiveId,
286 event_type: WatchEventTypeTag,
287 timestamp: Timestamp,
288 ) -> Result<u64> {
289 let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
291 let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
292 Some(entry) => {
293 let bytes: [u8; 8] = entry
294 .value()
295 .try_into()
296 .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
297 u64::from_be_bytes(bytes)
298 }
299 None => 0,
300 };
301 let new_seq = current_seq + 1;
302
303 let seq_bytes = new_seq.to_be_bytes();
305 meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;
306
307 let record = WatchEventRecord {
309 experience_id: *experience_id.as_bytes(),
310 collective_id: *collective_id.as_bytes(),
311 event_type,
312 timestamp_ms: timestamp.as_millis(),
313 };
314 let record_bytes =
315 bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;
316
317 let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
318 events_table.insert(&seq_bytes, record_bytes.as_slice())?;
319
320 Ok(new_seq)
321 }
322
    /// Returns the embedding dimension recorded in this database's metadata.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
328}
329
330impl StorageEngine for RedbStorage {
    /// Returns the database metadata held in memory by this storage instance.
    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }
338
339 #[instrument(skip(self))]
340 fn close(self: Box<Self>) -> Result<()> {
341 info!("Closing storage engine");
342
343 drop(self.db);
348
349 info!("Storage engine closed");
350 Ok(())
351 }
352
353 fn path(&self) -> Option<&Path> {
354 Some(&self.path)
355 }
356
357 fn save_collective(&self, collective: &Collective) -> Result<()> {
362 let bytes = bincode::serialize(collective)
363 .map_err(|e| StorageError::serialization(e.to_string()))?;
364
365 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
366 {
367 let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
368 table.insert(collective.id.as_bytes(), bytes.as_slice())?;
369 }
370 write_txn.commit().map_err(StorageError::from)?;
371
372 debug!(id = %collective.id, name = %collective.name, "Collective saved");
373 Ok(())
374 }
375
376 fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
377 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
378 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
379
380 match table.get(id.as_bytes())? {
381 Some(value) => {
382 let collective: Collective = bincode::deserialize(value.value())
383 .map_err(|e| StorageError::serialization(e.to_string()))?;
384 Ok(Some(collective))
385 }
386 None => Ok(None),
387 }
388 }
389
390 fn list_collectives(&self) -> Result<Vec<Collective>> {
391 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
392 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
393
394 let mut collectives = Vec::new();
395 for result in table.iter()? {
396 let (_, value) = result.map_err(StorageError::from)?;
397 let collective: Collective = bincode::deserialize(value.value())
398 .map_err(|e| StorageError::serialization(e.to_string()))?;
399 collectives.push(collective);
400 }
401
402 Ok(collectives)
403 }
404
405 fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
406 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
407 let existed;
408 {
409 let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
410 existed = table.remove(id.as_bytes())?.is_some();
411 }
412 write_txn.commit().map_err(StorageError::from)?;
413
414 if existed {
415 debug!(id = %id, "Collective deleted");
416 }
417 Ok(existed)
418 }
419
420 fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
425 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
426 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
427
428 let count = table.get(id.as_bytes())?.count() as u64;
429
430 Ok(count)
431 }
432
433 fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
434 let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
436 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
437 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
438
439 let mut ids = Vec::new();
440 for result in table.get(id.as_bytes())? {
441 let value = result.map_err(StorageError::from)?;
442 let entry = value.value();
443 let mut exp_id = [0u8; 16];
445 exp_id.copy_from_slice(&entry[8..24]);
446 ids.push(exp_id);
447 }
448
449 let mut rel_ids = std::collections::HashSet::new();
451 let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
452 let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
453 for exp_id in &ids {
454 for result in source_table.get(exp_id)? {
455 let value = result.map_err(StorageError::from)?;
456 rel_ids.insert(*value.value());
457 }
458 for result in target_table.get(exp_id)? {
459 let value = result.map_err(StorageError::from)?;
460 rel_ids.insert(*value.value());
461 }
462 }
463
464 (ids, rel_ids.into_iter().collect())
465 };
466
467 let count = exp_ids.len() as u64;
468 if count == 0 {
469 return Ok(0);
470 }
471
472 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
474 {
475 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
477 for exp_id in &exp_ids {
478 exp_table.remove(exp_id)?;
479 }
480 }
481 {
482 let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
484 for exp_id in &exp_ids {
485 emb_table.remove(exp_id)?;
486 }
487 }
488 {
489 let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
491 idx_table.remove_all(id.as_bytes())?;
492 }
493 {
494 let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
496 for tag in ExperienceTypeTag::all() {
497 let key = encode_type_index_key(id.as_bytes(), *tag);
498 type_table.remove_all(&key)?;
499 }
500 }
501 {
502 if !relation_ids.is_empty() {
504 let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
505 let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
506 let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
507
508 for exp_id in &exp_ids {
510 source_idx.remove_all(exp_id)?;
511 target_idx.remove_all(exp_id)?;
512 }
513 for rel_id in &relation_ids {
515 rel_table.remove(rel_id)?;
516 }
517
518 debug!(
519 count = relation_ids.len(),
520 "Cascade-deleted relations for collective"
521 );
522 }
523 }
524 write_txn.commit().map_err(StorageError::from)?;
525
526 debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
527 Ok(count)
528 }
529
530 fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
531 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
532 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
533
534 let mut ids = Vec::new();
535 for result in table.get(id.as_bytes())? {
536 let value = result.map_err(StorageError::from)?;
537 let entry = value.value();
538 let mut exp_bytes = [0u8; 16];
540 exp_bytes.copy_from_slice(&entry[8..24]);
541 ids.push(ExperienceId::from_bytes(exp_bytes));
542 }
543
544 Ok(ids)
545 }
546
547 fn get_recent_experience_ids(
548 &self,
549 collective_id: CollectiveId,
550 limit: usize,
551 ) -> Result<Vec<(ExperienceId, Timestamp)>> {
552 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
553 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
554
555 let mut entries = Vec::new();
559 for result in table.get(collective_id.as_bytes())? {
560 let value = result.map_err(StorageError::from)?;
561 let entry = value.value();
562 let mut ts_bytes = [0u8; 8];
564 ts_bytes.copy_from_slice(&entry[..8]);
565 let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));
566
567 let mut exp_bytes = [0u8; 16];
568 exp_bytes.copy_from_slice(&entry[8..24]);
569 entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
570 }
571
572 let start = entries.len().saturating_sub(limit);
574 let mut recent = entries.split_off(start);
575 recent.reverse();
576
577 Ok(recent)
578 }
579
580 fn save_experience(&self, experience: &Experience) -> Result<()> {
585 let exp_bytes = bincode::serialize(experience)
587 .map_err(|e| StorageError::serialization(e.to_string()))?;
588
589 let emb_bytes = f32_slice_to_bytes(&experience.embedding);
591
592 let type_key = encode_type_index_key(
594 experience.collective_id.as_bytes(),
595 experience.experience_type.type_tag(),
596 );
597
598 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
600 {
601 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
603 exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
604 }
605 {
606 let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
608 emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
609 }
610 {
611 let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
613 let mut value = [0u8; 24];
615 value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
616 value[8..24].copy_from_slice(experience.id.as_bytes());
617 idx_table.insert(experience.collective_id.as_bytes(), &value)?;
618 }
619 {
620 let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
622 type_table.insert(&type_key, experience.id.as_bytes())?;
623 }
624 self.increment_wal_and_record(
626 &write_txn,
627 experience.id,
628 experience.collective_id,
629 WatchEventTypeTag::Created,
630 experience.timestamp,
631 )?;
632 write_txn.commit().map_err(StorageError::from)?;
633
634 debug!(
635 id = %experience.id,
636 collective_id = %experience.collective_id,
637 "Experience saved"
638 );
639 Ok(())
640 }
641
642 fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
643 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
644
645 let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
647 let exp_entry = match exp_table.get(id.as_bytes())? {
648 Some(v) => v,
649 None => return Ok(None),
650 };
651
652 let mut experience: Experience = bincode::deserialize(exp_entry.value())
653 .map_err(|e| StorageError::serialization(e.to_string()))?;
654
655 let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
657 if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
658 experience.embedding = bytes_to_f32_vec(emb_entry.value());
659 }
660
661 Ok(Some(experience))
662 }
663
664 fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
665 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
667 let collective_id;
668 let timestamp;
669 let is_archive;
670 {
671 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
672
673 let entry = match exp_table.get(id.as_bytes())? {
674 Some(v) => v,
675 None => return Ok(false),
676 };
677
678 let mut experience: Experience = bincode::deserialize(entry.value())
679 .map_err(|e| StorageError::serialization(e.to_string()))?;
680
681 drop(entry);
683
684 collective_id = experience.collective_id;
686 timestamp = experience.timestamp;
687 is_archive = update.archived == Some(true);
688
689 if let Some(importance) = update.importance {
691 experience.importance = importance;
692 }
693 if let Some(confidence) = update.confidence {
694 experience.confidence = confidence;
695 }
696 if let Some(ref domain) = update.domain {
697 experience.domain = domain.clone();
698 }
699 if let Some(ref related_files) = update.related_files {
700 experience.related_files = related_files.clone();
701 }
702 if let Some(archived) = update.archived {
703 experience.archived = archived;
704 }
705
706 let bytes = bincode::serialize(&experience)
708 .map_err(|e| StorageError::serialization(e.to_string()))?;
709 exp_table.insert(id.as_bytes(), bytes.as_slice())?;
710 }
711 let event_type = if is_archive {
713 WatchEventTypeTag::Archived
714 } else {
715 WatchEventTypeTag::Updated
716 };
717 self.increment_wal_and_record(&write_txn, id, collective_id, event_type, timestamp)?;
718 write_txn.commit().map_err(StorageError::from)?;
719
720 debug!(id = %id, "Experience updated");
721 Ok(true)
722 }
723
724 fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
725 let (collective_id, timestamp, type_tag) = {
728 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
729 let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
730
731 match exp_table.get(id.as_bytes())? {
732 Some(entry) => {
733 let exp: Experience = bincode::deserialize(entry.value())
734 .map_err(|e| StorageError::serialization(e.to_string()))?;
735 (
736 exp.collective_id,
737 exp.timestamp,
738 exp.experience_type.type_tag(),
739 )
740 }
741 None => return Ok(false),
742 }
743 };
744
745 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
747 {
748 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
749 exp_table.remove(id.as_bytes())?;
750 }
751 {
752 let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
753 emb_table.remove(id.as_bytes())?;
754 }
755 {
756 let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
758 let mut value = [0u8; 24];
759 value[..8].copy_from_slice(×tamp.to_be_bytes());
760 value[8..24].copy_from_slice(id.as_bytes());
761 idx_table.remove(collective_id.as_bytes(), &value)?;
762 }
763 {
764 let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
766 let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
767 type_table.remove(&type_key, id.as_bytes())?;
768 }
769 self.increment_wal_and_record(
771 &write_txn,
772 id,
773 collective_id,
774 WatchEventTypeTag::Deleted,
775 timestamp,
776 )?;
777 write_txn.commit().map_err(StorageError::from)?;
778
779 debug!(id = %id, "Experience deleted");
780 Ok(true)
781 }
782
783 fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
784 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
785 let (new_count, collective_id, timestamp) = {
786 let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
787
788 let entry = match exp_table.get(id.as_bytes())? {
789 Some(v) => v,
790 None => return Ok(None),
791 };
792
793 let mut experience: Experience = bincode::deserialize(entry.value())
794 .map_err(|e| StorageError::serialization(e.to_string()))?;
795 drop(entry);
796
797 experience.applications = experience.applications.saturating_add(1);
798 let new_count = experience.applications;
799 let collective_id = experience.collective_id;
800 let timestamp = experience.timestamp;
801
802 let bytes = bincode::serialize(&experience)
803 .map_err(|e| StorageError::serialization(e.to_string()))?;
804 exp_table.insert(id.as_bytes(), bytes.as_slice())?;
805 (new_count, collective_id, timestamp)
806 };
807 self.increment_wal_and_record(
809 &write_txn,
810 id,
811 collective_id,
812 WatchEventTypeTag::Updated,
813 timestamp,
814 )?;
815 write_txn.commit().map_err(StorageError::from)?;
816
817 debug!(id = %id, applications = new_count, "Experience reinforced");
818 Ok(Some(new_count))
819 }
820
821 fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
822 let bytes = f32_slice_to_bytes(embedding);
823
824 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
825 {
826 let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
827 table.insert(id.as_bytes(), bytes.as_slice())?;
828 }
829 write_txn.commit().map_err(StorageError::from)?;
830
831 debug!(id = %id, dim = embedding.len(), "Embedding saved");
832 Ok(())
833 }
834
835 fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
836 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
837 let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
838
839 match table.get(id.as_bytes())? {
840 Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
841 None => Ok(None),
842 }
843 }
844
845 fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
850 let bytes =
851 bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;
852
853 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
854 {
855 let mut table = write_txn.open_table(RELATIONS_TABLE)?;
856 table.insert(relation.id.as_bytes(), bytes.as_slice())?;
857 }
858 {
859 let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
860 table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
861 }
862 {
863 let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
864 table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
865 }
866 write_txn.commit().map_err(StorageError::from)?;
867
868 debug!(id = %relation.id, "Relation saved");
869 Ok(())
870 }
871
872 fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
873 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
874 let table = read_txn.open_table(RELATIONS_TABLE)?;
875
876 match table.get(id.as_bytes())? {
877 Some(value) => {
878 let relation: ExperienceRelation = bincode::deserialize(value.value())
879 .map_err(|e| StorageError::serialization(e.to_string()))?;
880 Ok(Some(relation))
881 }
882 None => Ok(None),
883 }
884 }
885
886 fn delete_relation(&self, id: RelationId) -> Result<bool> {
887 let (source_id, target_id) = {
889 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
890 let table = read_txn.open_table(RELATIONS_TABLE)?;
891
892 match table.get(id.as_bytes())? {
893 Some(entry) => {
894 let rel: ExperienceRelation = bincode::deserialize(entry.value())
895 .map_err(|e| StorageError::serialization(e.to_string()))?;
896 (rel.source_id, rel.target_id)
897 }
898 None => return Ok(false),
899 }
900 };
901
902 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
904 {
905 let mut table = write_txn.open_table(RELATIONS_TABLE)?;
906 table.remove(id.as_bytes())?;
907 }
908 {
909 let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
910 table.remove(source_id.as_bytes(), id.as_bytes())?;
911 }
912 {
913 let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
914 table.remove(target_id.as_bytes(), id.as_bytes())?;
915 }
916 write_txn.commit().map_err(StorageError::from)?;
917
918 debug!(id = %id, "Relation deleted");
919 Ok(true)
920 }
921
922 fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
923 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
924 let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
925
926 let mut ids = Vec::new();
927 for result in table.get(experience_id.as_bytes())? {
928 let value = result.map_err(StorageError::from)?;
929 let bytes = value.value();
930 ids.push(RelationId::from_bytes(*bytes));
931 }
932
933 Ok(ids)
934 }
935
936 fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
937 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
938 let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
939
940 let mut ids = Vec::new();
941 for result in table.get(experience_id.as_bytes())? {
942 let value = result.map_err(StorageError::from)?;
943 let bytes = value.value();
944 ids.push(RelationId::from_bytes(*bytes));
945 }
946
947 Ok(ids)
948 }
949
950 fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
951 let relation_ids: Vec<RelationId> = {
953 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
954 let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
955 let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
956
957 let mut ids = std::collections::HashSet::new();
958
959 for result in source_table.get(experience_id.as_bytes())? {
961 let value = result.map_err(StorageError::from)?;
962 ids.insert(RelationId::from_bytes(*value.value()));
963 }
964
965 for result in target_table.get(experience_id.as_bytes())? {
967 let value = result.map_err(StorageError::from)?;
968 ids.insert(RelationId::from_bytes(*value.value()));
969 }
970
971 ids.into_iter().collect()
972 };
973
974 let count = relation_ids.len() as u64;
975 if count == 0 {
976 return Ok(0);
977 }
978
979 let relations: Vec<ExperienceRelation> = {
981 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
982 let table = read_txn.open_table(RELATIONS_TABLE)?;
983
984 let mut rels = Vec::with_capacity(relation_ids.len());
985 for rel_id in &relation_ids {
986 if let Some(entry) = table.get(rel_id.as_bytes())? {
987 let rel: ExperienceRelation = bincode::deserialize(entry.value())
988 .map_err(|e| StorageError::serialization(e.to_string()))?;
989 rels.push(rel);
990 }
991 }
992 rels
993 };
994
995 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
997 {
998 let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
999 for rel in &relations {
1000 rel_table.remove(rel.id.as_bytes())?;
1001 }
1002 }
1003 {
1004 let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1005 for rel in &relations {
1006 source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
1007 }
1008 }
1009 {
1010 let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1011 for rel in &relations {
1012 target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
1013 }
1014 }
1015 write_txn.commit().map_err(StorageError::from)?;
1016
1017 debug!(
1018 experience_id = %experience_id,
1019 count = count,
1020 "Cascade-deleted relations for experience"
1021 );
1022 Ok(count)
1023 }
1024
1025 fn relation_exists(
1026 &self,
1027 source_id: ExperienceId,
1028 target_id: ExperienceId,
1029 relation_type: RelationType,
1030 ) -> Result<bool> {
1031 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1032 let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1033 let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1034
1035 for result in index_table.get(source_id.as_bytes())? {
1037 let value = result.map_err(StorageError::from)?;
1038 let rel_id = RelationId::from_bytes(*value.value());
1039
1040 if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1041 let rel: ExperienceRelation = bincode::deserialize(entry.value())
1042 .map_err(|e| StorageError::serialization(e.to_string()))?;
1043 if rel.target_id == target_id && rel.relation_type == relation_type {
1044 return Ok(true);
1045 }
1046 }
1047 }
1048
1049 Ok(false)
1050 }
1051
1052 fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1057 let bytes =
1058 bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1059
1060 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1061 {
1062 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1063 table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1064 }
1065 {
1066 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1067 table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1068 }
1069 write_txn.commit().map_err(StorageError::from)?;
1070
1071 debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1072 Ok(())
1073 }
1074
1075 fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1076 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1077 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1078
1079 match table.get(id.as_bytes())? {
1080 Some(value) => {
1081 let insight: DerivedInsight = bincode::deserialize(value.value())
1082 .map_err(|e| StorageError::serialization(e.to_string()))?;
1083 Ok(Some(insight))
1084 }
1085 None => Ok(None),
1086 }
1087 }
1088
1089 fn delete_insight(&self, id: InsightId) -> Result<bool> {
1090 let collective_id = {
1092 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1093 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1094
1095 match table.get(id.as_bytes())? {
1096 Some(entry) => {
1097 let insight: DerivedInsight = bincode::deserialize(entry.value())
1098 .map_err(|e| StorageError::serialization(e.to_string()))?;
1099 insight.collective_id
1100 }
1101 None => return Ok(false),
1102 }
1103 };
1104
1105 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1107 {
1108 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1109 table.remove(id.as_bytes())?;
1110 }
1111 {
1112 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1113 table.remove(collective_id.as_bytes(), id.as_bytes())?;
1114 }
1115 write_txn.commit().map_err(StorageError::from)?;
1116
1117 debug!(id = %id, "Insight deleted");
1118 Ok(true)
1119 }
1120
1121 fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1122 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1123 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1124
1125 let mut ids = Vec::new();
1126 for result in table.get(id.as_bytes())? {
1127 let value = result.map_err(StorageError::from)?;
1128 ids.push(InsightId::from_bytes(*value.value()));
1129 }
1130
1131 Ok(ids)
1132 }
1133
1134 fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
1135 let insight_ids: Vec<[u8; 16]> = {
1137 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1138 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1139
1140 let mut ids = Vec::new();
1141 for result in table.get(id.as_bytes())? {
1142 let value = result.map_err(StorageError::from)?;
1143 ids.push(*value.value());
1144 }
1145 ids
1146 };
1147
1148 let count = insight_ids.len() as u64;
1149 if count == 0 {
1150 return Ok(0);
1151 }
1152
1153 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1155 {
1156 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1157 for insight_id in &insight_ids {
1158 table.remove(insight_id)?;
1159 }
1160 }
1161 {
1162 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1163 table.remove_all(id.as_bytes())?;
1164 }
1165 write_txn.commit().map_err(StorageError::from)?;
1166
1167 debug!(id = %id, count = count, "Cascade-deleted insights for collective");
1168 Ok(count)
1169 }
1170
1171 fn save_activity(&self, activity: &Activity) -> Result<()> {
1176 let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1177 let bytes =
1178 bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1179
1180 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1181 {
1182 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1183 table.insert(key.as_slice(), bytes.as_slice())?;
1184 }
1185 write_txn.commit().map_err(StorageError::from)?;
1186
1187 debug!(
1188 agent_id = %activity.agent_id,
1189 collective_id = %activity.collective_id,
1190 "Activity saved"
1191 );
1192 Ok(())
1193 }
1194
1195 fn get_activity(
1196 &self,
1197 agent_id: &str,
1198 collective_id: CollectiveId,
1199 ) -> Result<Option<Activity>> {
1200 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1201
1202 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1203 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1204
1205 match table.get(key.as_slice())? {
1206 Some(value) => {
1207 let activity: Activity = bincode::deserialize(value.value())
1208 .map_err(|e| StorageError::serialization(e.to_string()))?;
1209 Ok(Some(activity))
1210 }
1211 None => Ok(None),
1212 }
1213 }
1214
1215 fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1216 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1217
1218 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1219 let existed = {
1220 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1221 let removed = table.remove(key.as_slice())?;
1222 removed.is_some()
1223 };
1224 write_txn.commit().map_err(StorageError::from)?;
1225
1226 if existed {
1227 debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1228 }
1229 Ok(existed)
1230 }
1231
1232 fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1233 let prefix = collective_id.as_bytes();
1234
1235 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1236 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1237
1238 let mut activities = Vec::new();
1239 for result in table.iter()? {
1240 let (key, value) = result.map_err(StorageError::from)?;
1241 let key_bytes = key.value();
1242
1243 if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1245 let activity: Activity = bincode::deserialize(value.value())
1246 .map_err(|e| StorageError::serialization(e.to_string()))?;
1247 activities.push(activity);
1248 }
1249 }
1250
1251 Ok(activities)
1252 }
1253
1254 fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
1255 let prefix = collective_id.as_bytes();
1256
1257 let keys_to_delete: Vec<Vec<u8>> = {
1259 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1260 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1261
1262 let mut keys = Vec::new();
1263 for result in table.iter()? {
1264 let (key, _) = result.map_err(StorageError::from)?;
1265 let key_bytes = key.value();
1266
1267 if key_bytes.len() >= 16
1268 && decode_collective_from_activity_key(key_bytes) == *prefix
1269 {
1270 keys.push(key_bytes.to_vec());
1271 }
1272 }
1273 keys
1274 };
1275
1276 let count = keys_to_delete.len() as u64;
1277 if count == 0 {
1278 return Ok(0);
1279 }
1280
1281 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1283 {
1284 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1285 for key in &keys_to_delete {
1286 table.remove(key.as_slice())?;
1287 }
1288 }
1289 write_txn.commit().map_err(StorageError::from)?;
1290
1291 debug!(
1292 collective_id = %collective_id,
1293 count = count,
1294 "Cascade-deleted activities for collective"
1295 );
1296 Ok(count)
1297 }
1298
1299 fn get_wal_sequence(&self) -> Result<u64> {
1304 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1305 let meta_table = read_txn.open_table(METADATA_TABLE)?;
1306 match meta_table.get(WAL_SEQUENCE_KEY)? {
1307 Some(entry) => {
1308 let bytes: [u8; 8] = entry
1309 .value()
1310 .try_into()
1311 .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1312 Ok(u64::from_be_bytes(bytes))
1313 }
1314 None => Ok(0),
1315 }
1316 }
1317
1318 fn poll_watch_events(
1319 &self,
1320 since_seq: u64,
1321 limit: usize,
1322 ) -> Result<(Vec<WatchEventRecord>, u64)> {
1323 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1324 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1325
1326 let start_key = (since_seq + 1).to_be_bytes();
1327 let end_key = u64::MAX.to_be_bytes();
1328 let mut events = Vec::new();
1329 let mut max_seq = since_seq;
1330
1331 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1332 let (key, value) = entry.map_err(StorageError::from)?;
1333 let seq = u64::from_be_bytes(*key.value());
1334 let record: WatchEventRecord = bincode::deserialize(value.value())
1335 .map_err(|e| StorageError::serialization(e.to_string()))?;
1336 events.push(record);
1337 max_seq = seq;
1338 if events.len() >= limit {
1339 break;
1340 }
1341 }
1342
1343 Ok((events, max_seq))
1344 }
1345}
1346
/// Serializes a slice of `f32` values into a flat byte vector.
///
/// Each value contributes its 4 little-endian bytes, in input order, so the
/// result is exactly `data.len() * 4` bytes long.
#[inline]
fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
    // `size_of_val` on the slice is the total byte length (4 per element).
    let mut encoded = Vec::with_capacity(std::mem::size_of_val(data));
    encoded.extend(data.iter().flat_map(|value| value.to_le_bytes()));
    encoded
}
1360
/// Decodes a flat byte slice into `f32` values.
///
/// Consecutive 4-byte groups are interpreted as little-endian `f32`s. Any
/// trailing bytes that do not form a complete group are ignored.
#[inline]
fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
    let mut decoded = Vec::with_capacity(data.len() / 4);
    for quad in data.chunks_exact(4) {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(quad);
        decoded.push(f32::from_le_bytes(raw));
    }
    decoded
}
1368
1369#[cfg(test)]
1373mod tests {
1374 use super::*;
1375 use tempfile::tempdir;
1376
1377 fn default_config() -> Config {
1378 Config::default()
1379 }
1380
1381 #[test]
1382 fn test_open_creates_new_database() {
1383 let dir = tempdir().unwrap();
1384 let path = dir.path().join("test.db");
1385
1386 assert!(!path.exists());
1387
1388 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1389
1390 assert!(path.exists());
1391 assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
1392 assert_eq!(
1393 storage.metadata().embedding_dimension,
1394 EmbeddingDimension::D384
1395 );
1396
1397 Box::new(storage).close().unwrap();
1398 }
1399
1400 #[test]
1401 fn test_open_existing_database() {
1402 let dir = tempdir().unwrap();
1403 let path = dir.path().join("test.db");
1404
1405 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1407 let created_at = storage.metadata().created_at;
1408 Box::new(storage).close().unwrap();
1409
1410 std::thread::sleep(std::time::Duration::from_millis(10));
1412 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1413
1414 assert_eq!(storage.metadata().created_at, created_at);
1416 assert!(storage.metadata().last_opened_at > created_at);
1418
1419 Box::new(storage).close().unwrap();
1420 }
1421
1422 #[test]
1423 fn test_dimension_mismatch_returns_error() {
1424 let dir = tempdir().unwrap();
1425 let path = dir.path().join("test.db");
1426
1427 let config_384 = Config {
1429 embedding_dimension: EmbeddingDimension::D384,
1430 ..Default::default()
1431 };
1432 let storage = RedbStorage::open(&path, &config_384).unwrap();
1433 Box::new(storage).close().unwrap();
1434
1435 let config_768 = Config {
1437 embedding_dimension: EmbeddingDimension::D768,
1438 ..Default::default()
1439 };
1440 let result = RedbStorage::open(&path, &config_768);
1441
1442 assert!(result.is_err());
1443 let err = result.unwrap_err();
1444 assert!(matches!(
1445 err,
1446 PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
1447 ));
1448 }
1449
1450 #[test]
1451 fn test_database_files_created() {
1452 let dir = tempdir().unwrap();
1453 let path = dir.path().join("pulse.db");
1454
1455 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1456
1457 assert!(path.exists());
1459 assert!(storage.path().is_some());
1460 assert_eq!(storage.path().unwrap(), path);
1461
1462 Box::new(storage).close().unwrap();
1463 }
1464
1465 #[test]
1466 fn test_metadata_preserved_across_opens() {
1467 let dir = tempdir().unwrap();
1468 let path = dir.path().join("test.db");
1469
1470 let config = Config {
1471 embedding_dimension: EmbeddingDimension::Custom(512),
1472 ..Default::default()
1473 };
1474
1475 let storage = RedbStorage::open(&path, &config).unwrap();
1477 assert_eq!(
1478 storage.metadata().embedding_dimension,
1479 EmbeddingDimension::Custom(512)
1480 );
1481 Box::new(storage).close().unwrap();
1482
1483 let storage = RedbStorage::open(&path, &config).unwrap();
1485 assert_eq!(
1486 storage.metadata().embedding_dimension,
1487 EmbeddingDimension::Custom(512)
1488 );
1489 Box::new(storage).close().unwrap();
1490 }
1491
1492 #[test]
1493 fn test_embedding_dimension_accessor() {
1494 let dir = tempdir().unwrap();
1495 let path = dir.path().join("test.db");
1496
1497 let config = Config {
1498 embedding_dimension: EmbeddingDimension::D768,
1499 ..Default::default()
1500 };
1501
1502 let storage = RedbStorage::open(&path, &config).unwrap();
1503 assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);
1504
1505 Box::new(storage).close().unwrap();
1506 }
1507
1508 #[test]
1509 fn test_all_six_tables_created() {
1510 let dir = tempdir().unwrap();
1511 let path = dir.path().join("test.db");
1512
1513 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1514
1515 let read_txn = storage.database().begin_read().unwrap();
1519
1520 read_txn.open_table(METADATA_TABLE).unwrap();
1521 read_txn.open_table(COLLECTIVES_TABLE).unwrap();
1522 read_txn.open_table(EXPERIENCES_TABLE).unwrap();
1523 read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
1524 read_txn
1525 .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
1526 .unwrap();
1527 read_txn
1528 .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
1529 .unwrap();
1530
1531 Box::new(storage).close().unwrap();
1532 }
1533
1534 #[test]
1539 fn test_save_and_get_collective() {
1540 let dir = tempdir().unwrap();
1541 let path = dir.path().join("test.db");
1542 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1543
1544 let collective = Collective::new("test-project", 384);
1545 let id = collective.id;
1546
1547 storage.save_collective(&collective).unwrap();
1548
1549 let retrieved = storage.get_collective(id).unwrap().unwrap();
1550 assert_eq!(retrieved.id, id);
1551 assert_eq!(retrieved.name, "test-project");
1552 assert_eq!(retrieved.embedding_dimension, 384);
1553 assert!(retrieved.owner_id.is_none());
1554
1555 Box::new(storage).close().unwrap();
1556 }
1557
1558 #[test]
1559 fn test_get_nonexistent_collective_returns_none() {
1560 let dir = tempdir().unwrap();
1561 let path = dir.path().join("test.db");
1562 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1563
1564 let result = storage.get_collective(CollectiveId::new()).unwrap();
1565 assert!(result.is_none());
1566
1567 Box::new(storage).close().unwrap();
1568 }
1569
1570 #[test]
1571 fn test_save_collective_overwrites_existing() {
1572 let dir = tempdir().unwrap();
1573 let path = dir.path().join("test.db");
1574 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1575
1576 let mut collective = Collective::new("original-name", 384);
1577 let id = collective.id;
1578 storage.save_collective(&collective).unwrap();
1579
1580 collective.name = "updated-name".to_string();
1582 storage.save_collective(&collective).unwrap();
1583
1584 let retrieved = storage.get_collective(id).unwrap().unwrap();
1585 assert_eq!(retrieved.name, "updated-name");
1586
1587 Box::new(storage).close().unwrap();
1588 }
1589
1590 #[test]
1591 fn test_list_collectives_empty() {
1592 let dir = tempdir().unwrap();
1593 let path = dir.path().join("test.db");
1594 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1595
1596 let collectives = storage.list_collectives().unwrap();
1597 assert!(collectives.is_empty());
1598
1599 Box::new(storage).close().unwrap();
1600 }
1601
1602 #[test]
1603 fn test_list_collectives_returns_all() {
1604 let dir = tempdir().unwrap();
1605 let path = dir.path().join("test.db");
1606 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1607
1608 let c1 = Collective::new("project-alpha", 384);
1609 let c2 = Collective::new("project-beta", 384);
1610 let c3 = Collective::new("project-gamma", 384);
1611
1612 storage.save_collective(&c1).unwrap();
1613 storage.save_collective(&c2).unwrap();
1614 storage.save_collective(&c3).unwrap();
1615
1616 let collectives = storage.list_collectives().unwrap();
1617 assert_eq!(collectives.len(), 3);
1618
1619 let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
1621 assert!(ids.contains(&c1.id));
1622 assert!(ids.contains(&c2.id));
1623 assert!(ids.contains(&c3.id));
1624
1625 Box::new(storage).close().unwrap();
1626 }
1627
1628 #[test]
1629 fn test_delete_collective_existing() {
1630 let dir = tempdir().unwrap();
1631 let path = dir.path().join("test.db");
1632 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1633
1634 let collective = Collective::new("to-delete", 384);
1635 let id = collective.id;
1636 storage.save_collective(&collective).unwrap();
1637
1638 let deleted = storage.delete_collective(id).unwrap();
1640 assert!(deleted);
1641
1642 assert!(storage.get_collective(id).unwrap().is_none());
1644
1645 Box::new(storage).close().unwrap();
1646 }
1647
1648 #[test]
1649 fn test_delete_collective_nonexistent() {
1650 let dir = tempdir().unwrap();
1651 let path = dir.path().join("test.db");
1652 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1653
1654 let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
1655 assert!(!deleted);
1656
1657 Box::new(storage).close().unwrap();
1658 }
1659
    /// Data inserted in a write transaction that is dropped without calling
    /// `commit()` must not be readable afterwards.
    #[test]
    fn test_uncommitted_transaction_is_invisible() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("phantom", 384);
        let id = collective.id;
        let bytes = bincode::serialize(&collective).unwrap();

        {
            // Write straight to the table, bypassing `save_collective`.
            let write_txn = storage.database().begin_write().unwrap();
            {
                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
            }
            // `write_txn` is dropped at the end of this scope without commit().
        }

        // The insert above was never committed, so the lookup must miss.
        let result = storage.get_collective(id).unwrap();
        assert!(result.is_none(), "Uncommitted data must not be visible");

        Box::new(storage).close().unwrap();
    }
1692
1693 #[test]
1694 fn test_committed_transaction_is_visible() {
1695 let dir = tempdir().unwrap();
1698 let path = dir.path().join("test.db");
1699 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1700
1701 let collective = Collective::new("committed", 384);
1702 let id = collective.id;
1703
1704 storage.save_collective(&collective).unwrap();
1705
1706 let result = storage.get_collective(id).unwrap();
1707 assert!(result.is_some(), "Committed data must be visible");
1708
1709 Box::new(storage).close().unwrap();
1710 }
1711
    /// Writes made to two different tables inside one write transaction are
    /// both visible after that transaction commits.
    #[test]
    fn test_multi_table_atomicity() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("multi-table", 384);
        let id = collective.id;
        let collective_bytes = bincode::serialize(&collective).unwrap();

        // One transaction, two tables.
        let write_txn = storage.database().begin_write().unwrap();
        {
            // First write: the collective record.
            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
            coll_table
                .insert(id.as_bytes(), collective_bytes.as_slice())
                .unwrap();
        }
        {
            // Second write: a marker entry in the metadata table.
            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
            meta_table
                .insert("test_marker", b"multi_table_test".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();

        // Both writes must be observable after the single commit.
        let coll = storage.get_collective(id).unwrap();
        assert!(coll.is_some(), "Collective from multi-table txn must exist");

        let read_txn = storage.database().begin_read().unwrap();
        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
        let marker = meta_table.get("test_marker").unwrap();
        assert!(marker.is_some(), "Metadata from multi-table txn must exist");

        Box::new(storage).close().unwrap();
    }
1752
    /// After three saves and one delete, a read transaction sees exactly the
    /// two surviving collectives, both by point lookup and by iteration.
    ///
    /// NOTE(review): despite the name, no write happens while the read
    /// transaction is open, so snapshot isolation against concurrent writes
    /// is not exercised here.
    #[test]
    fn test_mvcc_read_consistency() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let c1 = Collective::new("alpha", 384);
        let c2 = Collective::new("beta", 384);
        let c3 = Collective::new("gamma", 384);

        storage.save_collective(&c1).unwrap();
        storage.save_collective(&c2).unwrap();
        storage.save_collective(&c3).unwrap();

        // Remove the middle one so the table holds c1 and c3 only.
        storage.delete_collective(c2.id).unwrap();

        // Read the table directly through a read transaction.
        let read_txn = storage.database().begin_read().unwrap();
        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();

        assert!(
            table.get(c1.id.as_bytes()).unwrap().is_some(),
            "c1 must be visible (committed)"
        );
        assert!(
            table.get(c2.id.as_bytes()).unwrap().is_none(),
            "c2 must be absent (deleted)"
        );
        assert!(
            table.get(c3.id.as_bytes()).unwrap().is_some(),
            "c3 must be visible (committed)"
        );

        // Iteration must agree with the point lookups.
        let count = table.iter().unwrap().count();
        assert_eq!(count, 2, "Exactly 2 collectives should exist");

        // Release the table and transaction explicitly before closing.
        drop(table);
        drop(read_txn);

        Box::new(storage).close().unwrap();
    }
1812
    /// Reopening a database whose metadata entry holds undecodable bytes
    /// fails with `StorageError::Corrupted` mentioning the invalid format.
    #[test]
    fn test_corruption_detection_invalid_metadata_bytes() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("corrupt.db");

        // Create a valid database, then overwrite its metadata with garbage.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        // Reopening must detect the damage instead of succeeding.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Corrupted metadata must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Invalid metadata format"),
                    "Error should mention invalid format, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
1850
    /// Reopening a database whose metadata entry has been removed fails with
    /// `StorageError::Corrupted` mentioning the missing metadata.
    #[test]
    fn test_corruption_detection_missing_metadata_key() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_key.db");

        // Create a valid database, then delete its metadata entry.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.remove(METADATA_KEY).unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        // The metadata table still exists, but the entry is gone.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata key must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Missing database metadata"),
                    "Error should mention missing metadata, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
1883
    /// Opening a redb file that was created outside `RedbStorage` and has no
    /// metadata table fails with `StorageError::Corrupted`.
    #[test]
    fn test_corruption_detection_missing_metadata_table() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_table.db");

        {
            // Build a raw redb file that contains only an unrelated table.
            let db = ::redb::Database::create(&path).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let dummy: ::redb::TableDefinition<&str, &str> =
                    ::redb::TableDefinition::new("dummy");
                let mut table = write_txn.open_table(dummy).unwrap();
                table.insert("key", "value").unwrap();
            }
            write_txn.commit().unwrap();
            // `db` is dropped here, before the file is reopened below.
        }

        // The file exists, so it is treated as an existing database.
        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata table must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Cannot open metadata table"),
                    "Error should mention metadata table, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
1920
1921 use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
1926 use crate::types::{AgentId, ExperienceId, Timestamp};
1927
1928 fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
1930 Experience {
1931 id: ExperienceId::new(),
1932 collective_id,
1933 content: "Test experience content".into(),
1934 embedding: vec![0.42; dim],
1935 experience_type: ExperienceType::Fact {
1936 statement: "redb uses shadow paging".into(),
1937 source: "docs".into(),
1938 },
1939 importance: 0.8,
1940 confidence: 0.7,
1941 applications: 0,
1942 domain: vec!["rust".into(), "databases".into()],
1943 related_files: vec!["src/storage/redb.rs".into()],
1944 source_agent: AgentId::new("test-agent"),
1945 source_task: None,
1946 timestamp: Timestamp::now(),
1947 archived: false,
1948 }
1949 }
1950
1951 #[test]
1952 fn test_save_and_get_experience() {
1953 let dir = tempdir().unwrap();
1954 let path = dir.path().join("test.db");
1955 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1956
1957 let collective = Collective::new("test", 384);
1958 storage.save_collective(&collective).unwrap();
1959
1960 let exp = test_experience(collective.id, 384);
1961 let exp_id = exp.id;
1962
1963 storage.save_experience(&exp).unwrap();
1964
1965 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
1966 assert_eq!(retrieved.id, exp_id);
1967 assert_eq!(retrieved.collective_id, collective.id);
1968 assert_eq!(retrieved.content, "Test experience content");
1969 assert_eq!(retrieved.importance, 0.8);
1970 assert_eq!(retrieved.confidence, 0.7);
1971 assert_eq!(retrieved.applications, 0);
1972 assert_eq!(retrieved.domain, vec!["rust", "databases"]);
1973 assert!(!retrieved.archived);
1974 assert_eq!(retrieved.embedding.len(), 384);
1976 assert_eq!(retrieved.embedding[0], 0.42);
1977
1978 Box::new(storage).close().unwrap();
1979 }
1980
1981 #[test]
1982 fn test_get_nonexistent_experience_returns_none() {
1983 let dir = tempdir().unwrap();
1984 let path = dir.path().join("test.db");
1985 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1986
1987 let result = storage.get_experience(ExperienceId::new()).unwrap();
1988 assert!(result.is_none());
1989
1990 Box::new(storage).close().unwrap();
1991 }
1992
1993 #[test]
1994 fn test_update_experience_fields() {
1995 let dir = tempdir().unwrap();
1996 let path = dir.path().join("test.db");
1997 let storage = RedbStorage::open(&path, &default_config()).unwrap();
1998
1999 let collective = Collective::new("test", 384);
2000 storage.save_collective(&collective).unwrap();
2001
2002 let exp = test_experience(collective.id, 384);
2003 let exp_id = exp.id;
2004 storage.save_experience(&exp).unwrap();
2005
2006 let update = ExperienceUpdate {
2008 importance: Some(0.95),
2009 domain: Some(vec!["updated-tag".into()]),
2010 ..Default::default()
2011 };
2012 let updated = storage.update_experience(exp_id, &update).unwrap();
2013 assert!(updated);
2014
2015 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2016 assert_eq!(retrieved.importance, 0.95);
2017 assert_eq!(retrieved.domain, vec!["updated-tag"]);
2018 assert_eq!(retrieved.confidence, 0.7);
2020
2021 Box::new(storage).close().unwrap();
2022 }
2023
2024 #[test]
2025 fn test_update_nonexistent_experience_returns_false() {
2026 let dir = tempdir().unwrap();
2027 let path = dir.path().join("test.db");
2028 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2029
2030 let update = ExperienceUpdate {
2031 importance: Some(0.5),
2032 ..Default::default()
2033 };
2034 let result = storage
2035 .update_experience(ExperienceId::new(), &update)
2036 .unwrap();
2037 assert!(!result);
2038
2039 Box::new(storage).close().unwrap();
2040 }
2041
2042 #[test]
2043 fn test_delete_experience() {
2044 let dir = tempdir().unwrap();
2045 let path = dir.path().join("test.db");
2046 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2047
2048 let collective = Collective::new("test", 384);
2049 storage.save_collective(&collective).unwrap();
2050
2051 let exp = test_experience(collective.id, 384);
2052 let exp_id = exp.id;
2053 storage.save_experience(&exp).unwrap();
2054
2055 assert!(storage.get_experience(exp_id).unwrap().is_some());
2057
2058 let deleted = storage.delete_experience(exp_id).unwrap();
2060 assert!(deleted);
2061
2062 assert!(storage.get_experience(exp_id).unwrap().is_none());
2064 assert!(storage.get_embedding(exp_id).unwrap().is_none());
2065
2066 assert_eq!(
2068 storage
2069 .count_experiences_in_collective(collective.id)
2070 .unwrap(),
2071 0
2072 );
2073
2074 Box::new(storage).close().unwrap();
2075 }
2076
2077 #[test]
2078 fn test_delete_nonexistent_experience_returns_false() {
2079 let dir = tempdir().unwrap();
2080 let path = dir.path().join("test.db");
2081 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2082
2083 let result = storage.delete_experience(ExperienceId::new()).unwrap();
2084 assert!(!result);
2085
2086 Box::new(storage).close().unwrap();
2087 }
2088
2089 #[test]
2090 fn test_save_and_get_embedding() {
2091 let dir = tempdir().unwrap();
2092 let path = dir.path().join("test.db");
2093 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2094
2095 let id = ExperienceId::new();
2096 let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2097
2098 storage.save_embedding(id, &embedding).unwrap();
2099
2100 let retrieved = storage.get_embedding(id).unwrap().unwrap();
2101 assert_eq!(retrieved, embedding);
2102
2103 Box::new(storage).close().unwrap();
2104 }
2105
2106 #[test]
2107 fn test_experience_by_collective_index() {
2108 let dir = tempdir().unwrap();
2109 let path = dir.path().join("test.db");
2110 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2111
2112 let collective = Collective::new("test", 384);
2113 storage.save_collective(&collective).unwrap();
2114
2115 for _ in 0..3 {
2117 let exp = test_experience(collective.id, 384);
2118 storage.save_experience(&exp).unwrap();
2119 }
2120
2121 assert_eq!(
2123 storage
2124 .count_experiences_in_collective(collective.id)
2125 .unwrap(),
2126 3
2127 );
2128
2129 Box::new(storage).close().unwrap();
2130 }
2131
2132 #[test]
2133 fn test_cascade_delete_includes_experiences() {
2134 let dir = tempdir().unwrap();
2135 let path = dir.path().join("test.db");
2136 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2137
2138 let collective = Collective::new("test", 384);
2139 storage.save_collective(&collective).unwrap();
2140
2141 let exp1 = test_experience(collective.id, 384);
2142 let exp2 = test_experience(collective.id, 384);
2143 let id1 = exp1.id;
2144 let id2 = exp2.id;
2145 storage.save_experience(&exp1).unwrap();
2146 storage.save_experience(&exp2).unwrap();
2147
2148 let count = storage
2150 .delete_experiences_by_collective(collective.id)
2151 .unwrap();
2152 assert_eq!(count, 2);
2153
2154 assert!(storage.get_experience(id1).unwrap().is_none());
2156 assert!(storage.get_experience(id2).unwrap().is_none());
2157 assert!(storage.get_embedding(id1).unwrap().is_none());
2158 assert!(storage.get_embedding(id2).unwrap().is_none());
2159
2160 Box::new(storage).close().unwrap();
2161 }
2162
2163 #[test]
2164 fn test_update_experience_archived_flag() {
2165 let dir = tempdir().unwrap();
2166 let path = dir.path().join("test.db");
2167 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2168
2169 let collective = Collective::new("test", 384);
2170 storage.save_collective(&collective).unwrap();
2171
2172 let exp = test_experience(collective.id, 384);
2173 let exp_id = exp.id;
2174 storage.save_experience(&exp).unwrap();
2175
2176 let update = ExperienceUpdate {
2178 archived: Some(true),
2179 ..Default::default()
2180 };
2181 storage.update_experience(exp_id, &update).unwrap();
2182
2183 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2184 assert!(retrieved.archived);
2185
2186 let update = ExperienceUpdate {
2188 archived: Some(false),
2189 ..Default::default()
2190 };
2191 storage.update_experience(exp_id, &update).unwrap();
2192
2193 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2194 assert!(!retrieved.archived);
2195
2196 Box::new(storage).close().unwrap();
2197 }
2198
2199 #[test]
2200 fn test_f32_byte_conversion_roundtrip() {
2201 let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2202 let bytes = f32_slice_to_bytes(&original);
2203 assert_eq!(bytes.len(), original.len() * 4);
2204
2205 let restored = bytes_to_f32_vec(&bytes);
2206 assert_eq!(original, restored);
2207 }
2208
2209 #[test]
2210 fn test_experience_with_all_type_variants() {
2211 let dir = tempdir().unwrap();
2212 let path = dir.path().join("test.db");
2213 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2214
2215 let collective = Collective::new("test", 384);
2216 storage.save_collective(&collective).unwrap();
2217
2218 let types = vec![
2220 ExperienceType::Difficulty {
2221 description: "test".into(),
2222 severity: Severity::High,
2223 },
2224 ExperienceType::Solution {
2225 problem_ref: None,
2226 approach: "test".into(),
2227 worked: true,
2228 },
2229 ExperienceType::ErrorPattern {
2230 signature: "E0308".into(),
2231 fix: "check types".into(),
2232 prevention: "use clippy".into(),
2233 },
2234 ExperienceType::SuccessPattern {
2235 task_type: "refactor".into(),
2236 approach: "extract method".into(),
2237 quality: 0.9,
2238 },
2239 ExperienceType::UserPreference {
2240 category: "style".into(),
2241 preference: "snake_case".into(),
2242 strength: 1.0,
2243 },
2244 ExperienceType::ArchitecturalDecision {
2245 decision: "use redb".into(),
2246 rationale: "pure Rust".into(),
2247 },
2248 ExperienceType::TechInsight {
2249 technology: "tokio".into(),
2250 insight: "spawn_blocking".into(),
2251 },
2252 ExperienceType::Fact {
2253 statement: "Rust is safe".into(),
2254 source: "docs".into(),
2255 },
2256 ExperienceType::Generic { category: None },
2257 ];
2258
2259 for experience_type in types {
2260 let mut exp = test_experience(collective.id, 384);
2261 exp.experience_type = experience_type;
2262 storage.save_experience(&exp).unwrap();
2263
2264 let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2266 assert_eq!(
2267 retrieved.experience_type.type_tag(),
2268 exp.experience_type.type_tag()
2269 );
2270 }
2271
2272 assert_eq!(
2273 storage
2274 .count_experiences_in_collective(collective.id)
2275 .unwrap(),
2276 9
2277 );
2278
2279 Box::new(storage).close().unwrap();
2280 }
2281
/// Reinforcing a saved experience three times in a row yields `Some(1)`,
/// `Some(2)`, `Some(3)`; afterwards the stored record reports
/// `applications == 3` and its 384-element embedding is still retrievable.
#[test]
fn test_reinforce_experience_atomic() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    let id = experience.id;
    store.save_experience(&experience).unwrap();

    // The returned value counts up by one on every call.
    for expected in 1..=3 {
        let count = store.reinforce_experience(id).unwrap();
        assert_eq!(count, Some(expected));
    }

    // The persisted record must agree with the last returned count.
    let stored = store.get_experience(id).unwrap().unwrap();
    assert_eq!(stored.applications, 3);

    // The embedding is still readable after the reinforcements.
    let embedding = store.get_embedding(id).unwrap().unwrap();
    assert_eq!(embedding.len(), 384);

    Box::new(store).close().unwrap();
}
2310
/// Reinforcing an id that was never saved succeeds and returns `None`.
#[test]
fn test_reinforce_experience_nonexistent() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    // A freshly generated id has not been stored in this database.
    let unknown_id = ExperienceId::new();
    let outcome = store.reinforce_experience(unknown_id).unwrap();
    assert!(outcome.is_none());

    Box::new(store).close().unwrap();
}
2322
/// A newly opened database reports a WAL sequence of 0 before any writes.
#[test]
fn test_wal_sequence_starts_at_zero() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let initial_seq = store.get_wal_sequence().unwrap();
    assert_eq!(initial_seq, 0);

    Box::new(store).close().unwrap();
}
2337
/// Each `save_experience` call advances the WAL sequence by one: it reads 1
/// after the first save and 2 after the second.
#[test]
fn test_save_experience_increments_wal_sequence() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    // Save two distinct experiences, checking the sequence after each one.
    for expected_seq in 1..=2 {
        let experience = test_experience(coll.id, 384);
        store.save_experience(&experience).unwrap();
        assert_eq!(store.get_wal_sequence().unwrap(), expected_seq);
    }

    Box::new(store).close().unwrap();
}
2357
/// After saving three experiences, polling from sequence 0 returns three
/// `Created` events, in save order, each carrying the id bytes of the
/// corresponding experience, together with a max sequence of 3.
#[test]
fn test_poll_watch_events_returns_correct_events() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    // Build all three experiences first, then persist them in order.
    let saved = [
        test_experience(coll.id, 384),
        test_experience(coll.id, 384),
        test_experience(coll.id, 384),
    ];
    for experience in &saved {
        store.save_experience(experience).unwrap();
    }

    let (events, max_seq) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(events.len(), 3);
    assert_eq!(max_seq, 3);

    let all_created = events
        .iter()
        .all(|event| event.event_type == WatchEventTypeTag::Created);
    assert!(all_created);

    // Events must come back in the same order the experiences were saved.
    for (event, experience) in events.iter().zip(&saved) {
        assert_eq!(event.experience_id, *experience.id.as_bytes());
    }

    Box::new(store).close().unwrap();
}
2388
/// With five experiences saved, polling from sequence 3 returns only the
/// two later events and reports a max sequence of 5.
#[test]
fn test_poll_watch_events_since_midpoint() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    // Five saves in total.
    (0..5).for_each(|_| {
        let experience = test_experience(coll.id, 384);
        store.save_experience(&experience).unwrap();
    });

    let (tail_events, max_seq) = store.poll_watch_events(3, 100).unwrap();
    assert_eq!(tail_events.len(), 2);
    assert_eq!(max_seq, 5);

    Box::new(store).close().unwrap();
}
2411
/// Once a reader has consumed the single recorded event, polling again from
/// the returned sequence yields no events while the max sequence stays at 1.
#[test]
fn test_poll_watch_events_empty_when_caught_up() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    store.save_experience(&experience).unwrap();

    // First poll from the beginning sees the one event.
    let (first_batch, first_seq) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(first_batch.len(), 1);
    assert_eq!(first_seq, 1);

    // Second poll from sequence 1 has nothing new to report.
    let (second_batch, second_seq) = store.poll_watch_events(1, 100).unwrap();
    assert_eq!(second_batch.len(), 0);
    assert_eq!(second_seq, 1);

    Box::new(store).close().unwrap();
}
2436
/// Saving and then deleting an experience leaves two watch events: a
/// `Created` event followed by a `Deleted` event carrying the same id bytes.
#[test]
fn test_delete_records_watch_event() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    store.save_experience(&experience).unwrap();
    store.delete_experience(experience.id).unwrap();

    let (events, max_seq) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(events.len(), 2);
    assert_eq!(max_seq, 2);

    // Length was checked above, so both positions exist.
    let created = &events[0];
    let deleted = &events[1];
    assert_eq!(created.event_type, WatchEventTypeTag::Created);
    assert_eq!(deleted.event_type, WatchEventTypeTag::Deleted);
    assert_eq!(deleted.experience_id, *experience.id.as_bytes());

    Box::new(store).close().unwrap();
}
2459
/// Updating only the `importance` field of a saved experience appends a
/// second watch event whose type is `Updated`.
#[test]
fn test_update_records_watch_event() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    store.save_experience(&experience).unwrap();

    // Change importance only; every other field keeps its default.
    let importance_change = ExperienceUpdate {
        importance: Some(0.99),
        ..ExperienceUpdate::default()
    };
    store
        .update_experience(experience.id, &importance_change)
        .unwrap();

    let (events, _) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(events.len(), 2);

    let second = &events[1];
    assert_eq!(second.event_type, WatchEventTypeTag::Updated);

    Box::new(store).close().unwrap();
}
2484
/// Reinforcing a saved experience appends an `Updated` watch event after
/// the initial `Created` event.
#[test]
fn test_reinforce_records_watch_event() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    store.save_experience(&experience).unwrap();
    // The returned count is irrelevant here; only the recorded event matters.
    store.reinforce_experience(experience.id).unwrap();

    let (events, _) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(events.len(), 2);

    let first = &events[0];
    let second = &events[1];
    assert_eq!(first.event_type, WatchEventTypeTag::Created);
    assert_eq!(second.event_type, WatchEventTypeTag::Updated);

    Box::new(store).close().unwrap();
}
2505
/// An update that sets `archived` to `true` is recorded as an `Archived`
/// watch event.
#[test]
fn test_archive_records_archived_event() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    let experience = test_experience(coll.id, 384);
    store.save_experience(&experience).unwrap();

    // Set the archive flag only; every other field keeps its default.
    let archive_request = ExperienceUpdate {
        archived: Some(true),
        ..ExperienceUpdate::default()
    };
    store
        .update_experience(experience.id, &archive_request)
        .unwrap();

    let (events, _) = store.poll_watch_events(0, 100).unwrap();
    assert_eq!(events.len(), 2);

    let second = &events[1];
    assert_eq!(second.event_type, WatchEventTypeTag::Archived);

    Box::new(store).close().unwrap();
}
2530
/// With ten events recorded and `3` passed as the second argument, each
/// poll returns exactly three events: starting from sequence 0 the max
/// sequence is 3, and starting from 3 it is 6.
#[test]
fn test_poll_watch_events_batch_limit() {
    let tmp = tempdir().unwrap();
    let db_path = tmp.path().join("test.db");
    let store = RedbStorage::open(&db_path, &default_config()).unwrap();

    let coll = Collective::new("test", 384);
    store.save_collective(&coll).unwrap();

    // Ten saves in total.
    (0..10).for_each(|_| {
        let experience = test_experience(coll.id, 384);
        store.save_experience(&experience).unwrap();
    });

    // (starting sequence, expected max sequence) for two consecutive pages.
    for (since, expected_max) in [(0, 3), (3, 6)] {
        let (page, max_seq) = store.poll_watch_events(since, 3).unwrap();
        assert_eq!(page.len(), 3);
        assert_eq!(max_seq, expected_max);
    }

    Box::new(store).close().unwrap();
}
2558}