1use std::path::{Path, PathBuf};
20
21use ::redb::{Database, ReadableTable};
22use tracing::{debug, info, instrument, warn};
23
24use crate::activity::Activity;
25use crate::collective::Collective;
26use crate::experience::{Experience, ExperienceUpdate};
27use crate::insight::DerivedInsight;
28use crate::relation::{ExperienceRelation, RelationType};
29use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
30
31use super::schema::{
32 decode_collective_from_activity_key, encode_activity_key, encode_type_index_key,
33 DatabaseMetadata, EntityTypeTag, ExperienceTypeTag, WatchEventRecord, WatchEventTypeTag,
34 ACTIVITIES_TABLE, COLLECTIVES_TABLE, EMBEDDINGS_TABLE, EXPERIENCES_BY_COLLECTIVE_TABLE,
35 EXPERIENCES_BY_TYPE_TABLE, EXPERIENCES_TABLE, INSIGHTS_BY_COLLECTIVE_TABLE, INSIGHTS_TABLE,
36 METADATA_TABLE, RELATIONS_BY_SOURCE_TABLE, RELATIONS_BY_TARGET_TABLE, RELATIONS_TABLE,
37 SCHEMA_VERSION, WAL_SEQUENCE_KEY, WATCH_EVENTS_TABLE,
38};
39#[cfg(feature = "sync")]
40use super::schema::{INSTANCE_ID_KEY, SYNC_CURSORS_TABLE};
41use super::StorageEngine;
42use crate::config::{Config, EmbeddingDimension};
43use crate::error::{PulseDBError, Result, StorageError, ValidationError};
44
45const METADATA_KEY: &str = "db_metadata";
47
/// redb-backed implementation of [`StorageEngine`].
///
/// Wraps a single redb [`Database`] file together with the metadata that
/// was read (or created) when the database was opened.
#[derive(Debug)]
pub struct RedbStorage {
    /// Underlying redb database handle; all reads and writes go through it.
    db: Database,

    /// Metadata snapshot loaded at open time (schema version, embedding
    /// dimension, timestamps).
    metadata: DatabaseMetadata,

    /// Filesystem path the database was opened from (reported by `path()`).
    path: PathBuf,

