1use std::collections::{BTreeMap, HashMap, HashSet};
41use std::fs;
42use std::path::{Path, PathBuf};
43use std::sync::Arc;
44
45use roaring::RoaringBitmap;
46
47use crate::descriptor::Descriptor;
48use crate::error::{CoreError, Result};
49use crate::ids::{CollectionId, Lsn};
50use crate::keyring::{KeyRing, SingleCodecKeyRing};
51use crate::manifest::{
52 self, CollectionEntry, IndexSnapshotRef, MANIFEST_FORMAT_VERSION, Manifest, SegmentRef,
53};
54use crate::page::{PageCodec, PageType};
55use crate::paged::{fsync_dir, read_paged, write_paged};
56use crate::sec::{self, SecPredicate};
57use crate::segment::{self, SealRow, SealedSegment};
58use crate::wal::{self, WalEntry, WalOp, WalWriter};
59
/// Sealed-segment count at or above which `Store::needs_compaction` reports
/// that a collection should be compacted automatically after a checkpoint.
const COMPACT_MIN_SEGMENTS: usize = 8;
63
/// A record as handed back to callers: a decoded vector and its payload bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    /// Vector components, decoded with `le_bytes_to_f32` from the stored bytes.
    pub vector: Vec<f32>,
    /// Payload bytes, returned as stored.
    pub payload: Vec<u8>,
}
72
/// Changes accumulated since the last checkpoint for one collection, as
/// produced by `Store::recovery_tail`.
#[derive(Debug, Default)]
pub struct RecoveryTail {
    /// Every id currently present in the active (un-checkpointed) index,
    /// paired with its decoded record.
    pub upserts: Vec<(String, Record)>,
    /// External ids of sealed rows that were tombstoned in this window
    /// (either deleted or superseded by a newer upsert).
    pub deleted: Vec<String>,
}
85
/// Where the live version of a record currently resides.
#[derive(Debug, Clone, Copy)]
enum Loc {
    /// Index into `CollectionState::active`.
    Active(u32),
    /// `seg` indexes `CollectionState::sealed`; `row` is the row inside that segment.
    Sealed { seg: u32, row: u32 },
}
93
/// One un-checkpointed row held in memory.
#[derive(Debug, Clone)]
struct ActiveRow {
    /// Vector in its byte form (the same bytes carried by `WalOp::Upsert`).
    vector: Vec<u8>,
    /// Payload bytes.
    payload: Vec<u8>,
}
101
/// In-memory state of a single collection: identity, codec, the primary
/// id → location map, the opened sealed segments, and everything that has
/// accumulated since the last checkpoint.
struct CollectionState {
    /// Collection identifier (the key in `Store::collections`).
    id: CollectionId,
    /// Collection name (the key in `Store::name_index`).
    name: String,
    /// Descriptor providing `dim`, `stride()` and the `filterable` field list.
    descriptor: Descriptor,
    /// Page codec used for this collection's segment and index-snapshot files.
    codec: Box<dyn PageCodec>,
    /// Cached `descriptor.stride()`; passed to `SealedSegment::read_vector`.
    stride: usize,
    /// External id → location of the live row.
    primary: BTreeMap<String, Loc>,
    /// Opened sealed segments; `Loc::Sealed::seg` indexes into this vector.
    sealed: Vec<SealedSegment>,
    /// Manifest references for the sealed segments, kept in the same order as `sealed`.
    segments_meta: Vec<SegmentRef>,
    /// Rows upserted since the last checkpoint; `Loc::Active` indexes into this vector.
    active: Vec<ActiveRow>,
    /// External id → index in `active` of its latest un-checkpointed upsert.
    active_index: BTreeMap<String, u32>,
    /// Per sealed-segment index: rows deleted or superseded since the last checkpoint.
    dead_this_window: BTreeMap<u32, RoaringBitmap>,
    /// Index snapshot currently referenced for this collection, if any.
    index_snapshot: Option<IndexSnapshotRef>,
}
132
133impl CollectionState {
134 fn new(
135 id: CollectionId,
136 name: String,
137 descriptor: Descriptor,
138 codec: Box<dyn PageCodec>,
139 ) -> Self {
140 let stride = descriptor.stride();
141 Self {
142 id,
143 name,
144 descriptor,
145 codec,
146 stride,
147 primary: BTreeMap::new(),
148 sealed: Vec::new(),
149 segments_meta: Vec::new(),
150 active: Vec::new(),
151 active_index: BTreeMap::new(),
152 dead_this_window: BTreeMap::new(),
153 index_snapshot: None,
154 }
155 }
156
157 fn has_pending(&self) -> bool {
158 !self.active_index.is_empty() || !self.dead_this_window.is_empty()
159 }
160
161 fn apply_upsert(&mut self, external_id: &str, vector: Vec<u8>, payload: Vec<u8>) {
165 if let Some(Loc::Sealed { seg, row }) = self.primary.get(external_id).copied() {
166 self.dead_this_window.entry(seg).or_default().insert(row);
167 }
168 let row = self.active.len() as u32;
169 self.active.push(ActiveRow { vector, payload });
170 self.active_index.insert(external_id.to_owned(), row);
171 self.primary
172 .insert(external_id.to_owned(), Loc::Active(row));
173 }
174
175 fn apply_delete(&mut self, external_id: &str) -> bool {
179 match self.primary.remove(external_id) {
180 Some(Loc::Sealed { seg, row }) => {
181 self.dead_this_window.entry(seg).or_default().insert(row);
182 self.active_index.remove(external_id);
183 true
184 }
185 Some(Loc::Active(_)) => {
186 self.active_index.remove(external_id);
187 true
188 }
189 None => false,
190 }
191 }
192}
193
/// A segment written during a checkpoint but not yet installed in memory:
/// it is held until the new manifest has been written.
struct PendingSegment {
    /// Manifest reference describing the new segment.
    seg_ref: SegmentRef,
    /// The freshly written segment, already reopened for reading.
    sealed: SealedSegment,
}
200
/// Callback invoked by `Store::publish` with each WAL entry after the store
/// has appended it to the WAL.
pub type CommitObserver = Arc<dyn Fn(&WalEntry) + Send + Sync>;
206
/// The storage engine handle: collections, the write-ahead log writer, and
/// the counters that are persisted in the manifest.
pub struct Store {
    /// Root directory of the store.
    dir: PathBuf,
    /// Source of the catalog codec and the per-collection codecs.
    keyring: Box<dyn KeyRing>,
    /// Live collections keyed by id.
    collections: HashMap<CollectionId, CollectionState>,
    /// Collection name → id.
    name_index: HashMap<String, CollectionId>,
    /// LSN that will be assigned to the next WAL entry.
    next_lsn: Lsn,
    /// Id that will be assigned to the next created collection.
    next_collection_id: u64,
    /// Id that will be assigned to the next written segment.
    next_segment_id: u64,
    /// Version of the most recently written (or loaded) manifest.
    manifest_version: u64,
    /// Highest LSN covered by the most recent checkpoint.
    last_checkpointed_lsn: Lsn,
    /// Writer for the current WAL file.
    wal: WalWriter,
    /// Sequence number of the current WAL file.
    wal_seq: u64,
    /// Optional callback notified of every committed WAL entry.
    commit_observer: Option<CommitObserver>,
}
222
223impl Store {
/// Opens (or creates) a store at `dir` using a plaintext single-codec keyring.
///
/// # Errors
/// Propagates any error from [`Store::open_with_keyring`].
pub fn open(dir: &Path) -> Result<Self> {
    Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::plaintext()))
}
229
/// Opens (or creates) a store at `dir`, wrapping `codec` in a
/// `SingleCodecKeyRing`.
///
/// # Errors
/// Propagates any error from [`Store::open_with_keyring`].
pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
    Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::new(codec)))
}
236
237 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
243 fs::create_dir_all(dir).map_err(|e| CoreError::io(dir, e))?;
244 let wal_dir = dir.join("wal");
245 fs::create_dir_all(&wal_dir).map_err(|e| CoreError::io(&wal_dir, e))?;
246 fsync_dir(dir)?;
247 fsync_dir(&wal_dir)?;
248
249 let mfst = manifest::read_current(dir, keyring.catalog_codec())?.unwrap_or_default();
251
252 let mut collections: HashMap<CollectionId, CollectionState> = HashMap::new();
256 let mut name_index: HashMap<String, CollectionId> = HashMap::new();
257 for entry in &mfst.collections {
258 let descriptor = Descriptor::decode(&entry.descriptor)?;
259 let codec = keyring.collection_codec(entry.id)?;
260 let mut state = CollectionState::new(entry.id, entry.name.clone(), descriptor, codec);
261 state.segments_meta = entry.segments.clone();
262 state.index_snapshot = entry.index_snapshot.clone();
263 let seg_dir = segments_dir(dir, entry.id);
264 for seg in &entry.segments {
265 let sealed = segment::open_segment(&seg_dir, seg.id, state.codec.as_ref())?;
266 let seg_idx = state.sealed.len() as u32;
267 for (row, ext_id) in sealed.row_ids().iter().enumerate() {
268 let row = row as u32;
269 if !sealed.is_dead(row) {
270 state
271 .primary
272 .insert(ext_id.clone(), Loc::Sealed { seg: seg_idx, row });
273 }
274 }
275 state.sealed.push(sealed);
276 }
277 name_index.insert(state.name.clone(), state.id);
278 collections.insert(state.id, state);
279 }
280
281 let floor = mfst.last_checkpointed_lsn;
283 let mut max_lsn = floor;
284 let wal_files = list_wal_files(&wal_dir)?;
285 let mut max_seq = 0u64;
286 let mut keep_seqs: HashSet<u64> = HashSet::new();
287 for (seq, path) in &wal_files {
288 max_seq = (*seq).max(max_seq);
289 let replay = wal::read_all(path, keyring.catalog_codec())?;
290 let mut had_live = false;
291 for entry in replay.entries {
292 if entry.lsn.value() <= floor.value() {
293 continue; }
295 had_live = true;
296 if entry.lsn > max_lsn {
297 max_lsn = entry.lsn;
298 }
299 apply_wal_entry(&mut collections, &mut name_index, &entry, keyring.as_ref())?;
300 }
301 if had_live {
302 keep_seqs.insert(*seq);
303 }
304 }
305 let next_lsn = max_lsn.next();
306
307 gc_orphan_segments(dir, &mfst, keyring.as_ref())?;
311 gc_orphan_index_snapshots(dir, &mfst)?;
312
313 let wal_seq = max_seq + 1;
316 let wal = WalWriter::create(&wal_file_path(&wal_dir, wal_seq), next_lsn)?;
317 fsync_dir(&wal_dir)?;
318 for (seq, path) in &wal_files {
319 if !keep_seqs.contains(seq) {
320 remove_file_if_present(path)?;
321 }
322 }
323 fsync_dir(&wal_dir)?;
324
325 Ok(Self {
326 dir: dir.to_path_buf(),
327 keyring,
328 collections,
329 name_index,
330 next_lsn,
331 next_collection_id: mfst.next_collection_id,
332 next_segment_id: mfst.next_segment_id,
333 manifest_version: mfst.version,
334 last_checkpointed_lsn: floor,
335 wal,
336 wal_seq,
337 commit_observer: None,
338 })
339 }
340
/// Installs `observer` as the commit callback, replacing any previous one.
pub fn set_commit_observer(&mut self, observer: CommitObserver) {
    self.commit_observer = Some(observer);
}
347
348 fn publish(&self, entry: &WalEntry) {
350 if let Some(observer) = &self.commit_observer {
351 observer(entry);
352 }
353 }
354
355 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
361 let mut ops = Vec::new();
362 for (&id, state) in &self.collections {
363 ops.push(WalOp::CreateCollection {
364 collection_id: id,
365 name: state.name.clone(),
366 descriptor: postcard::to_allocvec(&state.descriptor)?,
367 });
368 for (external_id, record) in self.scan(id)? {
369 ops.push(WalOp::Upsert {
370 collection_id: id,
371 external_id,
372 vector: f32_to_le_bytes(&record.vector),
373 payload: record.payload,
374 });
375 }
376 }
377 Ok(ops)
378 }
379
380 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
386 if let WalOp::Checkpoint { .. } = op {
387 return Ok(());
388 }
389 if let WalOp::CreateCollection { collection_id, .. } = &op {
390 self.keyring.provision_collection(*collection_id)?;
393 self.next_collection_id = self.next_collection_id.max(collection_id.0 + 1);
394 }
395 let lsn = self.next_lsn;
396 let entry = WalEntry { lsn, op };
397 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
398 self.next_lsn = lsn.next();
399 apply_wal_entry(
400 &mut self.collections,
401 &mut self.name_index,
402 &entry,
403 self.keyring.as_ref(),
404 )?;
405 self.publish(&entry);
406 Ok(())
407 }
408
409 pub fn create_collection(
411 &mut self,
412 name: &str,
413 descriptor: Descriptor,
414 ) -> Result<CollectionId> {
415 if self.name_index.contains_key(name) {
416 return Err(CoreError::AlreadyExists(format!("collection {name}")));
417 }
418 if descriptor.dim == 0 {
419 return Err(CoreError::InvalidArgument(
420 "dim must be non-zero".to_owned(),
421 ));
422 }
423 let id = CollectionId(self.next_collection_id);
424 self.keyring.provision_collection(id)?;
427 let descriptor_bytes = postcard::to_allocvec(&descriptor)?;
428 let lsn = self.next_lsn;
429 let entry = WalEntry {
430 lsn,
431 op: WalOp::CreateCollection {
432 collection_id: id,
433 name: name.to_owned(),
434 descriptor: descriptor_bytes,
435 },
436 };
437 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
438 self.next_lsn = lsn.next();
439 self.publish(&entry);
440 self.next_collection_id += 1;
441 let codec = self.keyring.collection_codec(id)?;
442 self.collections.insert(
443 id,
444 CollectionState::new(id, name.to_owned(), descriptor, codec),
445 );
446 self.name_index.insert(name.to_owned(), id);
447 Ok(id)
448 }
449
450 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
453 let Some(&id) = self.name_index.get(name) else {
454 return Ok(false);
455 };
456 let lsn = self.next_lsn;
457 let entry = WalEntry {
458 lsn,
459 op: WalOp::DropCollection { collection_id: id },
460 };
461 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
462 self.next_lsn = lsn.next();
463 self.publish(&entry);
464 self.collections.remove(&id);
465 self.name_index.remove(name);
466 Ok(true)
467 }
468
/// Drops the collection called `name`, checkpoints, and then asks the
/// keyring to shred the collection's key material.
///
/// Returns `Ok(false)` when no such collection exists.
///
/// # Errors
/// Returns errors from the drop, the checkpoint, or the keyring.
pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
    let Some(id) = self.collection_id(name) else {
        return Ok(false);
    };
    self.drop_collection(name)?;
    // Checkpoint first so the manifest no longer lists the collection
    // before its key is shredded.
    self.checkpoint()?;
    self.keyring.shred_collection(id)?;
    Ok(true)
}
489
490 pub fn upsert(
494 &mut self,
495 collection: CollectionId,
496 external_id: &str,
497 vector: &[f32],
498 payload: &[u8],
499 ) -> Result<Lsn> {
500 let dim = self
501 .collections
502 .get(&collection)
503 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
504 .descriptor
505 .dim as usize;
506 if vector.len() != dim {
507 return Err(CoreError::InvalidArgument(format!(
508 "vector has {} dims, collection expects {dim}",
509 vector.len()
510 )));
511 }
512 let vector_bytes = f32_to_le_bytes(vector);
513 let lsn = self.next_lsn;
514 let entry = WalEntry {
515 lsn,
516 op: WalOp::Upsert {
517 collection_id: collection,
518 external_id: external_id.to_owned(),
519 vector: vector_bytes.clone(),
520 payload: payload.to_vec(),
521 },
522 };
523 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
524 self.next_lsn = lsn.next();
525 self.publish(&entry);
526 let state = self
527 .collections
528 .get_mut(&collection)
529 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
530 state.apply_upsert(external_id, vector_bytes, payload.to_vec());
531 Ok(lsn)
532 }
533
/// Upserts every `(external_id, vector, payload)` triple in `records`
/// into `collection` with a single WAL sync, and returns the number of
/// records written.
///
/// All vectors are validated before anything is logged. Each record gets
/// its own consecutive LSN.
///
/// # Errors
/// * `NotFound` if the collection does not exist.
/// * `InvalidArgument` if any vector has the wrong dimension.
/// * WAL errors.
pub fn upsert_batch(
    &mut self,
    collection: CollectionId,
    records: &[(&str, &[f32], &[u8])],
) -> Result<u64> {
    if records.is_empty() {
        return Ok(0);
    }
    let dim = self
        .collections
        .get(&collection)
        .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
        .descriptor
        .dim as usize;
    // Validate the whole batch up front so a bad record rejects it before
    // any WAL write happens.
    for (_, vector, _) in records {
        if vector.len() != dim {
            return Err(CoreError::InvalidArgument(format!(
                "vector has {} dims, collection expects {dim}",
                vector.len()
            )));
        }
    }

    let mut entries: Vec<WalEntry> = Vec::with_capacity(records.len());
    // NOTE(review): `next_lsn` is advanced here, before the entries are
    // appended or synced. If an append/sync below fails, the LSNs stay
    // consumed, unlike `upsert`, which advances only after a successful
    // sync. Confirm this is intended.
    for (ext_id, vector, payload) in records {
        let lsn = self.next_lsn;
        self.next_lsn = lsn.next();
        entries.push(WalEntry {
            lsn,
            op: WalOp::Upsert {
                collection_id: collection,
                external_id: ext_id.to_string(),
                vector: f32_to_le_bytes(vector),
                payload: payload.to_vec(),
            },
        });
    }

    // Append everything, then sync once for the whole batch.
    for entry in &entries {
        self.wal.append(self.keyring.catalog_codec(), entry)?;
    }
    self.wal.sync()?;

    // Only after the sync: publish and apply each entry in LSN order.
    for entry in &entries {
        self.publish(entry);
        if let WalOp::Upsert {
            external_id,
            vector,
            payload,
            ..
        } = &entry.op
        {
            let state = self
                .collections
                .get_mut(&collection)
                .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
            state.apply_upsert(external_id, vector.clone(), payload.clone());
        }
    }
    Ok(records.len() as u64)
}
607
608 pub fn delete(&mut self, collection: CollectionId, external_id: &str) -> Result<bool> {
610 let existed = self
611 .collections
612 .get(&collection)
613 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
614 .primary
615 .contains_key(external_id);
616 if !existed {
617 return Ok(false);
618 }
619 let lsn = self.next_lsn;
620 let entry = WalEntry {
621 lsn,
622 op: WalOp::Delete {
623 collection_id: collection,
624 external_id: external_id.to_owned(),
625 },
626 };
627 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
628 self.next_lsn = lsn.next();
629 self.publish(&entry);
630 let state = self
631 .collections
632 .get_mut(&collection)
633 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
634 state.apply_delete(external_id);
635 Ok(true)
636 }
637
638 pub fn get(&self, collection: CollectionId, external_id: &str) -> Result<Option<Record>> {
640 let state = self
641 .collections
642 .get(&collection)
643 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
644 match state.primary.get(external_id).copied() {
645 Some(loc) => Ok(Some(self.record_at(state, loc)?)),
646 None => Ok(None),
647 }
648 }
649
650 pub fn scan(&self, collection: CollectionId) -> Result<Vec<(String, Record)>> {
653 let state = self
654 .collections
655 .get(&collection)
656 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
657 let mut out = Vec::with_capacity(state.primary.len());
658 for (id, &loc) in &state.primary {
659 out.push((id.clone(), self.record_at(state, loc)?));
660 }
661 Ok(out)
662 }
663
664 fn record_at(&self, state: &CollectionState, loc: Loc) -> Result<Record> {
667 match loc {
668 Loc::Active(r) => {
669 let row = state
670 .active
671 .get(r as usize)
672 .ok_or_else(|| CoreError::MalformedPage(format!("dangling active row {r}")))?;
673 Ok(Record {
674 vector: le_bytes_to_f32(&row.vector),
675 payload: row.payload.clone(),
676 })
677 }
678 Loc::Sealed { seg, row } => {
679 let segment = state.sealed.get(seg as usize).ok_or_else(|| {
680 CoreError::MalformedPage(format!("dangling segment index {seg}"))
681 })?;
682 let vector_bytes = segment.read_vector(state.codec.as_ref(), row, state.stride)?;
683 let payload = segment.read_payload(state.codec.as_ref(), row)?;
684 Ok(Record {
685 vector: le_bytes_to_f32(&vector_bytes),
686 payload,
687 })
688 }
689 }
690 }
691
/// Returns the id of the collection called `name`, if it exists.
#[must_use]
pub fn collection_id(&self, name: &str) -> Option<CollectionId> {
    self.name_index.get(name).copied()
}
697
/// Returns the descriptor of `collection`, or `None` if it does not exist.
#[must_use]
pub fn descriptor(&self, collection: CollectionId) -> Option<&Descriptor> {
    self.collections.get(&collection).map(|s| &s.descriptor)
}
703
704 pub fn collection_codec_clone(&self, collection: CollectionId) -> Result<Box<dyn PageCodec>> {
712 self.collections
713 .get(&collection)
714 .map(|s| s.codec.clone_box())
715 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))
716 }
717
/// Returns the store's root directory.
#[must_use]
pub fn dir(&self) -> &Path {
    &self.dir
}
723
/// Returns the version of the manifest most recently loaded or written by
/// this store; it is incremented by each checkpoint and compaction.
#[must_use]
pub fn manifest_version(&self) -> u64 {
    self.manifest_version
}
730
/// Returns the path of the `index` directory of `collection`. The path is
/// computed only; nothing is created or checked on disk.
#[must_use]
pub fn index_dir(&self, collection: CollectionId) -> PathBuf {
    collection_dir(&self.dir, collection).join("index")
}
737
738 pub fn read_index_snapshot(&self, collection: CollectionId) -> Result<Option<Vec<u8>>> {
747 let state = self
748 .collections
749 .get(&collection)
750 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
751 let Some(snap) = &state.index_snapshot else {
752 return Ok(None);
753 };
754 let path = self
755 .index_dir(collection)
756 .join(index_snapshot_file_name(snap.id));
757 let body = read_paged(&path, state.codec.as_ref(), PageType::IndexBlock)?;
758 Ok(Some(body))
759 }
760
761 pub fn recovery_tail(&self, collection: CollectionId) -> Result<RecoveryTail> {
769 let state = self
770 .collections
771 .get(&collection)
772 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
773 let mut upserts = Vec::with_capacity(state.active_index.len());
774 for (ext_id, &row) in &state.active_index {
775 let ar = &state.active[row as usize];
776 upserts.push((
777 ext_id.clone(),
778 Record {
779 vector: le_bytes_to_f32(&ar.vector),
780 payload: ar.payload.clone(),
781 },
782 ));
783 }
784 let mut deleted = Vec::new();
785 for (&seg_idx, bitmap) in &state.dead_this_window {
786 if let Some(seg) = state.sealed.get(seg_idx as usize) {
787 let row_ids = seg.row_ids();
788 for row in bitmap.iter() {
789 if let Some(ext) = row_ids.get(row as usize) {
790 deleted.push(ext.clone());
791 }
792 }
793 }
794 }
795 Ok(RecoveryTail { upserts, deleted })
796 }
797
798 pub fn len(&self, collection: CollectionId) -> Result<usize> {
800 Ok(self
801 .collections
802 .get(&collection)
803 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
804 .primary
805 .len())
806 }
807
/// Returns `true` when `collection` holds no live records.
///
/// # Errors
/// `NotFound` if the collection does not exist.
pub fn is_empty(&self, collection: CollectionId) -> Result<bool> {
    Ok(self.len(collection)? == 0)
}
812
813 #[must_use]
815 pub fn collection_names(&self) -> Vec<String> {
816 let mut names: Vec<String> = self.name_index.keys().cloned().collect();
817 names.sort();
818 names
819 }
820
/// Returns the sorted, de-duplicated external ids in `collection` whose
/// payload satisfies `predicate`.
///
/// Sealed segments are queried through `sec_query`; active rows are
/// tested directly with `sec::payload_matches`.
///
/// # Errors
/// * `NotFound` if the collection does not exist.
/// * `InvalidArgument` if the predicate's field is not listed in the
///   descriptor's `filterable` fields.
/// * Any error from a segment query.
pub fn matching_ids(
    &self,
    collection: CollectionId,
    predicate: &SecPredicate,
) -> Result<Vec<String>> {
    let state = self
        .collections
        .get(&collection)
        .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
    let field_type = state
        .descriptor
        .filterable
        .iter()
        .find(|f| f.path == predicate.field())
        .map(|f| f.field_type)
        .ok_or_else(|| {
            CoreError::InvalidArgument(format!("field {} is not filterable", predicate.field()))
        })?;

    let mut out: Vec<String> = Vec::new();
    for (seg_idx, segment) in state.sealed.iter().enumerate() {
        let seg_idx = seg_idx as u32;
        // A segment that returns `None` for this predicate contributes nothing.
        let Some(rows) = segment.sec_query(predicate)? else {
            continue;
        };
        for row in rows {
            if segment.is_dead(row) {
                continue;
            }
            let Some(ext_id) = segment.row_ids().get(row as usize) else {
                continue;
            };
            // Accept the row only if the primary map still points at exactly
            // this (segment, row); this excludes rows superseded or deleted
            // since the last checkpoint.
            if matches!(
                state.primary.get(ext_id),
                Some(Loc::Sealed { seg: s, row: r }) if *s == seg_idx && *r == row
            ) {
                out.push(ext_id.clone());
            }
        }
    }
    // Active rows carry no secondary index here; evaluate the predicate
    // against each payload.
    for (ext_id, &row) in &state.active_index {
        if let Some(active) = state.active.get(row as usize)
            && sec::payload_matches(predicate, field_type, &active.payload)
        {
            out.push(ext_id.clone());
        }
    }
    out.sort();
    out.dedup();
    Ok(out)
}
884
/// Runs a checkpoint without supplying any index snapshots.
///
/// # Errors
/// Propagates any error from [`Store::checkpoint_with_index_snapshots`].
pub fn checkpoint(&mut self) -> Result<()> {
    self.checkpoint_with_index_snapshots(&HashMap::new())
}
895
/// Persists everything accumulated since the last checkpoint and
/// publishes a new manifest.
///
/// Does nothing when no LSN has been assigned since the last checkpoint
/// (in which case `index_snapshots` is ignored as well). Otherwise, in
/// order:
/// 1. For each collection with pending changes, write the merged
///    tombstone bitmaps for existing segments and seal the active rows
///    into a new segment; fsync the directories.
/// 2. Write each supplied index snapshot blob to a file named after the
///    new manifest version.
/// 3. Write the new manifest.
/// 4. Update in-memory state to match, rotate the WAL, garbage-collect
///    unreferenced files, and run automatic compaction.
///
/// A collection without an entry in `index_snapshots` ends up with no
/// index snapshot reference in the new manifest.
///
/// # Errors
/// Returns any I/O, segment, serialization, manifest or WAL error.
pub fn checkpoint_with_index_snapshots(
    &mut self,
    index_snapshots: &HashMap<CollectionId, Vec<u8>>,
) -> Result<()> {
    // Highest LSN assigned so far.
    let last_lsn = Lsn(self.next_lsn.value().saturating_sub(1));
    if last_lsn.value() <= self.last_checkpointed_lsn.value() {
        return Ok(());
    }
    let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
    cids.sort();
    let segment_lsn_low = self.last_checkpointed_lsn.next();
    let new_version = self.manifest_version + 1;

    // Phase 1: write tombstones and new segments. In-memory state is not
    // modified here (apart from reserving segment ids).
    let mut pending: HashMap<CollectionId, PendingSegment> = HashMap::new();
    for &cid in &cids {
        if !self.collections[&cid].has_pending() {
            continue;
        }
        let seg_dir = segments_dir(&self.dir, cid);
        fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();

        {
            // Persist the union of each segment's existing dead rows and
            // the rows tombstoned in this window.
            let state = &self.collections[&cid];
            for (&seg_idx, newly_dead) in &state.dead_this_window {
                if let Some(seg) = state.sealed.get(seg_idx as usize) {
                    let mut merged = seg.dead_bitmap();
                    merged |= newly_dead;
                    segment::write_del(&seg_dir, seg.seg_id, codec.as_ref(), &merged)?;
                }
            }
        }

        // Seal the live active rows (those still in `active_index`) into a
        // new segment, if there are any.
        let new_seg = if self.collections[&cid].active_index.is_empty() {
            None
        } else {
            let seg_id = self.next_segment_id;
            self.next_segment_id += 1;
            let row_count = {
                let state = &self.collections[&cid];
                let seal_rows: Vec<SealRow<'_>> = state
                    .active_index
                    .iter()
                    .map(|(id, &row)| SealRow {
                        external_id: id,
                        vector: &state.active[row as usize].vector,
                        payload: &state.active[row as usize].payload,
                    })
                    .collect();
                segment::write_segment(
                    &seg_dir,
                    seg_id,
                    codec.as_ref(),
                    &seal_rows,
                    &state.descriptor.filterable,
                )?;
                seal_rows.len() as u64
            };
            Some((seg_id, row_count))
        };

        // Fsync every directory level from the segment directory up to the
        // store root.
        fsync_dir(&seg_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        fsync_dir(&self.dir.join("collections"))?;
        fsync_dir(&self.dir)?;

        if let Some((seg_id, row_count)) = new_seg {
            let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
            pending.insert(
                cid,
                PendingSegment {
                    seg_ref: SegmentRef {
                        id: seg_id,
                        row_count,
                        lsn_low: segment_lsn_low,
                        lsn_high: last_lsn,
                    },
                    sealed,
                },
            );
        }
    }

    // Phase 2: write the supplied index snapshots, named by the new
    // manifest version.
    let mut new_index_refs: HashMap<CollectionId, IndexSnapshotRef> = HashMap::new();
    for &cid in &cids {
        let Some(blob) = index_snapshots.get(&cid) else {
            continue;
        };
        let index_dir = self.index_dir(cid);
        fs::create_dir_all(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();
        let path = index_dir.join(index_snapshot_file_name(new_version));
        write_paged(
            &path,
            codec.as_ref(),
            PageType::IndexBlock,
            new_version,
            blob,
        )?;
        fsync_dir(&index_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        new_index_refs.insert(
            cid,
            IndexSnapshotRef {
                id: new_version,
                lsn: last_lsn,
            },
        );
    }

    // Phase 3: build and write the new manifest.
    let mut entries = Vec::with_capacity(cids.len());
    for &cid in &cids {
        let state = &self.collections[&cid];
        let mut segs = state.segments_meta.clone();
        if let Some(p) = pending.get(&cid) {
            segs.push(p.seg_ref.clone());
        }
        entries.push(CollectionEntry {
            id: state.id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
            segments: segs,
            // Only snapshots written in this checkpoint are referenced.
            index_snapshot: new_index_refs.get(&cid).cloned(),
        });
    }
    let new_manifest = Manifest {
        format_version: MANIFEST_FORMAT_VERSION,
        version: new_version,
        last_checkpointed_lsn: last_lsn,
        next_collection_id: self.next_collection_id,
        next_segment_id: self.next_segment_id,
        collections: entries,
    };
    manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;

    // Phase 4: the manifest is written; bring in-memory state in line with it.
    self.manifest_version = new_version;
    self.last_checkpointed_lsn = last_lsn;
    for &cid in &cids {
        let Some(state) = self.collections.get_mut(&cid) else {
            continue;
        };
        // Fold this window's tombstones into the opened segments.
        let dead_window = std::mem::take(&mut state.dead_this_window);
        for (seg_idx, bitmap) in dead_window {
            if let Some(seg) = state.sealed.get_mut(seg_idx as usize) {
                seg.mark_dead(&bitmap);
            }
        }
        if let Some(p) = pending.remove(&cid) {
            // Re-point every id in the new segment from its active row to
            // its sealed row.
            let seg_idx = state.sealed.len() as u32;
            for (row, ext_id) in p.sealed.row_ids().iter().enumerate() {
                state.primary.insert(
                    ext_id.clone(),
                    Loc::Sealed {
                        seg: seg_idx,
                        row: row as u32,
                    },
                );
            }
            state.sealed.push(p.sealed);
            state.segments_meta.push(p.seg_ref);
        }
        state.active.clear();
        state.active_index.clear();
        state.index_snapshot = new_index_refs.get(&cid).cloned();
    }
    self.rotate_wal()?;
    gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
    gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
    self.auto_compact()?;
    Ok(())
}
1095
1096 pub fn compact(&mut self) -> Result<()> {
1102 for cid in self.sorted_cids() {
1103 if self.reclaimable(cid) {
1104 self.compact_collection(cid)?;
1105 }
1106 }
1107 Ok(())
1108 }
1109
1110 fn auto_compact(&mut self) -> Result<()> {
1113 for cid in self.sorted_cids() {
1114 if self.needs_compaction(cid) {
1115 self.compact_collection(cid)?;
1116 }
1117 }
1118 Ok(())
1119 }
1120
1121 fn sorted_cids(&self) -> Vec<CollectionId> {
1122 let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
1123 cids.sort();
1124 cids
1125 }
1126
1127 fn reclaimable(&self, cid: CollectionId) -> bool {
1130 self.collections.get(&cid).is_some_and(|s| {
1131 s.sealed.len() > 1
1132 || s.sealed
1133 .iter()
1134 .any(|seg| seg.live_count() < u64::from(seg.row_count()))
1135 })
1136 }
1137
1138 fn needs_compaction(&self, cid: CollectionId) -> bool {
1141 let Some(s) = self.collections.get(&cid) else {
1142 return false;
1143 };
1144 if s.sealed.is_empty() {
1145 return false;
1146 }
1147 let total: u64 = s.sealed.iter().map(|seg| u64::from(seg.row_count())).sum();
1148 let live: u64 = s.sealed.iter().map(SealedSegment::live_count).sum();
1149 s.sealed.len() >= COMPACT_MIN_SEGMENTS || (total > 0 && (total - live) * 2 >= total)
1150 }
1151
/// Rewrites all live sealed rows of `cid` into a single new segment and
/// publishes a manifest that references only that segment for the
/// collection.
///
/// Active (un-checkpointed) rows are left untouched. The manifest's
/// `last_checkpointed_lsn` is not advanced.
///
/// # Errors
/// * `NotFound` if the collection does not exist.
/// * `MalformedPage` if the primary map references a missing segment.
/// * Any I/O, segment, serialization or manifest error.
fn compact_collection(&mut self, cid: CollectionId) -> Result<()> {
    let codec = self
        .collections
        .get(&cid)
        .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?
        .codec
        .clone_box();
    // Gather every row the primary map still points at inside a sealed
    // segment. Rows tombstoned in the current window are no longer
    // referenced as `Loc::Sealed`, so they are excluded automatically.
    let live: Vec<(String, Vec<u8>, Vec<u8>)> = {
        let state = self
            .collections
            .get(&cid)
            .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?;
        let mut out = Vec::with_capacity(state.primary.len());
        for (ext_id, &loc) in &state.primary {
            if let Loc::Sealed { seg, row } = loc {
                let segment = state.sealed.get(seg as usize).ok_or_else(|| {
                    CoreError::MalformedPage(format!("dangling segment index {seg}"))
                })?;
                let vector = segment.read_vector(codec.as_ref(), row, state.stride)?;
                let payload = segment.read_payload(codec.as_ref(), row)?;
                out.push((ext_id.clone(), vector, payload));
            }
        }
        out
    };

    // The merged segment spans the LSN range of all segments it replaces.
    let (lsn_low, lsn_high) = {
        let state = &self.collections[&cid];
        let low = state
            .segments_meta
            .iter()
            .map(|s| s.lsn_low.value())
            .min()
            .map(Lsn)
            .unwrap_or(Lsn::ZERO);
        let high = state
            .segments_meta
            .iter()
            .map(|s| s.lsn_high.value())
            .max()
            .map(Lsn)
            .unwrap_or(self.last_checkpointed_lsn);
        (low, high)
    };

    let seg_id = self.next_segment_id;
    self.next_segment_id += 1;
    let seg_dir = segments_dir(&self.dir, cid);
    fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
    let seal_rows: Vec<SealRow<'_>> = live
        .iter()
        .map(|(id, v, p)| SealRow {
            external_id: id,
            vector: v,
            payload: p,
        })
        .collect();
    segment::write_segment(
        &seg_dir,
        seg_id,
        codec.as_ref(),
        &seal_rows,
        &self.collections[&cid].descriptor.filterable,
    )?;
    // Fsync every directory level from the segment directory up to the
    // store root before the manifest references the new segment.
    fsync_dir(&seg_dir)?;
    fsync_dir(&collection_dir(&self.dir, cid))?;
    fsync_dir(&self.dir.join("collections"))?;
    fsync_dir(&self.dir)?;
    let new_ref = SegmentRef {
        id: seg_id,
        row_count: seal_rows.len() as u64,
        lsn_low,
        lsn_high,
    };
    let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;

    let new_version = self.manifest_version + 1;
    // Every other collection keeps its segment list; only `cid` is replaced.
    let mut entries = Vec::with_capacity(self.collections.len());
    for &other in &self.sorted_cids() {
        let state = &self.collections[&other];
        let segs = if other == cid {
            vec![new_ref.clone()]
        } else {
            state.segments_meta.clone()
        };
        entries.push(CollectionEntry {
            id: state.id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
            segments: segs,
            index_snapshot: state.index_snapshot.clone(),
        });
    }
    let new_manifest = Manifest {
        format_version: MANIFEST_FORMAT_VERSION,
        version: new_version,
        last_checkpointed_lsn: self.last_checkpointed_lsn,
        next_collection_id: self.next_collection_id,
        next_segment_id: self.next_segment_id,
        collections: entries,
    };
    manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;

    // The manifest is written; swap in-memory state over to the new segment.
    self.manifest_version = new_version;
    let row_ids: Vec<String> = sealed.row_ids().to_vec();
    if let Some(state) = self.collections.get_mut(&cid) {
        state.sealed = vec![sealed];
        state.segments_meta = vec![new_ref];
        // NOTE(review): pending tombstones are discarded here. They refer to
        // segment indices that no longer exist, but this also means
        // `recovery_tail` stops reporting ids deleted in this window while
        // the existing index snapshot reference is kept. Confirm callers
        // tolerate that when `compact` runs between checkpoints.
        state.dead_this_window.clear();
        for (row, ext_id) in row_ids.into_iter().enumerate() {
            state.primary.insert(
                ext_id,
                Loc::Sealed {
                    seg: 0,
                    row: row as u32,
                },
            );
        }
    }
    gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
    gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
    Ok(())
}
1287
/// Switches to a fresh WAL file and deletes all WAL files up to and
/// including the previous one.
///
/// The new file is created and the directory fsynced before the old
/// files are removed.
///
/// # Errors
/// Returns WAL creation and I/O errors.
fn rotate_wal(&mut self) -> Result<()> {
    let wal_dir = self.dir.join("wal");
    let old_seq = self.wal_seq;
    let new_seq = old_seq + 1;
    let new_wal = WalWriter::create(&wal_file_path(&wal_dir, new_seq), self.next_lsn)?;
    fsync_dir(&wal_dir)?;
    self.wal = new_wal;
    self.wal_seq = new_seq;
    for (seq, path) in list_wal_files(&wal_dir)? {
        if seq <= old_seq {
            remove_file_if_present(&path)?;
        }
    }
    fsync_dir(&wal_dir)?;
    Ok(())
}
1306}
1307
/// Applies one WAL entry to the in-memory collection maps.
///
/// * `CreateCollection` decodes the descriptor, obtains the collection
///   codec from `keyring`, and inserts a fresh empty state (replacing any
///   existing state under the same id).
/// * `DropCollection` removes the state and its name mapping if present.
/// * `Upsert` / `Delete` are forwarded to the collection state; entries
///   for unknown collections are ignored.
/// * `Checkpoint` is a no-op.
///
/// # Errors
/// Returns descriptor decoding and keyring errors (only possible for
/// `CreateCollection`).
fn apply_wal_entry(
    collections: &mut HashMap<CollectionId, CollectionState>,
    name_index: &mut HashMap<String, CollectionId>,
    entry: &WalEntry,
    keyring: &dyn KeyRing,
) -> Result<()> {
    match &entry.op {
        WalOp::CreateCollection {
            collection_id,
            name,
            descriptor,
        } => {
            let descriptor = Descriptor::decode(descriptor)?;
            let codec = keyring.collection_codec(*collection_id)?;
            name_index.insert(name.clone(), *collection_id);
            collections.insert(
                *collection_id,
                CollectionState::new(*collection_id, name.clone(), descriptor, codec),
            );
        }
        WalOp::DropCollection { collection_id } => {
            if let Some(state) = collections.remove(collection_id) {
                name_index.remove(&state.name);
            }
        }
        WalOp::Upsert {
            collection_id,
            external_id,
            vector,
            payload,
        } => {
            if let Some(state) = collections.get_mut(collection_id) {
                state.apply_upsert(external_id, vector.clone(), payload.clone());
            }
        }
        WalOp::Delete {
            collection_id,
            external_id,
        } => {
            if let Some(state) = collections.get_mut(collection_id) {
                state.apply_delete(external_id);
            }
        }
        WalOp::Checkpoint { .. } => {}
    }
    Ok(())
}
1362
/// Removes on-disk collection data that the manifest `mfst` does not
/// reference.
///
/// Under `dir/collections`, every entry whose name parses as a `u64` is
/// treated as a collection directory:
/// * if the manifest does not list that collection, its key is shredded
///   via `keyring` and the directory is removed recursively;
/// * otherwise, inside its `segments` directory, temporary files and
///   files belonging to segment ids the manifest does not reference are
///   deleted.
///
/// Entries with non-numeric names and files that are not recognised as
/// segment files are left alone.
///
/// # Errors
/// Returns I/O and keyring errors.
fn gc_orphan_segments(dir: &Path, mfst: &Manifest, keyring: &dyn KeyRing) -> Result<()> {
    let collections_dir = dir.join("collections");
    if !collections_dir.exists() {
        return Ok(());
    }
    // (collection id, segment id) pairs the manifest references.
    let mut referenced: HashSet<(u64, u64)> = HashSet::new();
    let mut live_collections: HashSet<u64> = HashSet::new();
    for c in &mfst.collections {
        live_collections.insert(c.id.value());
        for s in &c.segments {
            referenced.insert((c.id.value(), s.id));
        }
    }
    for entry in fs::read_dir(&collections_dir).map_err(|e| CoreError::io(&collections_dir, e))? {
        let entry = entry.map_err(|e| CoreError::io(&collections_dir, e))?;
        let cdir = entry.path();
        let Some(cid) = entry
            .file_name()
            .to_str()
            .and_then(|n| n.parse::<u64>().ok())
        else {
            continue;
        };
        if !live_collections.contains(&cid) {
            // Shred the key before deleting the data directory.
            keyring.shred_collection(CollectionId(cid))?;
            if cdir.is_dir() {
                fs::remove_dir_all(&cdir).map_err(|e| CoreError::io(&cdir, e))?;
            }
            continue;
        }
        let seg_dir = cdir.join("segments");
        if !seg_dir.is_dir() {
            continue;
        }
        for seg in fs::read_dir(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))? {
            let seg = seg.map_err(|e| CoreError::io(&seg_dir, e))?;
            let path = seg.path();
            let Some(name) = seg.file_name().to_str().map(str::to_owned) else {
                continue;
            };
            if segment::is_temp_file(&name) {
                remove_file_if_present(&path)?;
                continue;
            }
            let Some(seg_id) = segment::seg_id_of_file(&name) else {
                continue;
            };
            if !referenced.contains(&(cid, seg_id)) {
                remove_file_if_present(&path)?;
            }
        }
    }
    Ok(())
}
1423
1424fn gc_orphan_index_snapshots(dir: &Path, mfst: &Manifest) -> Result<()> {
1430 for c in &mfst.collections {
1431 let index_dir = collection_dir(dir, c.id).join("index");
1432 if !index_dir.is_dir() {
1433 continue;
1434 }
1435 let keep = c.index_snapshot.as_ref().map(|r| r.id);
1436 for entry in fs::read_dir(&index_dir).map_err(|e| CoreError::io(&index_dir, e))? {
1437 let entry = entry.map_err(|e| CoreError::io(&index_dir, e))?;
1438 let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
1439 continue;
1440 };
1441 let Some(id) = index_snapshot_id_of_file(&name) else {
1442 continue; };
1444 if Some(id) != keep {
1445 remove_file_if_present(&entry.path())?;
1446 }
1447 }
1448 }
1449 Ok(())
1450}
1451
1452fn remove_file_if_present(path: &Path) -> Result<()> {
1453 match fs::remove_file(path) {
1454 Ok(()) => Ok(()),
1455 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1456 Err(e) => Err(CoreError::io(path, e)),
1457 }
1458}
1459
1460fn collection_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1461 dir.join("collections").join(format!("{:010}", cid.value()))
1462}
1463
1464fn segments_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1465 collection_dir(dir, cid).join("segments")
1466}
1467
/// File name for index snapshot `id`: the prefix `idx-` followed by the id in
/// decimal, zero-padded to at least ten digits.
fn index_snapshot_file_name(id: u64) -> String {
    format!("idx-{:010}", id)
}
1473
/// Extracts the snapshot id from a file name of the form `idx-<u64>`.
///
/// Returns `None` when the prefix is missing or the remainder does not parse
/// as a `u64`.
fn index_snapshot_id_of_file(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("idx-")?;
    digits.parse::<u64>().ok()
}
1480
/// Path of the WAL file with sequence number `seq` inside `wal_dir`; the file
/// is named `wal-<seq>.log` with `seq` zero-padded to at least ten digits.
fn wal_file_path(wal_dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("wal-{:010}.log", seq);
    wal_dir.join(file_name)
}
1484
1485fn list_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1486 let mut out = Vec::new();
1487 for entry in fs::read_dir(wal_dir).map_err(|e| CoreError::io(wal_dir, e))? {
1488 let entry = entry.map_err(|e| CoreError::io(wal_dir, e))?;
1489 if let Some(seq) = entry.file_name().to_str().and_then(parse_wal_file_name) {
1490 out.push((seq, entry.path()));
1491 }
1492 }
1493 out.sort_by_key(|(seq, _)| *seq);
1494 Ok(out)
1495}
1496
/// Extracts the sequence number from a WAL file name of the form
/// `wal-<u64>.log`.
///
/// Returns `None` when the prefix or suffix is missing, or when the middle
/// part does not parse as a `u64`.
fn parse_wal_file_name(name: &str) -> Option<u64> {
    let stem = name.strip_prefix("wal-")?.strip_suffix(".log")?;
    stem.parse::<u64>().ok()
}
1502
/// Serialises a slice of `f32` values as consecutive little-endian 4-byte
/// words, in input order.
fn f32_to_le_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1510
/// Decodes consecutive little-endian 4-byte words into `f32` values.
///
/// Trailing bytes that do not fill a whole 4-byte word are ignored.
fn le_bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|chunk| {
            let mut word = [0u8; 4];
            word.copy_from_slice(chunk);
            f32::from_le_bytes(word)
        })
        .collect()
}
1517
// Tests that drive `Store` end to end against a fresh temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::descriptor::{DistanceMetric, Dtype};

    /// Descriptor used by most tests: four `f32` dimensions, L2 distance.
    fn desc() -> Descriptor {
        let dim = 4;
        Descriptor::new(dim, Dtype::F32, DistanceMetric::L2)
    }

    /// Opens a store at `dir`, panicking on failure.
    fn open(dir: &Path) -> Store {
        Store::open(dir).unwrap()
    }

    /// Path of the `seg-<id>.dir` file for `seg_id` inside the collection's
    /// segments directory.
    fn seg_dir_file(dir: &Path, cid: CollectionId, seg_id: u64) -> PathBuf {
        let file_name = format!("seg-{:010}.dir", seg_id);
        segments_dir(dir, cid).join(file_name)
    }

    /// Upsert, read back, delete; a second delete reports nothing removed.
    #[test]
    fn upsert_get_delete_in_memory() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store
            .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
            .unwrap();
        let rec = store.get(coll, "a").unwrap().unwrap();
        assert_eq!(rec.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(rec.payload, b"{}");
        assert!(store.delete(coll, "a").unwrap());
        assert!(store.get(coll, "a").unwrap().is_none());
        assert!(!store.delete(coll, "a").unwrap());
    }

    /// A vector whose length differs from the descriptor's is refused.
    #[test]
    fn dim_mismatch_is_rejected() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        let outcome = store.upsert(coll, "a", &[1.0, 2.0], b"{}");
        assert!(matches!(outcome, Err(CoreError::InvalidArgument(_))));
    }

    /// A batch upsert is visible immediately and again after a reopen.
    #[test]
    fn upsert_batch_commits_all_on_sync() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            let rows: Vec<([f32; 4], String)> = (0..8u32)
                .map(|i| ([i as f32; 4], format!("k{i}")))
                .collect();
            let body = b"{}";
            let batch: Vec<(&str, &[f32], &[u8])> = rows
                .iter()
                .map(|(v, id)| (id.as_str(), v.as_slice(), body.as_slice()))
                .collect();
            let written = store.upsert_batch(coll, &batch).unwrap();
            assert_eq!(written, 8);
            for (_, id) in &rows {
                assert!(store.get(coll, id).unwrap().is_some(), "missing {id}");
            }
        }
        // Reopen without an explicit checkpoint and check every row again.
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.len(coll).unwrap(), 8);
        for i in 0..8u32 {
            let rec = store.get(coll, &format!("k{i}")).unwrap().unwrap();
            assert_eq!(rec.vector, vec![i as f32; 4]);
        }
    }

    /// One bad row fails the whole batch; the valid row is not stored.
    #[test]
    fn upsert_batch_dim_mismatch_writes_nothing() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        let batch: &[(&str, &[f32], &[u8])] = &[
            ("a", &[1.0, 2.0, 3.0, 4.0], b"{}"),
            // Wrong dimension: two components instead of four.
            ("b", &[1.0, 2.0], b"{}"),
        ];
        let outcome = store.upsert_batch(coll, batch);
        assert!(matches!(outcome, Err(CoreError::InvalidArgument(_))));
        assert!(store.get(coll, "a").unwrap().is_none());
    }

    /// Creating a second collection under an existing name fails.
    #[test]
    fn duplicate_collection_is_rejected() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        store.create_collection("c", desc()).unwrap();
        let second = store.create_collection("c", desc());
        assert!(matches!(second, Err(CoreError::AlreadyExists(_))));
    }

    /// Rows written without any checkpoint are still there after a reopen.
    #[test]
    fn recovers_without_checkpoint_via_wal_replay() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..10u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.len(coll).unwrap(), 10);
        let rec = store.get(coll, "k7").unwrap().unwrap();
        assert_eq!(rec.vector, vec![7.0; 4]);
    }

    /// State written before and after a checkpoint both survive a reopen.
    #[test]
    fn recovers_across_checkpoint_and_wal_tail() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            store.checkpoint().unwrap();
            // Everything below happens after the checkpoint.
            for i in 5..8u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            store.delete(coll, "k0").unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        // Eight rows were inserted and one was deleted.
        assert_eq!(store.len(coll).unwrap(), 7);
        assert!(store.get(coll, "k0").unwrap().is_none());
        assert_eq!(
            store.get(coll, "k6").unwrap().unwrap().vector,
            vec![6.0; 4]
        );
    }

    /// Same round trip as above, but opening through `open_with_keyring`.
    #[test]
    fn open_with_keyring_round_trips_through_checkpoint() {
        let root = tempfile::tempdir().unwrap();
        let open_plain = || {
            Store::open_with_keyring(root.path(), Box::new(SingleCodecKeyRing::plaintext()))
                .unwrap()
        };
        {
            let mut store = open_plain();
            let coll = store.create_collection("c", desc()).unwrap();
            store
                .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
                .unwrap();
            store.checkpoint().unwrap();
            store.upsert(coll, "b", &[5.0; 4], b"{}").unwrap();
        }
        let store = open_plain();
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.len(coll).unwrap(), 2);
        assert_eq!(
            store.get(coll, "a").unwrap().unwrap().vector,
            vec![1.0, 2.0, 3.0, 4.0]
        );
        assert_eq!(
            store.get(coll, "b").unwrap().unwrap().vector,
            vec![5.0; 4]
        );
    }

    /// A delete followed by a checkpoint stays deleted after a reopen.
    #[test]
    fn delete_survives_checkpoint() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
            store.upsert(coll, "b", &[2.0; 4], b"{}").unwrap();
            store.checkpoint().unwrap();
            store.delete(coll, "a").unwrap();
            store.checkpoint().unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert!(store.get(coll, "a").unwrap().is_none());
        assert!(store.get(coll, "b").unwrap().is_some());
        assert_eq!(store.len(coll).unwrap(), 1);
    }

    /// Two consecutive opens of the same directory scan identically.
    #[test]
    fn reopen_is_idempotent() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
            store.checkpoint().unwrap();
            store.upsert(coll, "b", &[2.0; 4], b"{}").unwrap();
        }
        let scan_after_open = |dir: &Path| {
            let store = open(dir);
            let coll = store.collection_id("c").unwrap();
            store.scan(coll).unwrap()
        };
        let first = scan_after_open(root.path());
        let second = scan_after_open(root.path());
        assert_eq!(first, second);
    }

    /// Overwriting a checkpointed row and checkpointing again keeps only the
    /// newer value.
    #[test]
    fn update_then_checkpoint_keeps_latest_value() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"v1").unwrap();
            store.checkpoint().unwrap();
            store.upsert(coll, "a", &[9.0; 4], b"v2").unwrap();
            store.checkpoint().unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        let rec = store.get(coll, "a").unwrap().unwrap();
        assert_eq!(rec.vector, vec![9.0; 4]);
        assert_eq!(rec.payload, b"v2");
        assert_eq!(store.len(coll).unwrap(), 1);
    }

    /// Several writes to one id before a single checkpoint leave one row
    /// holding the last value.
    #[test]
    fn update_within_one_window_seals_latest() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"v1").unwrap();
            store.upsert(coll, "a", &[2.0; 4], b"v2").unwrap();
            store.upsert(coll, "a", &[3.0; 4], b"v3").unwrap();
            store.checkpoint().unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.len(coll).unwrap(), 1);
        let rec = store.get(coll, "a").unwrap().unwrap();
        assert_eq!(rec.vector, vec![3.0; 4]);
        assert_eq!(rec.payload, b"v3");
    }

    /// A dropped collection is not listed after checkpoint and reopen.
    #[test]
    fn dropped_collection_is_gone_after_reopen() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
            store.checkpoint().unwrap();
            assert!(store.drop_collection("c").unwrap());
            store.checkpoint().unwrap();
        }
        let store = open(root.path());
        assert!(store.collection_id("c").is_none());
        assert!(store.collection_names().is_empty());
    }

    /// A segment-named file with an unreferenced id disappears on open.
    #[test]
    fn orphan_segment_is_garbage_collected() {
        let root = tempfile::tempdir().unwrap();
        let cid = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
            store.checkpoint().unwrap();
            coll
        };
        // Plant a file under an id the test never created.
        let stray = segments_dir(root.path(), cid).join("seg-0000009999.vec");
        fs::write(&stray, b"junk").unwrap();
        assert!(stray.exists());
        let _store = open(root.path());
        assert!(!stray.exists(), "orphan segment should be GC'd on open");
    }

    /// Flipping a byte in a segment file makes the next open fail with
    /// `PageCorrupt`.
    #[test]
    fn corrupt_segment_is_detected_not_served() {
        let root = tempfile::tempdir().unwrap();
        let cid = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
            store.checkpoint().unwrap();
            coll
        };
        let target = seg_dir_file(root.path(), cid, 0);
        let mut raw = fs::read(&target).unwrap();
        raw[33] ^= 0xFF;
        fs::write(&target, &raw).unwrap();
        let reopened = Store::open(root.path());
        assert!(matches!(reopened, Err(CoreError::PageCorrupt { .. })));
    }

    /// Garbage appended to the current WAL file does not cost any of the
    /// rows written before it.
    #[test]
    fn torn_wal_tail_drops_only_unacked_record() {
        let root = tempfile::tempdir().unwrap();
        let wal_path = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..3u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            wal_file_path(&root.path().join("wal"), store.wal_seq)
        };
        {
            // Three stray bytes stand in for a partially written record.
            use std::io::Write as _;
            let mut file = fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            file.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
            file.sync_data().unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.len(coll).unwrap(), 3);
    }

    /// A row read after a checkpoint, on the same handle, is unchanged.
    #[test]
    fn reads_served_from_disk_after_checkpoint() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store
            .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], br#"{"k":1}"#)
            .unwrap();
        store.checkpoint().unwrap();
        let rec = store.get(coll, "a").unwrap().unwrap();
        assert_eq!(rec.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(rec.payload, br#"{"k":1}"#);
    }

    /// 1000-dimensional vectors round-trip through a checkpoint and reopen.
    #[test]
    fn high_dim_vectors_straddle_pages() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let dim = 1000usize;
        let coll = store
            .create_collection(
                "c",
                Descriptor::new(dim as u32, Dtype::F32, DistanceMetric::L2),
            )
            .unwrap();
        // Row `i` holds the values i*1000 + j for j in 0..dim.
        let expected =
            |i: u32| -> Vec<f32> { (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect() };
        for i in 0..20u32 {
            let v = expected(i);
            store.upsert(coll, &format!("k{i}"), &v, b"{}").unwrap();
        }
        store.checkpoint().unwrap();
        // The first handle stays alive while the directory is opened again.
        let reopened = open(root.path());
        let coll = reopened.collection_id("c").unwrap();
        for i in 0..20u32 {
            let rec = reopened.get(coll, &format!("k{i}")).unwrap().unwrap();
            let want = expected(i);
            assert_eq!(
                rec.vector, want,
                "vector k{i} mismatch after straddling read"
            );
        }
    }

    /// A delete-only checkpoint adds no segment, writes a `.del` file, and
    /// the delete holds after reopen.
    #[test]
    fn delete_persists_via_del_bitmap_across_reopen() {
        let root = tempfile::tempdir().unwrap();
        let cid = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            store.checkpoint().unwrap();
            store.delete(coll, "k2").unwrap();
            store.checkpoint().unwrap();
            assert_eq!(
                store.collections[&coll].sealed.len(),
                1,
                "no new segment for a delete-only window"
            );
            coll
        };
        let del_file = segments_dir(root.path(), cid).join("seg-0000000000.del");
        assert!(
            del_file.exists(),
            ".del must be persisted for the deleted row"
        );
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert!(store.get(coll, "k2").unwrap().is_none());
        assert_eq!(store.len(coll).unwrap(), 4);
        for i in [0u32, 1, 3, 4] {
            assert!(store.get(coll, &format!("k{i}")).unwrap().is_some());
        }
    }

    /// Rewriting a checkpointed id in a later checkpoint neither changes
    /// the row count nor resurrects the old value.
    #[test]
    fn shadowed_row_is_tombstoned_and_latest_wins() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"v1")
                    .unwrap();
            }
            store.checkpoint().unwrap();
            store.upsert(coll, "k2", &[99.0; 4], b"v2").unwrap();
            store.checkpoint().unwrap();
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        // Still five distinct ids.
        assert_eq!(store.len(coll).unwrap(), 5);
        let rec = store.get(coll, "k2").unwrap().unwrap();
        assert_eq!(rec.vector, vec![99.0; 4]);
        assert_eq!(rec.payload, b"v2");
    }

    /// Explicit compaction merges two segments into one, removes the old
    /// files, honours deletes, and keeps a row written after the last
    /// checkpoint.
    #[test]
    fn compaction_merges_segments_reclaims_and_keeps_active_rows() {
        let root = tempfile::tempdir().unwrap();
        {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            for i in 0..6u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            store.checkpoint().unwrap();
            for i in 6..12u32 {
                store
                    .upsert(coll, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            store.checkpoint().unwrap();
            store.delete(coll, "k0").unwrap();
            store.delete(coll, "k6").unwrap();
            store.checkpoint().unwrap();
            assert_eq!(store.collections[&coll].sealed.len(), 2);

            // Written after the last checkpoint, before compaction.
            store.upsert(coll, "fresh", &[7.0; 4], b"new").unwrap();
            store.compact().unwrap();
            assert_eq!(
                store.collections[&coll].sealed.len(),
                1,
                "segments merged to one"
            );
            let old_segment = segments_dir(root.path(), coll).join("seg-0000000000.dir");
            assert!(!old_segment.exists(), "old segment files reclaimed");
            // Twelve rows, minus two deletes, plus "fresh".
            assert_eq!(store.len(coll).unwrap(), 11);
            assert!(store.get(coll, "k0").unwrap().is_none());
            assert!(store.get(coll, "k6").unwrap().is_none());
            assert_eq!(
                store.get(coll, "k5").unwrap().unwrap().vector,
                vec![5.0; 4]
            );
            assert_eq!(store.get(coll, "fresh").unwrap().unwrap().payload, b"new");
        }
        let store = open(root.path());
        let coll = store.collection_id("c").unwrap();
        assert_eq!(store.collections[&coll].sealed.len(), 1);
        assert_eq!(store.len(coll).unwrap(), 11);
        assert!(store.get(coll, "k0").unwrap().is_none());
        assert_eq!(
            store.get(coll, "fresh").unwrap().unwrap().vector,
            vec![7.0; 4]
        );
        assert_eq!(
            store.get(coll, "k11").unwrap().unwrap().vector,
            vec![11.0; 4]
        );
    }

    /// Eight checkpoints of three rows each leave fewer than
    /// `COMPACT_MIN_SEGMENTS` segments and all 24 rows readable.
    #[test]
    fn auto_compaction_merges_many_segments() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        for round in 0..8u32 {
            let first = round * 3;
            for n in first..first + 3 {
                store
                    .upsert(coll, &format!("k{n}"), &[n as f32; 4], b"{}")
                    .unwrap();
            }
            store.checkpoint().unwrap();
        }
        let sealed_count = store.collections[&coll].sealed.len();
        assert!(
            sealed_count < COMPACT_MIN_SEGMENTS,
            "auto-compaction should have merged the segments"
        );
        assert_eq!(store.len(coll).unwrap(), 24);
        assert_eq!(
            store.get(coll, "k0").unwrap().unwrap().vector,
            vec![0.0; 4]
        );
        assert_eq!(
            store.get(coll, "k23").unwrap().unwrap().vector,
            vec![23.0; 4]
        );
    }

    /// `matching_ids` sees rows written before and after a checkpoint,
    /// honours deletes, and rejects a field the descriptor does not declare.
    #[test]
    fn matching_ids_spans_secondary_index_and_active_buffer() {
        use crate::descriptor::FilterableField;
        use crate::sec::SecValue;

        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let fields = vec![
            FilterableField::keyword("city"),
            FilterableField::numeric("age"),
        ];
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(fields);
        let coll = store.create_collection("c", descriptor).unwrap();
        store
            .upsert(coll, "a", &[0.0; 4], br#"{"city":"paris","age":30}"#)
            .unwrap();
        store
            .upsert(coll, "b", &[0.0; 4], br#"{"city":"lyon","age":25}"#)
            .unwrap();
        store
            .upsert(coll, "d", &[0.0; 4], br#"{"city":"paris","age":40}"#)
            .unwrap();
        store.checkpoint().unwrap();
        // "e" is written after the checkpoint.
        store
            .upsert(coll, "e", &[0.0; 4], br#"{"city":"paris","age":22}"#)
            .unwrap();

        let paris = || SecPredicate::Eq {
            field: "city".into(),
            value: SecValue::Keyword("paris".into()),
        };
        assert_eq!(store.matching_ids(coll, &paris()).unwrap(), ["a", "d", "e"]);

        // Inclusive numeric range 25..=35.
        let age_25_to_35 = SecPredicate::Range {
            field: "age".into(),
            lo: Some(SecValue::Numeric(25.0)),
            hi: Some(SecValue::Numeric(35.0)),
            lo_inclusive: true,
            hi_inclusive: true,
        };
        assert_eq!(
            store.matching_ids(coll, &age_25_to_35).unwrap(),
            ["a", "b"]
        );

        store.delete(coll, "a").unwrap();
        assert_eq!(store.matching_ids(coll, &paris()).unwrap(), ["d", "e"]);

        // "country" was never declared filterable.
        let undeclared = SecPredicate::Eq {
            field: "country".into(),
            value: SecValue::Keyword("fr".into()),
        };
        assert!(matches!(
            store.matching_ids(coll, &undeclared),
            Err(CoreError::InvalidArgument(_))
        ));

        store.checkpoint().unwrap();
        // The first handle stays alive while the directory is opened again.
        let reopened = open(root.path());
        let coll = reopened.collection_id("c").unwrap();
        assert_eq!(
            reopened.matching_ids(coll, &paris()).unwrap(),
            ["d", "e"]
        );
    }

    /// Sorted names of the `idx-*` files in the collection's `index`
    /// directory; empty when the directory cannot be read.
    fn index_snapshot_files(dir: &Path, cid: CollectionId) -> Vec<String> {
        let index_root = collection_dir(dir, cid).join("index");
        let Ok(listing) = fs::read_dir(&index_root) else {
            return Vec::new();
        };
        let mut found: Vec<String> = listing
            .filter_map(std::result::Result::ok)
            .filter_map(|item| item.file_name().to_str().map(str::to_owned))
            .filter(|name| name.starts_with("idx-"))
            .collect();
        found.sort();
        found
    }

    /// A snapshot blob handed to a checkpoint is readable right away and
    /// after a reopen.
    #[test]
    fn index_snapshot_round_trips_through_checkpoint_and_reopen() {
        let root = tempfile::tempdir().unwrap();
        let blob = b"opaque-index-bytes".to_vec();
        let cid = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store
                .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
                .unwrap();
            let snapshots = HashMap::from([(coll, blob.clone())]);
            store.checkpoint_with_index_snapshots(&snapshots).unwrap();
            assert_eq!(store.read_index_snapshot(coll).unwrap(), Some(blob.clone()));
            assert_eq!(index_snapshot_files(root.path(), coll).len(), 1);
            coll
        };
        let store = open(root.path());
        assert_eq!(store.read_index_snapshot(cid).unwrap(), Some(blob));
    }

    /// A plain checkpoint after a snapshot checkpoint leaves no snapshot and
    /// no `idx-*` file.
    #[test]
    fn checkpoint_without_a_snapshot_clears_and_reclaims_it() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store
            .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
            .unwrap();
        let snapshots = HashMap::from([(coll, b"blob".to_vec())]);
        store.checkpoint_with_index_snapshots(&snapshots).unwrap();
        assert!(store.read_index_snapshot(coll).unwrap().is_some());

        store
            .upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}")
            .unwrap();
        store.checkpoint().unwrap();
        assert_eq!(store.read_index_snapshot(coll).unwrap(), None);
        assert!(index_snapshot_files(root.path(), coll).is_empty());

        // The first handle stays alive while the directory is opened again.
        let reopened = open(root.path());
        assert_eq!(reopened.read_index_snapshot(coll).unwrap(), None);
    }

    /// A second snapshot replaces the first; only one `idx-*` file remains.
    #[test]
    fn a_new_snapshot_supersedes_and_reclaims_the_old_one() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store
            .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
            .unwrap();
        let first = HashMap::from([(coll, b"first".to_vec())]);
        store.checkpoint_with_index_snapshots(&first).unwrap();
        store
            .upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}")
            .unwrap();
        let second = HashMap::from([(coll, b"second".to_vec())]);
        store.checkpoint_with_index_snapshots(&second).unwrap();

        assert_eq!(
            store.read_index_snapshot(coll).unwrap(),
            Some(b"second".to_vec())
        );
        assert_eq!(index_snapshot_files(root.path(), coll).len(), 1);
    }

    /// The snapshot is still readable after `compact` and after a reopen.
    #[test]
    fn compaction_preserves_the_index_snapshot() {
        let root = tempfile::tempdir().unwrap();
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        let keep = b"keep".to_vec();
        store
            .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
            .unwrap();
        let before = HashMap::from([(coll, keep.clone())]);
        store.checkpoint_with_index_snapshots(&before).unwrap();
        store
            .upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}")
            .unwrap();
        store.delete(coll, "a").unwrap();
        let after = HashMap::from([(coll, keep.clone())]);
        store.checkpoint_with_index_snapshots(&after).unwrap();
        store.compact().unwrap();
        assert_eq!(
            store.read_index_snapshot(coll).unwrap(),
            Some(keep.clone())
        );

        // The first handle stays alive while the directory is opened again.
        let reopened = open(root.path());
        assert_eq!(reopened.read_index_snapshot(coll).unwrap(), Some(keep));
    }

    /// A stray `idx-*` file with an unreferenced id is removed on open; the
    /// live snapshot stays.
    #[test]
    fn orphan_index_snapshot_is_reclaimed_on_open() {
        let root = tempfile::tempdir().unwrap();
        let cid = {
            let mut store = open(root.path());
            let coll = store.create_collection("c", desc()).unwrap();
            store
                .upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}")
                .unwrap();
            let snapshots = HashMap::from([(coll, b"live".to_vec())]);
            store.checkpoint_with_index_snapshots(&snapshots).unwrap();
            // Plant a snapshot-named file under an id the test never wrote.
            let stray = store.index_dir(coll).join("idx-9999999999");
            fs::write(&stray, b"orphan").unwrap();
            coll
        };
        let store = open(root.path());
        assert!(!store.index_dir(cid).join("idx-9999999999").exists());
        assert_eq!(
            store.read_index_snapshot(cid).unwrap(),
            Some(b"live".to_vec())
        );
    }
}