    /// Stable identity of this database instance, used by the sync subsystem.
    #[cfg(feature = "sync")]
    instance_id: crate::sync::InstanceId,
}
72
73impl RedbStorage {
74 #[instrument(skip(config), fields(path = %path.as_ref().display()))]
105 pub fn open(path: impl AsRef<Path>, config: &Config) -> Result<Self> {
106 let path = path.as_ref();
107 let db_exists = path.exists();
108
109 debug!(db_exists = db_exists, "Opening storage engine");
110
111 let db = Self::create_database(path, config)?;
113
114 if db_exists {
115 Self::open_existing(db, path.to_path_buf(), config)
117 } else {
118 Self::initialize_new(db, path.to_path_buf(), config)
120 }
121 }
122
123 fn create_database(path: &Path, _config: &Config) -> Result<Database> {
125 let builder = Database::builder();
126
127 let db = builder.create(path).map_err(|e| {
134 if e.to_string().contains("locked") {
135 StorageError::DatabaseLocked
136 } else {
137 StorageError::Redb(e.to_string())
138 }
139 })?;
140
141 debug!("Database file opened successfully");
142 Ok(db)
143 }
144
    /// Creates the schema for a brand-new database: writes the metadata
    /// record, pre-creates every table and index, and (with the `sync`
    /// feature) mints a fresh instance id — all in a single transaction.
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn initialize_new(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Initializing new database");

        let metadata = DatabaseMetadata::new(config.embedding_dimension);

        let write_txn = db.begin_write().map_err(StorageError::from)?;

        {
            // Persist metadata first so a reopened database can validate itself.
            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            // Opening a table creates it; do this for every table up front
            // so later read transactions never hit a missing table.
            let _ = write_txn.open_table(COLLECTIVES_TABLE)?;
            let _ = write_txn.open_table(EXPERIENCES_TABLE)?;
            let _ = write_txn.open_table(EMBEDDINGS_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let _ = write_txn.open_table(RELATIONS_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let _ = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            let _ = write_txn.open_table(INSIGHTS_TABLE)?;
            let _ = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
            let _ = write_txn.open_table(ACTIVITIES_TABLE)?;
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            #[cfg(feature = "sync")]
            {
                // A new database gets a new stable sync identity.
                let instance_id = crate::sync::InstanceId::new();
                meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }

        write_txn.commit().map_err(StorageError::from)?;

        // Re-read the instance id after commit so the in-memory value is
        // exactly what was durably stored.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id after init"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = SCHEMA_VERSION,
            dimension = config.embedding_dimension.size(),
            "Database initialized"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
216
    /// Opens an already-initialized database: validates schema version and
    /// embedding dimension, migrates v1 WAL records when needed, and
    /// refreshes the stored metadata (last-accessed time, schema bump).
    #[instrument(skip(db, config), fields(path = %path.display()))]
    fn open_existing(db: Database, path: PathBuf, config: &Config) -> Result<Self> {
        info!("Opening existing database");

        let read_txn = db.begin_read().map_err(StorageError::from)?;

        let metadata = {
            let meta_table = read_txn.open_table(METADATA_TABLE).map_err(|e| {
                StorageError::corrupted(format!("Cannot open metadata table: {}", e))
            })?;

            let metadata_bytes = meta_table
                .get(METADATA_KEY)
                .map_err(StorageError::from)?
                .ok_or_else(|| StorageError::corrupted("Missing database metadata"))?;

            bincode::deserialize::<DatabaseMetadata>(metadata_bytes.value())
                .map_err(|e| StorageError::corrupted(format!("Invalid metadata format: {}", e)))?
        };

        // Release the read transaction before opening the write transaction.
        drop(read_txn);

        // Accept only the current schema, or v1 which is migrated below.
        if metadata.schema_version != SCHEMA_VERSION && metadata.schema_version != 1 {
            warn!(
                expected = SCHEMA_VERSION,
                found = metadata.schema_version,
                "Schema version mismatch"
            );
            return Err(PulseDBError::Storage(StorageError::SchemaVersionMismatch {
                expected: SCHEMA_VERSION,
                found: metadata.schema_version,
            }));
        }
        let needs_v2_migration = metadata.schema_version == 1;

        // A different embedding dimension would invalidate every stored vector.
        if metadata.embedding_dimension != config.embedding_dimension {
            warn!(
                expected = config.embedding_dimension.size(),
                found = metadata.embedding_dimension.size(),
                "Embedding dimension mismatch"
            );
            return Err(PulseDBError::Validation(
                ValidationError::DimensionMismatch {
                    expected: config.embedding_dimension.size(),
                    got: metadata.embedding_dimension.size(),
                },
            ));
        }

        // Prepare the metadata we will persist back: bump last-accessed
        // and, when migrating, record the new schema version.
        let mut metadata = metadata;
        metadata.touch();
        if needs_v2_migration {
            metadata.schema_version = SCHEMA_VERSION;
        }

        let write_txn = db.begin_write().map_err(StorageError::from)?;
        {
            // Ensure the WAL table exists (older files may predate it).
            let _ = write_txn.open_table(WATCH_EVENTS_TABLE)?;

            if needs_v2_migration {
                Self::migrate_wal_v1_to_v2(&write_txn)?;
                info!("Migrated WAL records from schema v1 to v2");
            }

            let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
            let metadata_bytes = bincode::serialize(&metadata)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            meta_table.insert(METADATA_KEY, metadata_bytes.as_slice())?;

            #[cfg(feature = "sync")]
            {
                // Databases created before the sync feature existed get a
                // fresh instance id on first open.
                if meta_table.get(INSTANCE_ID_KEY)?.is_none() {
                    let instance_id = crate::sync::InstanceId::new();
                    meta_table.insert(INSTANCE_ID_KEY, instance_id.as_bytes().as_slice())?;
                    debug!("Generated new instance_id for existing database");
                }
                let _ = write_txn.open_table(SYNC_CURSORS_TABLE)?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        // Read back the (possibly just-created) instance id after commit.
        #[cfg(feature = "sync")]
        let instance_id = {
            let read_txn = db.begin_read().map_err(StorageError::from)?;
            let meta_table = read_txn.open_table(METADATA_TABLE)?;
            let entry = meta_table
                .get(INSTANCE_ID_KEY)?
                .ok_or_else(|| StorageError::corrupted("Missing instance_id"))?;
            let bytes: [u8; 16] = entry
                .value()
                .try_into()
                .map_err(|_| StorageError::corrupted("invalid instance_id bytes"))?;
            crate::sync::InstanceId::from_bytes(bytes)
        };

        info!(
            schema_version = metadata.schema_version,
            dimension = metadata.embedding_dimension.size(),
            "Database opened successfully"
        );

        Ok(Self {
            db,
            metadata,
            path,
            #[cfg(feature = "sync")]
            instance_id,
        })
    }
336
    /// Borrows the raw redb [`Database`] handle (crate-internal escape hatch).
    #[inline]
    #[allow(dead_code)] pub(crate) fn database(&self) -> &Database {
        &self.db
    }
345
    /// Allocates the next WAL sequence number and appends a watch-event
    /// record to `WATCH_EVENTS_TABLE` inside the caller's write transaction,
    /// so the event commits atomically with the data change.
    ///
    /// Returns the sequence number used, or `0` when the record was
    /// suppressed because sync replay is in progress.
    fn increment_wal_and_record(
        &self,
        write_txn: &::redb::WriteTransaction,
        entity_id: &[u8; 16],
        collective_id: CollectiveId,
        entity_type: EntityTypeTag,
        event_type: WatchEventTypeTag,
        timestamp: Timestamp,
    ) -> Result<u64> {
        // While sync is applying remote changes, do not re-record them into
        // the local WAL (that would echo events back to the peer).
        #[cfg(feature = "sync")]
        if crate::sync::guard::is_sync_applying() {
            return Ok(0);
        }

        // Load the last sequence number from metadata; a missing key means
        // a fresh WAL, so numbering starts at 1.
        let mut meta_table = write_txn.open_table(METADATA_TABLE)?;
        let current_seq = match meta_table.get(WAL_SEQUENCE_KEY)? {
            Some(entry) => {
                let bytes: [u8; 8] = entry
                    .value()
                    .try_into()
                    .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
                u64::from_be_bytes(bytes)
            }
            None => 0,
        };
        let new_seq = current_seq + 1;

        // Big-endian encoding keeps sequence numbers sorted as byte keys.
        let seq_bytes = new_seq.to_be_bytes();
        meta_table.insert(WAL_SEQUENCE_KEY, seq_bytes.as_slice())?;

        let record = WatchEventRecord {
            entity_id: *entity_id,
            collective_id: *collective_id.as_bytes(),
            event_type,
            timestamp_ms: timestamp.as_millis(),
            entity_type,
        };
        let record_bytes =
            bincode::serialize(&record).map_err(|e| StorageError::serialization(e.to_string()))?;

        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        events_table.insert(&seq_bytes, record_bytes.as_slice())?;

        Ok(new_seq)
    }
409
    /// Rewrites every v1 WAL record in place as a v2 `WatchEventRecord`.
    ///
    /// v1 records only described experiences, so `entity_type` is fixed to
    /// `Experience`; sequence keys are preserved. Runs inside the caller's
    /// write transaction so the migration is all-or-nothing.
    fn migrate_wal_v1_to_v2(write_txn: &::redb::WriteTransaction) -> Result<()> {
        use super::schema::WatchEventRecordV1;

        let events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;

        // Decode everything into memory first: the table cannot be mutated
        // while the iterator borrows it.
        let mut entries: Vec<([u8; 8], WatchEventRecordV1)> = Vec::new();
        for entry in events_table.iter()? {
            let (key, value) = entry.map_err(StorageError::from)?;
            let seq_bytes: [u8; 8] = *key.value();
            let v1_record: WatchEventRecordV1 = bincode::deserialize(value.value())
                .map_err(|e| StorageError::serialization(format!("v1 WAL record: {}", e)))?;
            entries.push((seq_bytes, v1_record));
        }
        drop(events_table);

        // Re-open mutably and overwrite each record under its original key.
        let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
        for (seq_bytes, v1) in &entries {
            let v2 = WatchEventRecord {
                entity_id: v1.experience_id,
                collective_id: v1.collective_id,
                event_type: v1.event_type,
                timestamp_ms: v1.timestamp_ms,
                entity_type: EntityTypeTag::Experience,
            };
            let v2_bytes =
                bincode::serialize(&v2).map_err(|e| StorageError::serialization(e.to_string()))?;
            events_table.insert(seq_bytes, v2_bytes.as_slice())?;
        }

        debug!(count = entries.len(), "Migrated WAL records to v2");
        Ok(())
    }
448
    /// Embedding dimension this database was created with.
    #[inline]
    pub fn embedding_dimension(&self) -> EmbeddingDimension {
        self.metadata.embedding_dimension
    }
454}
455
456impl StorageEngine for RedbStorage {
    /// Metadata snapshot taken when the database was opened.
    fn metadata(&self) -> &DatabaseMetadata {
        &self.metadata
    }
464
    /// Closes the storage engine.
    #[instrument(skip(self))]
    fn close(self: Box<Self>) -> Result<()> {
        info!("Closing storage engine");

        // redb has no explicit close API; dropping the handle releases it.
        // NOTE(review): presumably redb flushes on drop — confirm against
        // the redb documentation for the pinned version.
        drop(self.db);

        info!("Storage engine closed");
        Ok(())
    }
478
479 fn path(&self) -> Option<&Path> {
480 Some(&self.path)
481 }
482
    /// Inserts or overwrites a collective and appends a WAL watch event in
    /// the same transaction, so data and event commit atomically.
    // NOTE(review): the WAL event is always `Created`, even when this call
    // overwrites an existing collective — confirm whether an overwrite
    // should emit `Updated` instead.
    fn save_collective(&self, collective: &Collective) -> Result<()> {
        let bytes = bincode::serialize(collective)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
            table.insert(collective.id.as_bytes(), bytes.as_slice())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            collective.id.as_bytes(),
            collective.id,
            EntityTypeTag::Collective,
            WatchEventTypeTag::Created,
            collective.created_at,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %collective.id, name = %collective.name, "Collective saved");
        Ok(())
    }
509
510 fn get_collective(&self, id: CollectiveId) -> Result<Option<Collective>> {
511 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
512 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
513
514 match table.get(id.as_bytes())? {
515 Some(value) => {
516 let collective: Collective = bincode::deserialize(value.value())
517 .map_err(|e| StorageError::serialization(e.to_string()))?;
518 Ok(Some(collective))
519 }
520 None => Ok(None),
521 }
522 }
523
524 fn list_collectives(&self) -> Result<Vec<Collective>> {
525 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
526 let table = read_txn.open_table(COLLECTIVES_TABLE)?;
527
528 let mut collectives = Vec::new();
529 for result in table.iter()? {
530 let (_, value) = result.map_err(StorageError::from)?;
531 let collective: Collective = bincode::deserialize(value.value())
532 .map_err(|e| StorageError::serialization(e.to_string()))?;
533 collectives.push(collective);
534 }
535
536 Ok(collectives)
537 }
538
539 fn delete_collective(&self, id: CollectiveId) -> Result<bool> {
540 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
541 let existed;
542 {
543 let mut table = write_txn.open_table(COLLECTIVES_TABLE)?;
544 existed = table.remove(id.as_bytes())?.is_some();
545 }
546 write_txn.commit().map_err(StorageError::from)?;
547
548 if existed {
549 debug!(id = %id, "Collective deleted");
550 }
551 Ok(existed)
552 }
553
554 fn count_experiences_in_collective(&self, id: CollectiveId) -> Result<u64> {
559 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
560 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
561
562 let count = table.get(id.as_bytes())?.count() as u64;
563
564 Ok(count)
565 }
566
    /// Removes every experience in a collective plus the embeddings, index
    /// entries, and relations that reference them. Returns how many
    /// experiences were deleted.
    // NOTE(review): unlike single-entity deletes, this cascade emits no WAL
    // watch events — confirm whether sync/watchers need them.
    fn delete_experiences_by_collective(&self, id: CollectiveId) -> Result<u64> {
        // Phase 1 (read txn): gather the experience ids and every relation
        // id that touches them, before any mutation starts.
        let (exp_ids, relation_ids): (Vec<[u8; 16]>, Vec<[u8; 16]>) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;

            let mut ids = Vec::new();
            for result in table.get(id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                let entry = value.value();
                // Index value layout: [timestamp (8 bytes BE)][experience id (16 bytes)].
                let mut exp_id = [0u8; 16];
                exp_id.copy_from_slice(&entry[8..24]);
                ids.push(exp_id);
            }

            // A relation may touch two experiences in the same collective;
            // the set de-duplicates those ids.
            let mut rel_ids = std::collections::HashSet::new();
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for exp_id in &ids {
                for result in source_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
                for result in target_table.get(exp_id)? {
                    let value = result.map_err(StorageError::from)?;
                    rel_ids.insert(*value.value());
                }
            }

            (ids, rel_ids.into_iter().collect())
        };

        let count = exp_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2 (write txn): delete everything in one atomic transaction.
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            for exp_id in &exp_ids {
                exp_table.remove(exp_id)?;
            }
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            for exp_id in &exp_ids {
                emb_table.remove(exp_id)?;
            }
        }
        {
            // Drop the whole per-collective index bucket at once.
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            idx_table.remove_all(id.as_bytes())?;
        }
        {
            // Clear the (collective, type) index for every experience type tag.
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            for tag in ExperienceTypeTag::all() {
                let key = encode_type_index_key(id.as_bytes(), *tag);
                type_table.remove_all(&key)?;
            }
        }
        {
            if !relation_ids.is_empty() {
                let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
                let mut source_idx = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
                let mut target_idx = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

                for exp_id in &exp_ids {
                    source_idx.remove_all(exp_id)?;
                    target_idx.remove_all(exp_id)?;
                }
                for rel_id in &relation_ids {
                    rel_table.remove(rel_id)?;
                }

                debug!(
                    count = relation_ids.len(),
                    "Cascade-deleted relations for collective"
                );
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, count = count, "Cascade-deleted experiences for collective");
        Ok(count)
    }
663
664 fn list_experience_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<ExperienceId>> {
665 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
666 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
667
668 let mut ids = Vec::new();
669 for result in table.get(id.as_bytes())? {
670 let value = result.map_err(StorageError::from)?;
671 let entry = value.value();
672 let mut exp_bytes = [0u8; 16];
674 exp_bytes.copy_from_slice(&entry[8..24]);
675 ids.push(ExperienceId::from_bytes(exp_bytes));
676 }
677
678 Ok(ids)
679 }
680
681 fn get_recent_experience_ids(
682 &self,
683 collective_id: CollectiveId,
684 limit: usize,
685 ) -> Result<Vec<(ExperienceId, Timestamp)>> {
686 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
687 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
688
689 let mut entries = Vec::new();
693 for result in table.get(collective_id.as_bytes())? {
694 let value = result.map_err(StorageError::from)?;
695 let entry = value.value();
696 let mut ts_bytes = [0u8; 8];
698 ts_bytes.copy_from_slice(&entry[..8]);
699 let timestamp = Timestamp::from_millis(i64::from_be_bytes(ts_bytes));
700
701 let mut exp_bytes = [0u8; 16];
702 exp_bytes.copy_from_slice(&entry[8..24]);
703 entries.push((ExperienceId::from_bytes(exp_bytes), timestamp));
704 }
705
706 let start = entries.len().saturating_sub(limit);
708 let mut recent = entries.split_off(start);
709 recent.reverse();
710
711 Ok(recent)
712 }
713
    /// Persists an experience, its embedding, and both secondary index
    /// entries, then appends a `Created` WAL event — all in one transaction.
    fn save_experience(&self, experience: &Experience) -> Result<()> {
        let exp_bytes = bincode::serialize(experience)
            .map_err(|e| StorageError::serialization(e.to_string()))?;

        // Embeddings live in their own table, keyed by experience id.
        let emb_bytes = f32_slice_to_bytes(&experience.embedding);

        // Key for the (collective, experience-type) secondary index.
        let type_key = encode_type_index_key(
            experience.collective_id.as_bytes(),
            experience.experience_type.type_tag(),
        );

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.insert(experience.id.as_bytes(), exp_bytes.as_slice())?;
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.insert(experience.id.as_bytes(), emb_bytes.as_slice())?;
        }
        {
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            // Index value layout: [timestamp (8 bytes BE)][experience id
            // (16 bytes)] — the big-endian prefix keeps entries in
            // chronological order within a collective.
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&experience.timestamp.to_be_bytes());
            value[8..24].copy_from_slice(experience.id.as_bytes());
            idx_table.insert(experience.collective_id.as_bytes(), &value)?;
        }
        {
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            type_table.insert(&type_key, experience.id.as_bytes())?;
        }
        // The WAL event commits atomically with the data above.
        self.increment_wal_and_record(
            &write_txn,
            experience.id.as_bytes(),
            experience.collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Created,
            experience.timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            id = %experience.id,
            collective_id = %experience.collective_id,
            "Experience saved"
        );
        Ok(())
    }
776
777 fn get_experience(&self, id: ExperienceId) -> Result<Option<Experience>> {
778 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
779
780 let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
782 let exp_entry = match exp_table.get(id.as_bytes())? {
783 Some(v) => v,
784 None => return Ok(None),
785 };
786
787 let mut experience: Experience = bincode::deserialize(exp_entry.value())
788 .map_err(|e| StorageError::serialization(e.to_string()))?;
789
790 let emb_table = read_txn.open_table(EMBEDDINGS_TABLE)?;
792 if let Some(emb_entry) = emb_table.get(id.as_bytes())? {
793 experience.embedding = bytes_to_f32_vec(emb_entry.value());
794 }
795
796 Ok(Some(experience))
797 }
798
    /// Applies a partial update to an experience and records a WAL event.
    /// Returns `Ok(false)` when the experience does not exist.
    fn update_experience(&self, id: ExperienceId, update: &ExperienceUpdate) -> Result<bool> {
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let collective_id;
        let timestamp;
        let is_archive;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                // Early return drops the uncommitted transaction — a no-op abort.
                None => return Ok(false),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;

            // Release the read guard so the table can be written below.
            drop(entry);

            collective_id = experience.collective_id;
            timestamp = experience.timestamp;
            // Only an explicit archive request is reported as `Archived`;
            // un-archiving (`Some(false)`) is treated as a plain update.
            is_archive = update.archived == Some(true);

            // Apply only the fields present in the update.
            if let Some(importance) = update.importance {
                experience.importance = importance;
            }
            if let Some(confidence) = update.confidence {
                experience.confidence = confidence;
            }
            if let Some(ref domain) = update.domain {
                experience.domain = domain.clone();
            }
            if let Some(ref related_files) = update.related_files {
                experience.related_files = related_files.clone();
            }
            if let Some(archived) = update.archived {
                experience.archived = archived;
            }

            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
        }
        let event_type = if is_archive {
            WatchEventTypeTag::Archived
        } else {
            WatchEventTypeTag::Updated
        };
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            event_type,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience updated");
        Ok(true)
    }
865
    /// Deletes an experience plus its embedding and index entries, emitting
    /// a `Deleted` WAL event. Returns `Ok(false)` when it does not exist.
    fn delete_experience(&self, id: ExperienceId) -> Result<bool> {
        // First read the fields needed to locate the index entries.
        // NOTE(review): the read and write happen in separate transactions,
        // so a concurrent writer could change the record in between — confirm
        // whether callers serialize access externally.
        let (collective_id, timestamp, type_tag) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;

            match exp_table.get(id.as_bytes())? {
                Some(entry) => {
                    let exp: Experience = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    (
                        exp.collective_id,
                        exp.timestamp,
                        exp.experience_type.type_tag(),
                    )
                }
                None => return Ok(false),
            }
        };

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            exp_table.remove(id.as_bytes())?;
        }
        {
            let mut emb_table = write_txn.open_table(EMBEDDINGS_TABLE)?;
            emb_table.remove(id.as_bytes())?;
        }
        {
            let mut idx_table = write_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
            // Rebuild the exact 24-byte index value ([timestamp BE][id]) so
            // the matching multimap entry can be removed.
            let mut value = [0u8; 24];
            value[..8].copy_from_slice(&timestamp.to_be_bytes());
            value[8..24].copy_from_slice(id.as_bytes());
            idx_table.remove(collective_id.as_bytes(), &value)?;
        }
        {
            let mut type_table = write_txn.open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)?;
            let type_key = encode_type_index_key(collective_id.as_bytes(), type_tag);
            type_table.remove(&type_key, id.as_bytes())?;
        }
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Deleted,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Experience deleted");
        Ok(true)
    }
925
    /// Increments an experience's application counter (saturating) and
    /// returns the new count, or `Ok(None)` when the experience is missing.
    fn reinforce_experience(&self, id: ExperienceId) -> Result<Option<u32>> {
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        let (new_count, collective_id, timestamp) = {
            let mut exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;

            let entry = match exp_table.get(id.as_bytes())? {
                Some(v) => v,
                // Early return drops the uncommitted transaction — a no-op abort.
                None => return Ok(None),
            };

            let mut experience: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            // Release the read guard so the table can be written below.
            drop(entry);

            // Saturating: the counter pins at u32::MAX instead of wrapping.
            experience.applications = experience.applications.saturating_add(1);
            let new_count = experience.applications;
            let collective_id = experience.collective_id;
            let timestamp = experience.timestamp;

            let bytes = bincode::serialize(&experience)
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp_table.insert(id.as_bytes(), bytes.as_slice())?;
            (new_count, collective_id, timestamp)
        };
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Experience,
            WatchEventTypeTag::Updated,
            timestamp,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, applications = new_count, "Experience reinforced");
        Ok(Some(new_count))
    }
964
965 fn save_embedding(&self, id: ExperienceId, embedding: &[f32]) -> Result<()> {
966 let bytes = f32_slice_to_bytes(embedding);
967
968 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
969 {
970 let mut table = write_txn.open_table(EMBEDDINGS_TABLE)?;
971 table.insert(id.as_bytes(), bytes.as_slice())?;
972 }
973 write_txn.commit().map_err(StorageError::from)?;
974
975 debug!(id = %id, dim = embedding.len(), "Embedding saved");
976 Ok(())
977 }
978
979 fn get_embedding(&self, id: ExperienceId) -> Result<Option<Vec<f32>>> {
980 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
981 let table = read_txn.open_table(EMBEDDINGS_TABLE)?;
982
983 match table.get(id.as_bytes())? {
984 Some(entry) => Ok(Some(bytes_to_f32_vec(entry.value()))),
985 None => Ok(None),
986 }
987 }
988
    /// Persists a relation plus both direction indexes and appends a WAL
    /// event attributed to the source experience's collective.
    ///
    /// Fails when the source experience does not exist; the error drops the
    /// transaction, rolling back the inserts made earlier in this call.
    fn save_relation(&self, relation: &ExperienceRelation) -> Result<()> {
        let bytes =
            bincode::serialize(relation).map_err(|e| StorageError::serialization(e.to_string()))?;

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
            table.insert(relation.id.as_bytes(), bytes.as_slice())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            table.insert(relation.source_id.as_bytes(), relation.id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            table.insert(relation.target_id.as_bytes(), relation.id.as_bytes())?;
        }
        // Resolve the collective via the source experience; the WAL record
        // needs a collective id.
        let collective_id = {
            let exp_table = write_txn.open_table(EXPERIENCES_TABLE)?;
            let entry = exp_table
                .get(relation.source_id.as_bytes())?
                .ok_or_else(|| {
                    StorageError::corrupted("relation source experience not found for WAL record")
                })?;
            let exp: Experience = bincode::deserialize(entry.value())
                .map_err(|e| StorageError::serialization(e.to_string()))?;
            exp.collective_id
        };
        self.increment_wal_and_record(
            &write_txn,
            relation.id.as_bytes(),
            collective_id,
            EntityTypeTag::Relation,
            WatchEventTypeTag::Created,
            relation.created_at,
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %relation.id, "Relation saved");
        Ok(())
    }
1035
1036 fn get_relation(&self, id: RelationId) -> Result<Option<ExperienceRelation>> {
1037 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1038 let table = read_txn.open_table(RELATIONS_TABLE)?;
1039
1040 match table.get(id.as_bytes())? {
1041 Some(value) => {
1042 let relation: ExperienceRelation = bincode::deserialize(value.value())
1043 .map_err(|e| StorageError::serialization(e.to_string()))?;
1044 Ok(Some(relation))
1045 }
1046 None => Ok(None),
1047 }
1048 }
1049
    /// Deletes a relation and both of its index entries, emitting a
    /// `Deleted` WAL event. Returns `Ok(false)` when it does not exist.
    fn delete_relation(&self, id: RelationId) -> Result<bool> {
        // Read the endpoints (and the source's collective for the WAL
        // record) before mutating anything.
        let (source_id, target_id, collective_id) = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let rel_table = read_txn.open_table(RELATIONS_TABLE)?;

            match rel_table.get(id.as_bytes())? {
                Some(entry) => {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    let exp_table = read_txn.open_table(EXPERIENCES_TABLE)?;
                    let cid = match exp_table.get(rel.source_id.as_bytes())? {
                        Some(exp_entry) => {
                            let exp: Experience = bincode::deserialize(exp_entry.value())
                                .map_err(|e| StorageError::serialization(e.to_string()))?;
                            exp.collective_id
                        }
                        // Source experience already gone: still delete the
                        // relation, attributing the event to the nil collective.
                        None => CollectiveId::nil(),
                    };
                    (rel.source_id, rel.target_id, cid)
                }
                None => return Ok(false),
            }
        };

        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut table = write_txn.open_table(RELATIONS_TABLE)?;
            table.remove(id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            table.remove(source_id.as_bytes(), id.as_bytes())?;
        }
        {
            let mut table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            table.remove(target_id.as_bytes(), id.as_bytes())?;
        }
        // No stored timestamp is read here, so the WAL event is stamped
        // with the deletion time.
        self.increment_wal_and_record(
            &write_txn,
            id.as_bytes(),
            collective_id,
            EntityTypeTag::Relation,
            WatchEventTypeTag::Deleted,
            Timestamp::now(),
        )?;
        write_txn.commit().map_err(StorageError::from)?;

        debug!(id = %id, "Relation deleted");
        Ok(true)
    }
1105
1106 fn get_relation_ids_by_source(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1107 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1108 let table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1109
1110 let mut ids = Vec::new();
1111 for result in table.get(experience_id.as_bytes())? {
1112 let value = result.map_err(StorageError::from)?;
1113 let bytes = value.value();
1114 ids.push(RelationId::from_bytes(*bytes));
1115 }
1116
1117 Ok(ids)
1118 }
1119
1120 fn get_relation_ids_by_target(&self, experience_id: ExperienceId) -> Result<Vec<RelationId>> {
1121 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1122 let table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
1123
1124 let mut ids = Vec::new();
1125 for result in table.get(experience_id.as_bytes())? {
1126 let value = result.map_err(StorageError::from)?;
1127 let bytes = value.value();
1128 ids.push(RelationId::from_bytes(*bytes));
1129 }
1130
1131 Ok(ids)
1132 }
1133
    /// Deletes every relation in which the experience appears as source or
    /// target, returning the number of distinct relation ids removed.
    // NOTE(review): this cascade emits no WAL watch events — confirm
    // whether sync/watchers need a `Deleted` record per relation.
    fn delete_relations_for_experience(&self, experience_id: ExperienceId) -> Result<u64> {
        // Phase 1: collect the distinct relation ids from both direction
        // indexes (a self-relation would appear in both; the set dedups it).
        let relation_ids: Vec<RelationId> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            let target_table = read_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;

            let mut ids = std::collections::HashSet::new();

            for result in source_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            for result in target_table.get(experience_id.as_bytes())? {
                let value = result.map_err(StorageError::from)?;
                ids.insert(RelationId::from_bytes(*value.value()));
            }

            ids.into_iter().collect()
        };

        let count = relation_ids.len() as u64;
        if count == 0 {
            return Ok(0);
        }

        // Phase 2: load the full relation records — index cleanup below
        // needs each relation's source and target ids. Ids whose record is
        // already gone are silently skipped.
        let relations: Vec<ExperienceRelation> = {
            let read_txn = self.db.begin_read().map_err(StorageError::from)?;
            let table = read_txn.open_table(RELATIONS_TABLE)?;

            let mut rels = Vec::with_capacity(relation_ids.len());
            for rel_id in &relation_ids {
                if let Some(entry) = table.get(rel_id.as_bytes())? {
                    let rel: ExperienceRelation = bincode::deserialize(entry.value())
                        .map_err(|e| StorageError::serialization(e.to_string()))?;
                    rels.push(rel);
                }
            }
            rels
        };

        // Phase 3: remove records and both index entries atomically.
        let write_txn = self.db.begin_write().map_err(StorageError::from)?;
        {
            let mut rel_table = write_txn.open_table(RELATIONS_TABLE)?;
            for rel in &relations {
                rel_table.remove(rel.id.as_bytes())?;
            }
        }
        {
            let mut source_table = write_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
            for rel in &relations {
                source_table.remove(rel.source_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        {
            let mut target_table = write_txn.open_multimap_table(RELATIONS_BY_TARGET_TABLE)?;
            for rel in &relations {
                target_table.remove(rel.target_id.as_bytes(), rel.id.as_bytes())?;
            }
        }
        write_txn.commit().map_err(StorageError::from)?;

        debug!(
            experience_id = %experience_id,
            count = count,
            "Cascade-deleted relations for experience"
        );
        Ok(count)
    }
1208
1209 fn relation_exists(
1210 &self,
1211 source_id: ExperienceId,
1212 target_id: ExperienceId,
1213 relation_type: RelationType,
1214 ) -> Result<bool> {
1215 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1216 let index_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1217 let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1218
1219 for result in index_table.get(source_id.as_bytes())? {
1221 let value = result.map_err(StorageError::from)?;
1222 let rel_id = RelationId::from_bytes(*value.value());
1223
1224 if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1225 let rel: ExperienceRelation = bincode::deserialize(entry.value())
1226 .map_err(|e| StorageError::serialization(e.to_string()))?;
1227 if rel.target_id == target_id && rel.relation_type == relation_type {
1228 return Ok(true);
1229 }
1230 }
1231 }
1232
1233 Ok(false)
1234 }
1235
1236 fn save_insight(&self, insight: &DerivedInsight) -> Result<()> {
1241 let bytes =
1242 bincode::serialize(insight).map_err(|e| StorageError::serialization(e.to_string()))?;
1243
1244 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1245 {
1246 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1247 table.insert(insight.id.as_bytes(), bytes.as_slice())?;
1248 }
1249 {
1250 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1251 table.insert(insight.collective_id.as_bytes(), insight.id.as_bytes())?;
1252 }
1253 self.increment_wal_and_record(
1254 &write_txn,
1255 insight.id.as_bytes(),
1256 insight.collective_id,
1257 EntityTypeTag::Insight,
1258 WatchEventTypeTag::Created,
1259 insight.created_at,
1260 )?;
1261 write_txn.commit().map_err(StorageError::from)?;
1262
1263 debug!(id = %insight.id, collective_id = %insight.collective_id, "Insight saved");
1264 Ok(())
1265 }
1266
1267 fn get_insight(&self, id: InsightId) -> Result<Option<DerivedInsight>> {
1268 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1269 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1270
1271 match table.get(id.as_bytes())? {
1272 Some(value) => {
1273 let insight: DerivedInsight = bincode::deserialize(value.value())
1274 .map_err(|e| StorageError::serialization(e.to_string()))?;
1275 Ok(Some(insight))
1276 }
1277 None => Ok(None),
1278 }
1279 }
1280
1281 fn delete_insight(&self, id: InsightId) -> Result<bool> {
1282 let collective_id = {
1284 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1285 let table = read_txn.open_table(INSIGHTS_TABLE)?;
1286
1287 match table.get(id.as_bytes())? {
1288 Some(entry) => {
1289 let insight: DerivedInsight = bincode::deserialize(entry.value())
1290 .map_err(|e| StorageError::serialization(e.to_string()))?;
1291 insight.collective_id
1292 }
1293 None => return Ok(false),
1294 }
1295 };
1296
1297 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1299 {
1300 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1301 table.remove(id.as_bytes())?;
1302 }
1303 {
1304 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1305 table.remove(collective_id.as_bytes(), id.as_bytes())?;
1306 }
1307 self.increment_wal_and_record(
1308 &write_txn,
1309 id.as_bytes(),
1310 collective_id,
1311 EntityTypeTag::Insight,
1312 WatchEventTypeTag::Deleted,
1313 Timestamp::now(),
1314 )?;
1315 write_txn.commit().map_err(StorageError::from)?;
1316
1317 debug!(id = %id, "Insight deleted");
1318 Ok(true)
1319 }
1320
1321 fn list_insight_ids_in_collective(&self, id: CollectiveId) -> Result<Vec<InsightId>> {
1322 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1323 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1324
1325 let mut ids = Vec::new();
1326 for result in table.get(id.as_bytes())? {
1327 let value = result.map_err(StorageError::from)?;
1328 ids.push(InsightId::from_bytes(*value.value()));
1329 }
1330
1331 Ok(ids)
1332 }
1333
1334 fn delete_insights_by_collective(&self, id: CollectiveId) -> Result<u64> {
1335 let insight_ids: Vec<[u8; 16]> = {
1337 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1338 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1339
1340 let mut ids = Vec::new();
1341 for result in table.get(id.as_bytes())? {
1342 let value = result.map_err(StorageError::from)?;
1343 ids.push(*value.value());
1344 }
1345 ids
1346 };
1347
1348 let count = insight_ids.len() as u64;
1349 if count == 0 {
1350 return Ok(0);
1351 }
1352
1353 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1355 {
1356 let mut table = write_txn.open_table(INSIGHTS_TABLE)?;
1357 for insight_id in &insight_ids {
1358 table.remove(insight_id)?;
1359 }
1360 }
1361 {
1362 let mut table = write_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1363 table.remove_all(id.as_bytes())?;
1364 }
1365 write_txn.commit().map_err(StorageError::from)?;
1366
1367 debug!(id = %id, count = count, "Cascade-deleted insights for collective");
1368 Ok(count)
1369 }
1370
1371 fn save_activity(&self, activity: &Activity) -> Result<()> {
1376 let key = encode_activity_key(activity.collective_id.as_bytes(), &activity.agent_id);
1377 let bytes =
1378 bincode::serialize(activity).map_err(|e| StorageError::serialization(e.to_string()))?;
1379
1380 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1381 {
1382 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1383 table.insert(key.as_slice(), bytes.as_slice())?;
1384 }
1385 write_txn.commit().map_err(StorageError::from)?;
1386
1387 debug!(
1388 agent_id = %activity.agent_id,
1389 collective_id = %activity.collective_id,
1390 "Activity saved"
1391 );
1392 Ok(())
1393 }
1394
1395 fn get_activity(
1396 &self,
1397 agent_id: &str,
1398 collective_id: CollectiveId,
1399 ) -> Result<Option<Activity>> {
1400 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1401
1402 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1403 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1404
1405 match table.get(key.as_slice())? {
1406 Some(value) => {
1407 let activity: Activity = bincode::deserialize(value.value())
1408 .map_err(|e| StorageError::serialization(e.to_string()))?;
1409 Ok(Some(activity))
1410 }
1411 None => Ok(None),
1412 }
1413 }
1414
1415 fn delete_activity(&self, agent_id: &str, collective_id: CollectiveId) -> Result<bool> {
1416 let key = encode_activity_key(collective_id.as_bytes(), agent_id);
1417
1418 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1419 let existed = {
1420 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1421 let removed = table.remove(key.as_slice())?;
1422 removed.is_some()
1423 };
1424 write_txn.commit().map_err(StorageError::from)?;
1425
1426 if existed {
1427 debug!(agent_id = %agent_id, collective_id = %collective_id, "Activity deleted");
1428 }
1429 Ok(existed)
1430 }
1431
1432 fn list_activities_in_collective(&self, collective_id: CollectiveId) -> Result<Vec<Activity>> {
1433 let prefix = collective_id.as_bytes();
1434
1435 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1436 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1437
1438 let mut activities = Vec::new();
1439 for result in table.iter()? {
1440 let (key, value) = result.map_err(StorageError::from)?;
1441 let key_bytes = key.value();
1442
1443 if key_bytes.len() >= 16 && decode_collective_from_activity_key(key_bytes) == *prefix {
1445 let activity: Activity = bincode::deserialize(value.value())
1446 .map_err(|e| StorageError::serialization(e.to_string()))?;
1447 activities.push(activity);
1448 }
1449 }
1450
1451 Ok(activities)
1452 }
1453
1454 fn delete_activities_by_collective(&self, collective_id: CollectiveId) -> Result<u64> {
1455 let prefix = collective_id.as_bytes();
1456
1457 let keys_to_delete: Vec<Vec<u8>> = {
1459 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1460 let table = read_txn.open_table(ACTIVITIES_TABLE)?;
1461
1462 let mut keys = Vec::new();
1463 for result in table.iter()? {
1464 let (key, _) = result.map_err(StorageError::from)?;
1465 let key_bytes = key.value();
1466
1467 if key_bytes.len() >= 16
1468 && decode_collective_from_activity_key(key_bytes) == *prefix
1469 {
1470 keys.push(key_bytes.to_vec());
1471 }
1472 }
1473 keys
1474 };
1475
1476 let count = keys_to_delete.len() as u64;
1477 if count == 0 {
1478 return Ok(0);
1479 }
1480
1481 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1483 {
1484 let mut table = write_txn.open_table(ACTIVITIES_TABLE)?;
1485 for key in &keys_to_delete {
1486 table.remove(key.as_slice())?;
1487 }
1488 }
1489 write_txn.commit().map_err(StorageError::from)?;
1490
1491 debug!(
1492 collective_id = %collective_id,
1493 count = count,
1494 "Cascade-deleted activities for collective"
1495 );
1496 Ok(count)
1497 }
1498
1499 fn list_experience_ids_paginated(
1504 &self,
1505 collective_id: CollectiveId,
1506 limit: usize,
1507 offset: usize,
1508 ) -> Result<Vec<ExperienceId>> {
1509 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1510 let table = read_txn.open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)?;
1511
1512 let mut ids = Vec::new();
1513 let mut skipped = 0usize;
1514
1515 for result in table.get(collective_id.as_bytes())? {
1517 let value = result.map_err(StorageError::from)?;
1518 if skipped < offset {
1519 skipped += 1;
1520 continue;
1521 }
1522 let entry = value.value();
1523 let mut id_bytes = [0u8; 16];
1524 id_bytes.copy_from_slice(&entry[8..24]);
1525 ids.push(ExperienceId::from_bytes(id_bytes));
1526 if ids.len() >= limit {
1527 return Ok(ids);
1528 }
1529 }
1530
1531 Ok(ids)
1532 }
1533
1534 fn list_relations_in_collective(
1535 &self,
1536 collective_id: CollectiveId,
1537 limit: usize,
1538 offset: usize,
1539 ) -> Result<Vec<crate::relation::ExperienceRelation>> {
1540 let exp_ids = self.list_experience_ids_in_collective(collective_id)?;
1542
1543 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1544 let source_table = read_txn.open_multimap_table(RELATIONS_BY_SOURCE_TABLE)?;
1545 let rel_table = read_txn.open_table(RELATIONS_TABLE)?;
1546
1547 let mut relations = Vec::new();
1548 let mut skipped = 0usize;
1549
1550 for exp_id in &exp_ids {
1551 for result in source_table.get(exp_id.as_bytes())? {
1552 let rel_id_value = result.map_err(StorageError::from)?;
1553 let rel_id = RelationId::from_bytes(*rel_id_value.value());
1554
1555 if skipped < offset {
1556 skipped += 1;
1557 continue;
1558 }
1559
1560 if let Some(entry) = rel_table.get(rel_id.as_bytes())? {
1561 let relation: crate::relation::ExperienceRelation =
1562 bincode::deserialize(entry.value())
1563 .map_err(|e| StorageError::serialization(e.to_string()))?;
1564 relations.push(relation);
1565 if relations.len() >= limit {
1566 return Ok(relations);
1567 }
1568 }
1569 }
1570 }
1571
1572 Ok(relations)
1573 }
1574
1575 fn list_insight_ids_paginated(
1576 &self,
1577 collective_id: CollectiveId,
1578 limit: usize,
1579 offset: usize,
1580 ) -> Result<Vec<InsightId>> {
1581 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1582 let table = read_txn.open_multimap_table(INSIGHTS_BY_COLLECTIVE_TABLE)?;
1583
1584 let mut ids = Vec::new();
1585 let mut skipped = 0usize;
1586
1587 for result in table.get(collective_id.as_bytes())? {
1588 let value = result.map_err(StorageError::from)?;
1589 if skipped < offset {
1590 skipped += 1;
1591 continue;
1592 }
1593 ids.push(InsightId::from_bytes(*value.value()));
1594 if ids.len() >= limit {
1595 return Ok(ids);
1596 }
1597 }
1598
1599 Ok(ids)
1600 }
1601
1602 fn get_wal_sequence(&self) -> Result<u64> {
1607 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1608 let meta_table = read_txn.open_table(METADATA_TABLE)?;
1609 match meta_table.get(WAL_SEQUENCE_KEY)? {
1610 Some(entry) => {
1611 let bytes: [u8; 8] = entry
1612 .value()
1613 .try_into()
1614 .map_err(|_| StorageError::corrupted("invalid wal_sequence bytes"))?;
1615 Ok(u64::from_be_bytes(bytes))
1616 }
1617 None => Ok(0),
1618 }
1619 }
1620
1621 fn poll_watch_events(
1622 &self,
1623 since_seq: u64,
1624 limit: usize,
1625 ) -> Result<(Vec<WatchEventRecord>, u64)> {
1626 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1627 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1628
1629 let start_key = (since_seq + 1).to_be_bytes();
1630 let end_key = u64::MAX.to_be_bytes();
1631 let mut events = Vec::new();
1632 let mut max_seq = since_seq;
1633
1634 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1635 let (key, value) = entry.map_err(StorageError::from)?;
1636 let seq = u64::from_be_bytes(*key.value());
1637 let record: WatchEventRecord = bincode::deserialize(value.value())
1638 .map_err(|e| StorageError::serialization(e.to_string()))?;
1639 events.push(record);
1640 max_seq = seq;
1641 if events.len() >= limit {
1642 break;
1643 }
1644 }
1645
1646 Ok((events, max_seq))
1647 }
1648
1649 #[cfg(feature = "sync")]
1654 fn poll_sync_events(
1655 &self,
1656 since_seq: u64,
1657 limit: usize,
1658 ) -> Result<Vec<(u64, WatchEventRecord)>> {
1659 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1660 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1661
1662 let start_key = (since_seq + 1).to_be_bytes();
1663 let end_key = u64::MAX.to_be_bytes();
1664 let mut events = Vec::new();
1665
1666 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1667 let (key, value) = entry.map_err(StorageError::from)?;
1668 let seq = u64::from_be_bytes(*key.value());
1669 let record: WatchEventRecord = bincode::deserialize(value.value())
1670 .map_err(|e| StorageError::serialization(e.to_string()))?;
1671 events.push((seq, record));
1672 if events.len() >= limit {
1673 break;
1674 }
1675 }
1676
1677 Ok(events)
1678 }
1679
    /// Returns the identity of this storage instance, as stored on the
    /// struct's `instance_id` field (sync builds only).
    #[cfg(feature = "sync")]
    fn instance_id(&self) -> crate::sync::InstanceId {
        self.instance_id
    }
1684
1685 #[cfg(feature = "sync")]
1686 fn save_sync_cursor(&self, cursor: &crate::sync::SyncCursor) -> Result<()> {
1687 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1688 {
1689 let mut table = write_txn.open_table(SYNC_CURSORS_TABLE)?;
1690 let bytes = bincode::serialize(cursor)
1691 .map_err(|e| StorageError::serialization(e.to_string()))?;
1692 table.insert(cursor.instance_id.as_bytes(), bytes.as_slice())?;
1693 }
1694 write_txn.commit().map_err(StorageError::from)?;
1695 debug!(
1696 peer = %cursor.instance_id,
1697 last_sequence = cursor.last_sequence,
1698 "Saved sync cursor"
1699 );
1700 Ok(())
1701 }
1702
1703 #[cfg(feature = "sync")]
1704 fn load_sync_cursor(
1705 &self,
1706 instance_id: &crate::sync::InstanceId,
1707 ) -> Result<Option<crate::sync::SyncCursor>> {
1708 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1709 let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1710 match table.get(instance_id.as_bytes())? {
1711 Some(entry) => {
1712 let cursor: crate::sync::SyncCursor = bincode::deserialize(entry.value())
1713 .map_err(|e| StorageError::serialization(e.to_string()))?;
1714 Ok(Some(cursor))
1715 }
1716 None => Ok(None),
1717 }
1718 }
1719
1720 #[cfg(feature = "sync")]
1721 fn list_sync_cursors(&self) -> Result<Vec<crate::sync::SyncCursor>> {
1722 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1723 let table = read_txn.open_table(SYNC_CURSORS_TABLE)?;
1724 let mut cursors = Vec::new();
1725 for entry in table.iter()? {
1726 let (_, value) = entry.map_err(StorageError::from)?;
1727 let cursor: crate::sync::SyncCursor = bincode::deserialize(value.value())
1728 .map_err(|e| StorageError::serialization(e.to_string()))?;
1729 cursors.push(cursor);
1730 }
1731 Ok(cursors)
1732 }
1733
1734 #[cfg(feature = "sync")]
1735 fn compact_wal_events(&self, up_to_seq: u64) -> Result<u64> {
1736 if up_to_seq == 0 {
1737 return Ok(0);
1738 }
1739
1740 let keys_to_delete: Vec<[u8; 8]> = {
1742 let read_txn = self.db.begin_read().map_err(StorageError::from)?;
1743 let events_table = read_txn.open_table(WATCH_EVENTS_TABLE)?;
1744
1745 let start_key = 1u64.to_be_bytes();
1746 let end_key = up_to_seq.to_be_bytes();
1747 let mut keys = Vec::new();
1748
1749 for entry in events_table.range::<&[u8; 8]>(&start_key..=&end_key)? {
1750 let (key, _) = entry.map_err(StorageError::from)?;
1751 keys.push(*key.value());
1752 }
1753 keys
1754 };
1755
1756 if keys_to_delete.is_empty() {
1757 return Ok(0);
1758 }
1759
1760 let count = keys_to_delete.len() as u64;
1761
1762 let write_txn = self.db.begin_write().map_err(StorageError::from)?;
1764 {
1765 let mut events_table = write_txn.open_table(WATCH_EVENTS_TABLE)?;
1766 for key in &keys_to_delete {
1767 events_table.remove(key)?;
1768 }
1769 }
1770 write_txn.commit().map_err(StorageError::from)?;
1771
1772 debug!(count, up_to_seq, "Compacted WAL events");
1773 Ok(count)
1774 }
1775}
1776
/// Serializes a slice of `f32` values into a flat little-endian byte
/// buffer (4 bytes per value, preserving order).
#[inline]
fn f32_slice_to_bytes(data: &[f32]) -> Vec<u8> {
    data.iter().flat_map(|v| v.to_le_bytes()).collect()
}
1790
/// Deserializes a little-endian byte buffer back into `f32` values.
/// Trailing bytes that do not form a complete 4-byte chunk are ignored.
#[inline]
fn bytes_to_f32_vec(data: &[u8]) -> Vec<f32> {
    let mut out = Vec::with_capacity(data.len() / 4);
    for chunk in data.chunks_exact(4) {
        // chunks_exact(4) guarantees the conversion cannot fail.
        out.push(f32::from_le_bytes(chunk.try_into().expect("4-byte chunk")));
    }
    out
}
1798
1799#[cfg(test)]
1803mod tests {
1804 use super::*;
1805 use tempfile::tempdir;
1806
    // Baseline config shared by most tests; assertions below rely on its
    // default embedding dimension being D384.
    fn default_config() -> Config {
        Config::default()
    }

    // Opening a nonexistent path must create the database file and seed
    // metadata with the current schema version and configured dimension.
    #[test]
    fn test_open_creates_new_database() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        assert!(!path.exists());

        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        assert!(path.exists());
        assert_eq!(storage.metadata().schema_version, SCHEMA_VERSION);
        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::D384
        );

        Box::new(storage).close().unwrap();
    }

    // Re-opening keeps `created_at` stable and advances `last_opened_at`.
    #[test]
    fn test_open_existing_database() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let created_at = storage.metadata().created_at;
        Box::new(storage).close().unwrap();

        // Sleep so the second open gets a strictly later timestamp.
        std::thread::sleep(std::time::Duration::from_millis(10));
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        assert_eq!(storage.metadata().created_at, created_at);
        assert!(storage.metadata().last_opened_at > created_at);

        Box::new(storage).close().unwrap();
    }

    // Opening with a different embedding dimension than the database was
    // created with must fail with a DimensionMismatch validation error.
    #[test]
    fn test_dimension_mismatch_returns_error() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config_384 = Config {
            embedding_dimension: EmbeddingDimension::D384,
            ..Default::default()
        };
        let storage = RedbStorage::open(&path, &config_384).unwrap();
        Box::new(storage).close().unwrap();

        let config_768 = Config {
            embedding_dimension: EmbeddingDimension::D768,
            ..Default::default()
        };
        let result = RedbStorage::open(&path, &config_768);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(
            err,
            PulseDBError::Validation(ValidationError::DimensionMismatch { .. })
        ));
    }

    // `path()` must report the database file location after a successful
    // open.
    #[test]
    fn test_database_files_created() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("pulse.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        assert!(path.exists());
        assert!(storage.path().is_some());
        assert_eq!(storage.path().unwrap(), path);

        Box::new(storage).close().unwrap();
    }
1894
    // A custom embedding dimension written at creation must round-trip
    // through close and re-open unchanged.
    #[test]
    fn test_metadata_preserved_across_opens() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config = Config {
            embedding_dimension: EmbeddingDimension::Custom(512),
            ..Default::default()
        };

        let storage = RedbStorage::open(&path, &config).unwrap();
        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::Custom(512)
        );
        Box::new(storage).close().unwrap();

        let storage = RedbStorage::open(&path, &config).unwrap();
        assert_eq!(
            storage.metadata().embedding_dimension,
            EmbeddingDimension::Custom(512)
        );
        Box::new(storage).close().unwrap();
    }

    // The convenience accessor must mirror the configured dimension.
    #[test]
    fn test_embedding_dimension_accessor() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let config = Config {
            embedding_dimension: EmbeddingDimension::D768,
            ..Default::default()
        };

        let storage = RedbStorage::open(&path, &config).unwrap();
        assert_eq!(storage.embedding_dimension(), EmbeddingDimension::D768);

        Box::new(storage).close().unwrap();
    }

    // All core tables must exist right after initialization; these opens
    // would fail if table creation had been skipped.
    #[test]
    fn test_all_six_tables_created() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let read_txn = storage.database().begin_read().unwrap();

        read_txn.open_table(METADATA_TABLE).unwrap();
        read_txn.open_table(COLLECTIVES_TABLE).unwrap();
        read_txn.open_table(EXPERIENCES_TABLE).unwrap();
        read_txn.open_table(EMBEDDINGS_TABLE).unwrap();
        read_txn
            .open_multimap_table(EXPERIENCES_BY_COLLECTIVE_TABLE)
            .unwrap();
        read_txn
            .open_multimap_table(EXPERIENCES_BY_TYPE_TABLE)
            .unwrap();

        Box::new(storage).close().unwrap();
    }

    // Round-trip: a saved collective must come back field-for-field.
    #[test]
    fn test_save_and_get_collective() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("test-project", 384);
        let id = collective.id;

        storage.save_collective(&collective).unwrap();

        let retrieved = storage.get_collective(id).unwrap().unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.name, "test-project");
        assert_eq!(retrieved.embedding_dimension, 384);
        assert!(retrieved.owner_id.is_none());

        Box::new(storage).close().unwrap();
    }
1987
    // Looking up an id that was never saved yields `None`, not an error.
    #[test]
    fn test_get_nonexistent_collective_returns_none() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let result = storage.get_collective(CollectiveId::new()).unwrap();
        assert!(result.is_none());

        Box::new(storage).close().unwrap();
    }

    // Saving under the same id is an upsert: the newer record wins.
    #[test]
    fn test_save_collective_overwrites_existing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let mut collective = Collective::new("original-name", 384);
        let id = collective.id;
        storage.save_collective(&collective).unwrap();

        collective.name = "updated-name".to_string();
        storage.save_collective(&collective).unwrap();

        let retrieved = storage.get_collective(id).unwrap().unwrap();
        assert_eq!(retrieved.name, "updated-name");

        Box::new(storage).close().unwrap();
    }

    // A fresh database lists no collectives.
    #[test]
    fn test_list_collectives_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collectives = storage.list_collectives().unwrap();
        assert!(collectives.is_empty());

        Box::new(storage).close().unwrap();
    }

    // Every saved collective appears in the listing (order unspecified,
    // so membership is checked by id).
    #[test]
    fn test_list_collectives_returns_all() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let c1 = Collective::new("project-alpha", 384);
        let c2 = Collective::new("project-beta", 384);
        let c3 = Collective::new("project-gamma", 384);

        storage.save_collective(&c1).unwrap();
        storage.save_collective(&c2).unwrap();
        storage.save_collective(&c3).unwrap();

        let collectives = storage.list_collectives().unwrap();
        assert_eq!(collectives.len(), 3);

        let ids: Vec<CollectiveId> = collectives.iter().map(|c| c.id).collect();
        assert!(ids.contains(&c1.id));
        assert!(ids.contains(&c2.id));
        assert!(ids.contains(&c3.id));

        Box::new(storage).close().unwrap();
    }

    // Deleting an existing collective reports `true` and removes it.
    #[test]
    fn test_delete_collective_existing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("to-delete", 384);
        let id = collective.id;
        storage.save_collective(&collective).unwrap();

        let deleted = storage.delete_collective(id).unwrap();
        assert!(deleted);

        assert!(storage.get_collective(id).unwrap().is_none());

        Box::new(storage).close().unwrap();
    }

    // Deleting an unknown id reports `false` without erroring.
    #[test]
    fn test_delete_collective_nonexistent() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let deleted = storage.delete_collective(CollectiveId::new()).unwrap();
        assert!(!deleted);

        Box::new(storage).close().unwrap();
    }
2089
    // A write transaction that is dropped without `commit()` must leave no
    // trace: redb aborts the transaction on drop.
    #[test]
    fn test_uncommitted_transaction_is_invisible() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("phantom", 384);
        let id = collective.id;
        let bytes = bincode::serialize(&collective).unwrap();

        {
            // Insert but deliberately never commit; the txn aborts at the
            // end of this scope.
            let write_txn = storage.database().begin_write().unwrap();
            {
                let mut table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
                table.insert(id.as_bytes(), bytes.as_slice()).unwrap();
            }
        }

        let result = storage.get_collective(id).unwrap();
        assert!(result.is_none(), "Uncommitted data must not be visible");

        Box::new(storage).close().unwrap();
    }

    // Counterpart sanity check: committed data is visible to later reads.
    #[test]
    fn test_committed_transaction_is_visible() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("committed", 384);
        let id = collective.id;

        storage.save_collective(&collective).unwrap();

        let result = storage.get_collective(id).unwrap();
        assert!(result.is_some(), "Committed data must be visible");

        Box::new(storage).close().unwrap();
    }

    // One transaction writing two tables must commit both atomically.
    #[test]
    fn test_multi_table_atomicity() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let collective = Collective::new("multi-table", 384);
        let id = collective.id;
        let collective_bytes = bincode::serialize(&collective).unwrap();

        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut coll_table = write_txn.open_table(COLLECTIVES_TABLE).unwrap();
            coll_table
                .insert(id.as_bytes(), collective_bytes.as_slice())
                .unwrap();
        }
        {
            let mut meta_table = write_txn.open_table(METADATA_TABLE).unwrap();
            meta_table
                .insert("test_marker", b"multi_table_test".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();

        let coll = storage.get_collective(id).unwrap();
        assert!(coll.is_some(), "Collective from multi-table txn must exist");

        let read_txn = storage.database().begin_read().unwrap();
        let meta_table = read_txn.open_table(METADATA_TABLE).unwrap();
        let marker = meta_table.get("test_marker").unwrap();
        assert!(marker.is_some(), "Metadata from multi-table txn must exist");

        Box::new(storage).close().unwrap();
    }

    // A read snapshot taken after a mix of commits and a delete must see
    // exactly the committed state (MVCC consistency).
    #[test]
    fn test_mvcc_read_consistency() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let storage = RedbStorage::open(&path, &default_config()).unwrap();

        let c1 = Collective::new("alpha", 384);
        let c2 = Collective::new("beta", 384);
        let c3 = Collective::new("gamma", 384);

        storage.save_collective(&c1).unwrap();
        storage.save_collective(&c2).unwrap();
        storage.save_collective(&c3).unwrap();

        storage.delete_collective(c2.id).unwrap();

        let read_txn = storage.database().begin_read().unwrap();
        let table = read_txn.open_table(COLLECTIVES_TABLE).unwrap();

        assert!(
            table.get(c1.id.as_bytes()).unwrap().is_some(),
            "c1 must be visible (committed)"
        );
        assert!(
            table.get(c2.id.as_bytes()).unwrap().is_none(),
            "c2 must be absent (deleted)"
        );
        assert!(
            table.get(c3.id.as_bytes()).unwrap().is_some(),
            "c3 must be visible (committed)"
        );

        let count = table.iter().unwrap().count();
        assert_eq!(count, 2, "Exactly 2 collectives should exist");

        // Release the snapshot before closing the database.
        drop(table);
        drop(read_txn);

        Box::new(storage).close().unwrap();
    }
2242
    // Garbage bytes stored under the metadata key must be detected as
    // corruption on the next open.
    #[test]
    fn test_corruption_detection_invalid_metadata_bytes() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("corrupt.db");

        // Create a valid store, then overwrite its metadata with junk.
        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.insert(METADATA_KEY, b"not-valid-bincode-data".as_slice())
                .unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Corrupted metadata must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Invalid metadata format"),
                    "Error should mention invalid format, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }

    // Removing the metadata key entirely is also treated as corruption.
    #[test]
    fn test_corruption_detection_missing_metadata_key() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_key.db");

        let storage = RedbStorage::open(&path, &default_config()).unwrap();
        let write_txn = storage.database().begin_write().unwrap();
        {
            let mut meta = write_txn.open_table(METADATA_TABLE).unwrap();
            meta.remove(METADATA_KEY).unwrap();
        }
        write_txn.commit().unwrap();
        Box::new(storage).close().unwrap();

        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata key must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Missing database metadata"),
                    "Error should mention missing metadata, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }

    // A structurally valid redb file that lacks our metadata table must be
    // rejected when opened as a storage database.
    #[test]
    fn test_corruption_detection_missing_metadata_table() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("no_table.db");

        {
            // Build the file with raw redb so METADATA_TABLE never exists.
            let db = ::redb::Database::create(&path).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let dummy: ::redb::TableDefinition<&str, &str> =
                    ::redb::TableDefinition::new("dummy");
                let mut table = write_txn.open_table(dummy).unwrap();
                table.insert("key", "value").unwrap();
            }
            write_txn.commit().unwrap();
        }

        let result = RedbStorage::open(&path, &default_config());
        assert!(result.is_err(), "Missing metadata table must be rejected");
        let err = result.unwrap_err();
        match err {
            PulseDBError::Storage(StorageError::Corrupted(msg)) => {
                assert!(
                    msg.contains("Cannot open metadata table"),
                    "Error should mention metadata table, got: {}",
                    msg
                );
            }
            other => panic!("Expected StorageError::Corrupted, got: {:?}", other),
        }
    }
2350
2351 use crate::experience::{Experience, ExperienceType, ExperienceUpdate, Severity};
2356 use crate::types::{AgentId, ExperienceId, Timestamp};
2357
2358 fn test_experience(collective_id: CollectiveId, dim: usize) -> Experience {
2360 Experience {
2361 id: ExperienceId::new(),
2362 collective_id,
2363 content: "Test experience content".into(),
2364 embedding: vec![0.42; dim],
2365 experience_type: ExperienceType::Fact {
2366 statement: "redb uses shadow paging".into(),
2367 source: "docs".into(),
2368 },
2369 importance: 0.8,
2370 confidence: 0.7,
2371 applications: 0,
2372 domain: vec!["rust".into(), "databases".into()],
2373 related_files: vec!["src/storage/redb.rs".into()],
2374 source_agent: AgentId::new("test-agent"),
2375 source_task: None,
2376 timestamp: Timestamp::now(),
2377 archived: false,
2378 }
2379 }
2380
2381 #[test]
2382 fn test_save_and_get_experience() {
2383 let dir = tempdir().unwrap();
2384 let path = dir.path().join("test.db");
2385 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2386
2387 let collective = Collective::new("test", 384);
2388 storage.save_collective(&collective).unwrap();
2389
2390 let exp = test_experience(collective.id, 384);
2391 let exp_id = exp.id;
2392
2393 storage.save_experience(&exp).unwrap();
2394
2395 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2396 assert_eq!(retrieved.id, exp_id);
2397 assert_eq!(retrieved.collective_id, collective.id);
2398 assert_eq!(retrieved.content, "Test experience content");
2399 assert_eq!(retrieved.importance, 0.8);
2400 assert_eq!(retrieved.confidence, 0.7);
2401 assert_eq!(retrieved.applications, 0);
2402 assert_eq!(retrieved.domain, vec!["rust", "databases"]);
2403 assert!(!retrieved.archived);
2404 assert_eq!(retrieved.embedding.len(), 384);
2406 assert_eq!(retrieved.embedding[0], 0.42);
2407
2408 Box::new(storage).close().unwrap();
2409 }
2410
2411 #[test]
2412 fn test_get_nonexistent_experience_returns_none() {
2413 let dir = tempdir().unwrap();
2414 let path = dir.path().join("test.db");
2415 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2416
2417 let result = storage.get_experience(ExperienceId::new()).unwrap();
2418 assert!(result.is_none());
2419
2420 Box::new(storage).close().unwrap();
2421 }
2422
2423 #[test]
2424 fn test_update_experience_fields() {
2425 let dir = tempdir().unwrap();
2426 let path = dir.path().join("test.db");
2427 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2428
2429 let collective = Collective::new("test", 384);
2430 storage.save_collective(&collective).unwrap();
2431
2432 let exp = test_experience(collective.id, 384);
2433 let exp_id = exp.id;
2434 storage.save_experience(&exp).unwrap();
2435
2436 let update = ExperienceUpdate {
2438 importance: Some(0.95),
2439 domain: Some(vec!["updated-tag".into()]),
2440 ..Default::default()
2441 };
2442 let updated = storage.update_experience(exp_id, &update).unwrap();
2443 assert!(updated);
2444
2445 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2446 assert_eq!(retrieved.importance, 0.95);
2447 assert_eq!(retrieved.domain, vec!["updated-tag"]);
2448 assert_eq!(retrieved.confidence, 0.7);
2450
2451 Box::new(storage).close().unwrap();
2452 }
2453
2454 #[test]
2455 fn test_update_nonexistent_experience_returns_false() {
2456 let dir = tempdir().unwrap();
2457 let path = dir.path().join("test.db");
2458 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2459
2460 let update = ExperienceUpdate {
2461 importance: Some(0.5),
2462 ..Default::default()
2463 };
2464 let result = storage
2465 .update_experience(ExperienceId::new(), &update)
2466 .unwrap();
2467 assert!(!result);
2468
2469 Box::new(storage).close().unwrap();
2470 }
2471
2472 #[test]
2473 fn test_delete_experience() {
2474 let dir = tempdir().unwrap();
2475 let path = dir.path().join("test.db");
2476 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2477
2478 let collective = Collective::new("test", 384);
2479 storage.save_collective(&collective).unwrap();
2480
2481 let exp = test_experience(collective.id, 384);
2482 let exp_id = exp.id;
2483 storage.save_experience(&exp).unwrap();
2484
2485 assert!(storage.get_experience(exp_id).unwrap().is_some());
2487
2488 let deleted = storage.delete_experience(exp_id).unwrap();
2490 assert!(deleted);
2491
2492 assert!(storage.get_experience(exp_id).unwrap().is_none());
2494 assert!(storage.get_embedding(exp_id).unwrap().is_none());
2495
2496 assert_eq!(
2498 storage
2499 .count_experiences_in_collective(collective.id)
2500 .unwrap(),
2501 0
2502 );
2503
2504 Box::new(storage).close().unwrap();
2505 }
2506
2507 #[test]
2508 fn test_delete_nonexistent_experience_returns_false() {
2509 let dir = tempdir().unwrap();
2510 let path = dir.path().join("test.db");
2511 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2512
2513 let result = storage.delete_experience(ExperienceId::new()).unwrap();
2514 assert!(!result);
2515
2516 Box::new(storage).close().unwrap();
2517 }
2518
2519 #[test]
2520 fn test_save_and_get_embedding() {
2521 let dir = tempdir().unwrap();
2522 let path = dir.path().join("test.db");
2523 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2524
2525 let id = ExperienceId::new();
2526 let embedding = vec![0.1, 0.2, 0.3, -0.5, 1.0, f32::MIN_POSITIVE];
2527
2528 storage.save_embedding(id, &embedding).unwrap();
2529
2530 let retrieved = storage.get_embedding(id).unwrap().unwrap();
2531 assert_eq!(retrieved, embedding);
2532
2533 Box::new(storage).close().unwrap();
2534 }
2535
2536 #[test]
2537 fn test_experience_by_collective_index() {
2538 let dir = tempdir().unwrap();
2539 let path = dir.path().join("test.db");
2540 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2541
2542 let collective = Collective::new("test", 384);
2543 storage.save_collective(&collective).unwrap();
2544
2545 for _ in 0..3 {
2547 let exp = test_experience(collective.id, 384);
2548 storage.save_experience(&exp).unwrap();
2549 }
2550
2551 assert_eq!(
2553 storage
2554 .count_experiences_in_collective(collective.id)
2555 .unwrap(),
2556 3
2557 );
2558
2559 Box::new(storage).close().unwrap();
2560 }
2561
2562 #[test]
2563 fn test_cascade_delete_includes_experiences() {
2564 let dir = tempdir().unwrap();
2565 let path = dir.path().join("test.db");
2566 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2567
2568 let collective = Collective::new("test", 384);
2569 storage.save_collective(&collective).unwrap();
2570
2571 let exp1 = test_experience(collective.id, 384);
2572 let exp2 = test_experience(collective.id, 384);
2573 let id1 = exp1.id;
2574 let id2 = exp2.id;
2575 storage.save_experience(&exp1).unwrap();
2576 storage.save_experience(&exp2).unwrap();
2577
2578 let count = storage
2580 .delete_experiences_by_collective(collective.id)
2581 .unwrap();
2582 assert_eq!(count, 2);
2583
2584 assert!(storage.get_experience(id1).unwrap().is_none());
2586 assert!(storage.get_experience(id2).unwrap().is_none());
2587 assert!(storage.get_embedding(id1).unwrap().is_none());
2588 assert!(storage.get_embedding(id2).unwrap().is_none());
2589
2590 Box::new(storage).close().unwrap();
2591 }
2592
2593 #[test]
2594 fn test_update_experience_archived_flag() {
2595 let dir = tempdir().unwrap();
2596 let path = dir.path().join("test.db");
2597 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2598
2599 let collective = Collective::new("test", 384);
2600 storage.save_collective(&collective).unwrap();
2601
2602 let exp = test_experience(collective.id, 384);
2603 let exp_id = exp.id;
2604 storage.save_experience(&exp).unwrap();
2605
2606 let update = ExperienceUpdate {
2608 archived: Some(true),
2609 ..Default::default()
2610 };
2611 storage.update_experience(exp_id, &update).unwrap();
2612
2613 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2614 assert!(retrieved.archived);
2615
2616 let update = ExperienceUpdate {
2618 archived: Some(false),
2619 ..Default::default()
2620 };
2621 storage.update_experience(exp_id, &update).unwrap();
2622
2623 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2624 assert!(!retrieved.archived);
2625
2626 Box::new(storage).close().unwrap();
2627 }
2628
2629 #[test]
2630 fn test_f32_byte_conversion_roundtrip() {
2631 let original = vec![0.0, 1.0, -1.0, f32::MAX, f32::MIN, std::f32::consts::PI];
2632 let bytes = f32_slice_to_bytes(&original);
2633 assert_eq!(bytes.len(), original.len() * 4);
2634
2635 let restored = bytes_to_f32_vec(&bytes);
2636 assert_eq!(original, restored);
2637 }
2638
2639 #[test]
2640 fn test_experience_with_all_type_variants() {
2641 let dir = tempdir().unwrap();
2642 let path = dir.path().join("test.db");
2643 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2644
2645 let collective = Collective::new("test", 384);
2646 storage.save_collective(&collective).unwrap();
2647
2648 let types = vec![
2650 ExperienceType::Difficulty {
2651 description: "test".into(),
2652 severity: Severity::High,
2653 },
2654 ExperienceType::Solution {
2655 problem_ref: None,
2656 approach: "test".into(),
2657 worked: true,
2658 },
2659 ExperienceType::ErrorPattern {
2660 signature: "E0308".into(),
2661 fix: "check types".into(),
2662 prevention: "use clippy".into(),
2663 },
2664 ExperienceType::SuccessPattern {
2665 task_type: "refactor".into(),
2666 approach: "extract method".into(),
2667 quality: 0.9,
2668 },
2669 ExperienceType::UserPreference {
2670 category: "style".into(),
2671 preference: "snake_case".into(),
2672 strength: 1.0,
2673 },
2674 ExperienceType::ArchitecturalDecision {
2675 decision: "use redb".into(),
2676 rationale: "pure Rust".into(),
2677 },
2678 ExperienceType::TechInsight {
2679 technology: "tokio".into(),
2680 insight: "spawn_blocking".into(),
2681 },
2682 ExperienceType::Fact {
2683 statement: "Rust is safe".into(),
2684 source: "docs".into(),
2685 },
2686 ExperienceType::Generic { category: None },
2687 ];
2688
2689 for experience_type in types {
2690 let mut exp = test_experience(collective.id, 384);
2691 exp.experience_type = experience_type;
2692 storage.save_experience(&exp).unwrap();
2693
2694 let retrieved = storage.get_experience(exp.id).unwrap().unwrap();
2696 assert_eq!(
2697 retrieved.experience_type.type_tag(),
2698 exp.experience_type.type_tag()
2699 );
2700 }
2701
2702 assert_eq!(
2703 storage
2704 .count_experiences_in_collective(collective.id)
2705 .unwrap(),
2706 9
2707 );
2708
2709 Box::new(storage).close().unwrap();
2710 }
2711
2712 #[test]
2713 fn test_reinforce_experience_atomic() {
2714 let dir = tempdir().unwrap();
2715 let path = dir.path().join("test.db");
2716 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2717
2718 let collective = Collective::new("test", 384);
2719 storage.save_collective(&collective).unwrap();
2720
2721 let exp = test_experience(collective.id, 384);
2722 let exp_id = exp.id;
2723 storage.save_experience(&exp).unwrap();
2724
2725 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(1));
2727 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(2));
2728 assert_eq!(storage.reinforce_experience(exp_id).unwrap(), Some(3));
2729
2730 let retrieved = storage.get_experience(exp_id).unwrap().unwrap();
2732 assert_eq!(retrieved.applications, 3);
2733
2734 let emb = storage.get_embedding(exp_id).unwrap().unwrap();
2736 assert_eq!(emb.len(), 384);
2737
2738 Box::new(storage).close().unwrap();
2739 }
2740
2741 #[test]
2742 fn test_reinforce_experience_nonexistent() {
2743 let dir = tempdir().unwrap();
2744 let path = dir.path().join("test.db");
2745 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2746
2747 let result = storage.reinforce_experience(ExperienceId::new()).unwrap();
2748 assert!(result.is_none());
2749
2750 Box::new(storage).close().unwrap();
2751 }
2752
2753 #[test]
2758 fn test_wal_sequence_starts_at_zero() {
2759 let dir = tempdir().unwrap();
2760 let path = dir.path().join("test.db");
2761 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2762
2763 assert_eq!(storage.get_wal_sequence().unwrap(), 0);
2764
2765 Box::new(storage).close().unwrap();
2766 }
2767
2768 #[test]
2769 fn test_save_experience_increments_wal_sequence() {
2770 let dir = tempdir().unwrap();
2771 let path = dir.path().join("test.db");
2772 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2773
2774 let collective = Collective::new("test", 384);
2775 storage.save_collective(&collective).unwrap();
2776 assert_eq!(storage.get_wal_sequence().unwrap(), 1);
2778
2779 let exp1 = test_experience(collective.id, 384);
2780 storage.save_experience(&exp1).unwrap();
2781 assert_eq!(storage.get_wal_sequence().unwrap(), 2);
2782
2783 let exp2 = test_experience(collective.id, 384);
2784 storage.save_experience(&exp2).unwrap();
2785 assert_eq!(storage.get_wal_sequence().unwrap(), 3);
2786
2787 Box::new(storage).close().unwrap();
2788 }
2789
2790 #[test]
2791 fn test_poll_watch_events_returns_correct_events() {
2792 let dir = tempdir().unwrap();
2793 let path = dir.path().join("test.db");
2794 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2795
2796 let collective = Collective::new("test", 384);
2797 storage.save_collective(&collective).unwrap();
2798 let exp1 = test_experience(collective.id, 384);
2801 let exp2 = test_experience(collective.id, 384);
2802 let exp3 = test_experience(collective.id, 384);
2803 storage.save_experience(&exp1).unwrap();
2804 storage.save_experience(&exp2).unwrap();
2805 storage.save_experience(&exp3).unwrap();
2806
2807 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2809 assert_eq!(events.len(), 4);
2810 assert_eq!(max_seq, 4);
2811 assert!(events
2813 .iter()
2814 .all(|e| e.event_type == WatchEventTypeTag::Created));
2815 assert_eq!(events[1].entity_id, *exp1.id.as_bytes());
2817 assert_eq!(events[2].entity_id, *exp2.id.as_bytes());
2818 assert_eq!(events[3].entity_id, *exp3.id.as_bytes());
2819
2820 Box::new(storage).close().unwrap();
2821 }
2822
2823 #[test]
2824 fn test_poll_watch_events_since_midpoint() {
2825 let dir = tempdir().unwrap();
2826 let path = dir.path().join("test.db");
2827 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2828
2829 let collective = Collective::new("test", 384);
2830 storage.save_collective(&collective).unwrap();
2831
2832 for _ in 0..5 {
2834 let exp = test_experience(collective.id, 384);
2835 storage.save_experience(&exp).unwrap();
2836 }
2837
2838 let (events, max_seq) = storage.poll_watch_events(4, 100).unwrap();
2841 assert_eq!(events.len(), 2);
2842 assert_eq!(max_seq, 6);
2843
2844 Box::new(storage).close().unwrap();
2845 }
2846
2847 #[test]
2848 fn test_poll_watch_events_empty_when_caught_up() {
2849 let dir = tempdir().unwrap();
2850 let path = dir.path().join("test.db");
2851 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2852
2853 let collective = Collective::new("test", 384);
2854 storage.save_collective(&collective).unwrap();
2855
2856 let exp = test_experience(collective.id, 384);
2857 storage.save_experience(&exp).unwrap();
2858
2859 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2861 assert_eq!(events.len(), 2);
2862 assert_eq!(max_seq, 2);
2863
2864 let (events, max_seq) = storage.poll_watch_events(2, 100).unwrap();
2866 assert_eq!(events.len(), 0);
2867 assert_eq!(max_seq, 2); Box::new(storage).close().unwrap();
2870 }
2871
2872 #[test]
2873 fn test_delete_records_watch_event() {
2874 let dir = tempdir().unwrap();
2875 let path = dir.path().join("test.db");
2876 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2877
2878 let collective = Collective::new("test", 384);
2879 storage.save_collective(&collective).unwrap();
2880
2881 let exp = test_experience(collective.id, 384);
2882 storage.save_experience(&exp).unwrap();
2883 storage.delete_experience(exp.id).unwrap();
2884
2885 let (events, max_seq) = storage.poll_watch_events(0, 100).unwrap();
2887 assert_eq!(events.len(), 3);
2888 assert_eq!(max_seq, 3);
2889 assert_eq!(events[0].event_type, WatchEventTypeTag::Created); assert_eq!(events[1].event_type, WatchEventTypeTag::Created); assert_eq!(events[2].event_type, WatchEventTypeTag::Deleted); assert_eq!(events[2].entity_id, *exp.id.as_bytes());
2893
2894 Box::new(storage).close().unwrap();
2895 }
2896
2897 #[test]
2898 fn test_update_records_watch_event() {
2899 let dir = tempdir().unwrap();
2900 let path = dir.path().join("test.db");
2901 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2902
2903 let collective = Collective::new("test", 384);
2904 storage.save_collective(&collective).unwrap();
2905
2906 let exp = test_experience(collective.id, 384);
2907 storage.save_experience(&exp).unwrap();
2908
2909 let update = ExperienceUpdate {
2910 importance: Some(0.99),
2911 ..Default::default()
2912 };
2913 storage.update_experience(exp.id, &update).unwrap();
2914
2915 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2917 assert_eq!(events.len(), 3);
2918 assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2919
2920 Box::new(storage).close().unwrap();
2921 }
2922
2923 #[test]
2924 fn test_reinforce_records_watch_event() {
2925 let dir = tempdir().unwrap();
2926 let path = dir.path().join("test.db");
2927 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2928
2929 let collective = Collective::new("test", 384);
2930 storage.save_collective(&collective).unwrap();
2931
2932 let exp = test_experience(collective.id, 384);
2933 storage.save_experience(&exp).unwrap();
2934 storage.reinforce_experience(exp.id).unwrap();
2935
2936 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2938 assert_eq!(events.len(), 3);
2939 assert_eq!(events[1].event_type, WatchEventTypeTag::Created);
2940 assert_eq!(events[2].event_type, WatchEventTypeTag::Updated);
2941
2942 Box::new(storage).close().unwrap();
2943 }
2944
2945 #[test]
2946 fn test_archive_records_archived_event() {
2947 let dir = tempdir().unwrap();
2948 let path = dir.path().join("test.db");
2949 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2950
2951 let collective = Collective::new("test", 384);
2952 storage.save_collective(&collective).unwrap();
2953
2954 let exp = test_experience(collective.id, 384);
2955 storage.save_experience(&exp).unwrap();
2956
2957 let update = ExperienceUpdate {
2958 archived: Some(true),
2959 ..Default::default()
2960 };
2961 storage.update_experience(exp.id, &update).unwrap();
2962
2963 let (events, _) = storage.poll_watch_events(0, 100).unwrap();
2965 assert_eq!(events.len(), 3);
2966 assert_eq!(events[2].event_type, WatchEventTypeTag::Archived);
2967
2968 Box::new(storage).close().unwrap();
2969 }
2970
2971 #[test]
2972 fn test_poll_watch_events_batch_limit() {
2973 let dir = tempdir().unwrap();
2974 let path = dir.path().join("test.db");
2975 let storage = RedbStorage::open(&path, &default_config()).unwrap();
2976
2977 let collective = Collective::new("test", 384);
2978 storage.save_collective(&collective).unwrap();
2979
2980 for _ in 0..10 {
2982 let exp = test_experience(collective.id, 384);
2983 storage.save_experience(&exp).unwrap();
2984 }
2985
2986 let (events, max_seq) = storage.poll_watch_events(0, 3).unwrap();
2988 assert_eq!(events.len(), 3);
2989 assert_eq!(max_seq, 3);
2990
2991 let (events, max_seq) = storage.poll_watch_events(3, 3).unwrap();
2993 assert_eq!(events.len(), 3);
2994 assert_eq!(max_seq, 6);
2995
2996 Box::new(storage).close().unwrap();
2997 }
2998}