1use std::collections::{BTreeMap, HashMap, HashSet};
41use std::fs;
42use std::path::{Path, PathBuf};
43use std::sync::Arc;
44
45use roaring::RoaringBitmap;
46
47use crate::descriptor::Descriptor;
48use crate::error::{CoreError, Result};
49use crate::ids::{CollectionId, Lsn};
50use crate::keyring::{KeyRing, SingleCodecKeyRing};
51use crate::manifest::{
52 self, CollectionEntry, IndexSnapshotRef, MANIFEST_FORMAT_VERSION, Manifest, SegmentRef,
53};
54use crate::page::{PageCodec, PageType};
55use crate::paged::{fsync_dir, read_paged, write_paged};
56use crate::sec::{self, SecPredicate};
57use crate::segment::{self, SealRow, SealedSegment};
58use crate::wal::{self, WalEntry, WalOp, WalWriter};
59
/// Sealed-segment count at which `needs_compaction` reports that a collection should be
/// compacted, regardless of its dead-row ratio. Checked by `auto_compact` after every
/// checkpoint.
const COMPACT_MIN_SEGMENTS: usize = 8;
63
/// A stored record as returned to callers: the decoded vector plus its opaque payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    /// Vector components, decoded from the little-endian `f32` bytes kept in storage.
    pub vector: Vec<f32>,
    /// Caller-supplied payload bytes, returned unchanged.
    pub payload: Vec<u8>,
}
72
/// Changes to one collection since its last checkpoint, as produced by
/// `Store::recovery_tail`.
#[derive(Debug, Default)]
pub struct RecoveryTail {
    /// Live records in the un-checkpointed active set, in ascending external-id order.
    pub upserts: Vec<(String, Record)>,
    /// External ids of sealed rows tombstoned since the last checkpoint, either deleted or
    /// superseded by a newer upsert (such an id may therefore also appear in `upserts`).
    pub deleted: Vec<String>,
}
85
/// Location of the current version of a record within a `CollectionState`.
#[derive(Debug, Clone, Copy)]
enum Loc {
    /// Index into `CollectionState::active` (written since the last checkpoint).
    Active(u32),
    /// Row `row` of the segment at position `seg` of `CollectionState::sealed`.
    /// `seg` is a vector position, not the on-disk segment id.
    Sealed { seg: u32, row: u32 },
}
93
/// A record held in memory until the next checkpoint seals it into a segment.
#[derive(Debug, Clone)]
struct ActiveRow {
    /// Vector encoded as little-endian `f32` bytes.
    vector: Vec<u8>,
    /// Opaque payload bytes.
    payload: Vec<u8>,
}
101
/// In-memory state of one collection: its opened sealed segments plus the window of
/// changes made since the last checkpoint.
struct CollectionState {
    /// Collection identifier.
    id: CollectionId,
    /// Collection name (kept unique across the store by `create_collection`).
    name: String,
    /// Schema descriptor; `dim` and `filterable` are read by this module.
    descriptor: Descriptor,
    /// Codec used to read and write this collection's pages.
    codec: Box<dyn PageCodec>,
    /// Cached `descriptor.stride()`, passed to `SealedSegment::read_vector`.
    stride: usize,
    /// External id -> location of the live version of that record.
    primary: BTreeMap<String, Loc>,
    /// Opened sealed segments; `Loc::Sealed::seg` indexes this vector.
    sealed: Vec<SealedSegment>,
    /// Manifest metadata for each entry of `sealed`, in the same order.
    segments_meta: Vec<SegmentRef>,
    /// Append-only buffer of rows written since the last checkpoint. Superseded rows stay
    /// in place but are no longer reachable through `active_index`.
    active: Vec<ActiveRow>,
    /// External id -> index into `active` for each live active row.
    active_index: BTreeMap<String, u32>,
    /// Sealed rows tombstoned since the last checkpoint, keyed by position in `sealed`.
    dead_this_window: BTreeMap<u32, RoaringBitmap>,
    /// Index snapshot referenced by the manifest for this collection, if any.
    index_snapshot: Option<IndexSnapshotRef>,
}
132
133impl CollectionState {
134 fn new(
135 id: CollectionId,
136 name: String,
137 descriptor: Descriptor,
138 codec: Box<dyn PageCodec>,
139 ) -> Self {
140 let stride = descriptor.stride();
141 Self {
142 id,
143 name,
144 descriptor,
145 codec,
146 stride,
147 primary: BTreeMap::new(),
148 sealed: Vec::new(),
149 segments_meta: Vec::new(),
150 active: Vec::new(),
151 active_index: BTreeMap::new(),
152 dead_this_window: BTreeMap::new(),
153 index_snapshot: None,
154 }
155 }
156
157 fn has_pending(&self) -> bool {
158 !self.active_index.is_empty() || !self.dead_this_window.is_empty()
159 }
160
161 fn apply_upsert(&mut self, external_id: &str, vector: Vec<u8>, payload: Vec<u8>) {
165 if let Some(Loc::Sealed { seg, row }) = self.primary.get(external_id).copied() {
166 self.dead_this_window.entry(seg).or_default().insert(row);
167 }
168 let row = self.active.len() as u32;
169 self.active.push(ActiveRow { vector, payload });
170 self.active_index.insert(external_id.to_owned(), row);
171 self.primary
172 .insert(external_id.to_owned(), Loc::Active(row));
173 }
174
175 fn apply_delete(&mut self, external_id: &str) -> bool {
179 match self.primary.remove(external_id) {
180 Some(Loc::Sealed { seg, row }) => {
181 self.dead_this_window.entry(seg).or_default().insert(row);
182 self.active_index.remove(external_id);
183 true
184 }
185 Some(Loc::Active(_)) => {
186 self.active_index.remove(external_id);
187 true
188 }
189 None => false,
190 }
191 }
192}
193
/// A segment written and opened during a checkpoint whose reference is not yet in the
/// manifest; it is installed into `CollectionState` only after the manifest write.
struct PendingSegment {
    /// Manifest entry added to the collection's segment list in the new manifest.
    seg_ref: SegmentRef,
    /// The freshly written segment, opened for reads.
    sealed: SealedSegment,
}
200
/// Callback that `Store` invokes with each WAL entry it writes, after the entry has been
/// appended and synced to the log.
pub type CommitObserver = Arc<dyn Fn(&WalEntry) + Send + Sync>;
206
/// A directory-backed store of vector collections, made durable by a write-ahead log
/// and periodically checkpointed into sealed segments described by a manifest.
pub struct Store {
    /// Root directory of the store.
    dir: PathBuf,
    /// Supplies the catalog codec (manifest and WAL) and the per-collection codecs.
    keyring: Box<dyn KeyRing>,
    /// Open collections by id.
    collections: HashMap<CollectionId, CollectionState>,
    /// Collection name -> id.
    name_index: HashMap<String, CollectionId>,
    /// LSN assigned to the next WAL entry.
    next_lsn: Lsn,
    /// Id assigned to the next created collection.
    next_collection_id: u64,
    /// Id assigned to the next written segment.
    next_segment_id: u64,
    /// Version of the manifest most recently loaded or written.
    manifest_version: u64,
    /// Highest LSN covered by that manifest.
    last_checkpointed_lsn: Lsn,
    /// Writer for the current WAL file.
    wal: WalWriter,
    /// Sequence number of the current WAL file.
    wal_seq: u64,
    /// Optional callback notified of every written entry.
    commit_observer: Option<CommitObserver>,
}
222
223impl Store {
224 pub fn open(dir: &Path) -> Result<Self> {
227 Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::plaintext()))
228 }
229
230 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
234 Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::new(codec)))
235 }
236
237 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
243 fs::create_dir_all(dir).map_err(|e| CoreError::io(dir, e))?;
244 let wal_dir = dir.join("wal");
245 fs::create_dir_all(&wal_dir).map_err(|e| CoreError::io(&wal_dir, e))?;
246 fsync_dir(dir)?;
247 fsync_dir(&wal_dir)?;
248
249 let mfst = manifest::read_current(dir, keyring.catalog_codec())?.unwrap_or_default();
251
252 let mut collections: HashMap<CollectionId, CollectionState> = HashMap::new();
256 let mut name_index: HashMap<String, CollectionId> = HashMap::new();
257 for entry in &mfst.collections {
258 let descriptor = Descriptor::decode(&entry.descriptor)?;
259 let codec = keyring.collection_codec(entry.id)?;
260 let mut state = CollectionState::new(entry.id, entry.name.clone(), descriptor, codec);
261 state.segments_meta = entry.segments.clone();
262 state.index_snapshot = entry.index_snapshot.clone();
263 let seg_dir = segments_dir(dir, entry.id);
264 for seg in &entry.segments {
265 let sealed = segment::open_segment(&seg_dir, seg.id, state.codec.as_ref())?;
266 let seg_idx = state.sealed.len() as u32;
267 for (row, ext_id) in sealed.row_ids().iter().enumerate() {
268 let row = row as u32;
269 if !sealed.is_dead(row) {
270 state
271 .primary
272 .insert(ext_id.clone(), Loc::Sealed { seg: seg_idx, row });
273 }
274 }
275 state.sealed.push(sealed);
276 }
277 name_index.insert(state.name.clone(), state.id);
278 collections.insert(state.id, state);
279 }
280
281 let floor = mfst.last_checkpointed_lsn;
283 let mut max_lsn = floor;
284 let wal_files = list_wal_files(&wal_dir)?;
285 let mut max_seq = 0u64;
286 let mut keep_seqs: HashSet<u64> = HashSet::new();
287 for (seq, path) in &wal_files {
288 max_seq = (*seq).max(max_seq);
289 let replay = wal::read_all(path, keyring.catalog_codec())?;
290 let mut had_live = false;
291 for entry in replay.entries {
292 if entry.lsn.value() <= floor.value() {
293 continue; }
295 had_live = true;
296 if entry.lsn > max_lsn {
297 max_lsn = entry.lsn;
298 }
299 apply_wal_entry(&mut collections, &mut name_index, &entry, keyring.as_ref())?;
300 }
301 if had_live {
302 keep_seqs.insert(*seq);
303 }
304 }
305 let next_lsn = max_lsn.next();
306
307 gc_orphan_segments(dir, &mfst, keyring.as_ref())?;
311 gc_orphan_index_snapshots(dir, &mfst)?;
312
313 let wal_seq = max_seq + 1;
316 let wal = WalWriter::create(&wal_file_path(&wal_dir, wal_seq), next_lsn)?;
317 fsync_dir(&wal_dir)?;
318 for (seq, path) in &wal_files {
319 if !keep_seqs.contains(seq) {
320 remove_file_if_present(path)?;
321 }
322 }
323 fsync_dir(&wal_dir)?;
324
325 Ok(Self {
326 dir: dir.to_path_buf(),
327 keyring,
328 collections,
329 name_index,
330 next_lsn,
331 next_collection_id: mfst.next_collection_id,
332 next_segment_id: mfst.next_segment_id,
333 manifest_version: mfst.version,
334 last_checkpointed_lsn: floor,
335 wal,
336 wal_seq,
337 commit_observer: None,
338 })
339 }
340
341 pub fn set_commit_observer(&mut self, observer: CommitObserver) {
345 self.commit_observer = Some(observer);
346 }
347
348 fn publish(&self, entry: &WalEntry) {
350 if let Some(observer) = &self.commit_observer {
351 observer(entry);
352 }
353 }
354
355 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
361 let mut ops = Vec::new();
362 for (&id, state) in &self.collections {
363 ops.push(WalOp::CreateCollection {
364 collection_id: id,
365 name: state.name.clone(),
366 descriptor: postcard::to_allocvec(&state.descriptor)?,
367 });
368 for (external_id, record) in self.scan(id)? {
369 ops.push(WalOp::Upsert {
370 collection_id: id,
371 external_id,
372 vector: f32_to_le_bytes(&record.vector),
373 payload: record.payload,
374 });
375 }
376 }
377 Ok(ops)
378 }
379
380 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
386 if let WalOp::Checkpoint { .. } = op {
387 return Ok(());
388 }
389 if let WalOp::CreateCollection { collection_id, .. } = &op {
390 self.keyring.provision_collection(*collection_id)?;
393 self.next_collection_id = self.next_collection_id.max(collection_id.0 + 1);
394 }
395 let lsn = self.next_lsn;
396 let entry = WalEntry { lsn, op };
397 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
398 self.next_lsn = lsn.next();
399 apply_wal_entry(
400 &mut self.collections,
401 &mut self.name_index,
402 &entry,
403 self.keyring.as_ref(),
404 )?;
405 self.publish(&entry);
406 Ok(())
407 }
408
409 pub fn create_collection(
411 &mut self,
412 name: &str,
413 descriptor: Descriptor,
414 ) -> Result<CollectionId> {
415 if self.name_index.contains_key(name) {
416 return Err(CoreError::AlreadyExists(format!("collection {name}")));
417 }
418 if descriptor.dim == 0 {
419 return Err(CoreError::InvalidArgument(
420 "dim must be non-zero".to_owned(),
421 ));
422 }
423 let id = CollectionId(self.next_collection_id);
424 self.keyring.provision_collection(id)?;
427 let descriptor_bytes = postcard::to_allocvec(&descriptor)?;
428 let lsn = self.next_lsn;
429 let entry = WalEntry {
430 lsn,
431 op: WalOp::CreateCollection {
432 collection_id: id,
433 name: name.to_owned(),
434 descriptor: descriptor_bytes,
435 },
436 };
437 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
438 self.next_lsn = lsn.next();
439 self.publish(&entry);
440 self.next_collection_id += 1;
441 let codec = self.keyring.collection_codec(id)?;
442 self.collections.insert(
443 id,
444 CollectionState::new(id, name.to_owned(), descriptor, codec),
445 );
446 self.name_index.insert(name.to_owned(), id);
447 Ok(id)
448 }
449
450 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
453 let Some(&id) = self.name_index.get(name) else {
454 return Ok(false);
455 };
456 let lsn = self.next_lsn;
457 let entry = WalEntry {
458 lsn,
459 op: WalOp::DropCollection { collection_id: id },
460 };
461 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
462 self.next_lsn = lsn.next();
463 self.publish(&entry);
464 self.collections.remove(&id);
465 self.name_index.remove(name);
466 Ok(true)
467 }
468
469 pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
476 let Some(id) = self.collection_id(name) else {
477 return Ok(false);
478 };
479 self.drop_collection(name)?;
480 self.checkpoint()?;
484 self.keyring.shred_collection(id)?;
487 Ok(true)
488 }
489
490 pub fn upsert(
494 &mut self,
495 collection: CollectionId,
496 external_id: &str,
497 vector: &[f32],
498 payload: &[u8],
499 ) -> Result<Lsn> {
500 let dim = self
501 .collections
502 .get(&collection)
503 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
504 .descriptor
505 .dim as usize;
506 if vector.len() != dim {
507 return Err(CoreError::InvalidArgument(format!(
508 "vector has {} dims, collection expects {dim}",
509 vector.len()
510 )));
511 }
512 let vector_bytes = f32_to_le_bytes(vector);
513 let lsn = self.next_lsn;
514 let entry = WalEntry {
515 lsn,
516 op: WalOp::Upsert {
517 collection_id: collection,
518 external_id: external_id.to_owned(),
519 vector: vector_bytes.clone(),
520 payload: payload.to_vec(),
521 },
522 };
523 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
524 self.next_lsn = lsn.next();
525 self.publish(&entry);
526 let state = self
527 .collections
528 .get_mut(&collection)
529 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
530 state.apply_upsert(external_id, vector_bytes, payload.to_vec());
531 Ok(lsn)
532 }
533
534 pub fn upsert_batch(
544 &mut self,
545 collection: CollectionId,
546 records: &[(&str, &[f32], &[u8])],
547 ) -> Result<u64> {
548 if records.is_empty() {
549 return Ok(0);
550 }
551 let dim = self
552 .collections
553 .get(&collection)
554 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
555 .descriptor
556 .dim as usize;
557 for (_, vector, _) in records {
558 if vector.len() != dim {
559 return Err(CoreError::InvalidArgument(format!(
560 "vector has {} dims, collection expects {dim}",
561 vector.len()
562 )));
563 }
564 }
565
566 let mut entries: Vec<WalEntry> = Vec::with_capacity(records.len());
568 for (ext_id, vector, payload) in records {
569 let lsn = self.next_lsn;
570 self.next_lsn = lsn.next();
571 entries.push(WalEntry {
572 lsn,
573 op: WalOp::Upsert {
574 collection_id: collection,
575 external_id: ext_id.to_string(),
576 vector: f32_to_le_bytes(vector),
577 payload: payload.to_vec(),
578 },
579 });
580 }
581
582 for entry in &entries {
584 self.wal.append(self.keyring.catalog_codec(), entry)?;
585 }
586 self.wal.sync()?;
587
588 for entry in &entries {
590 self.publish(entry);
591 if let WalOp::Upsert {
592 external_id,
593 vector,
594 payload,
595 ..
596 } = &entry.op
597 {
598 let state = self
599 .collections
600 .get_mut(&collection)
601 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
602 state.apply_upsert(external_id, vector.clone(), payload.clone());
603 }
604 }
605 Ok(records.len() as u64)
606 }
607
608 pub fn delete(&mut self, collection: CollectionId, external_id: &str) -> Result<bool> {
610 let existed = self
611 .collections
612 .get(&collection)
613 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
614 .primary
615 .contains_key(external_id);
616 if !existed {
617 return Ok(false);
618 }
619 let lsn = self.next_lsn;
620 let entry = WalEntry {
621 lsn,
622 op: WalOp::Delete {
623 collection_id: collection,
624 external_id: external_id.to_owned(),
625 },
626 };
627 self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
628 self.next_lsn = lsn.next();
629 self.publish(&entry);
630 let state = self
631 .collections
632 .get_mut(&collection)
633 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
634 state.apply_delete(external_id);
635 Ok(true)
636 }
637
638 pub fn prepare_create_collection(&self, name: &str, descriptor: &Descriptor) -> Result<WalOp> {
646 if self.name_index.contains_key(name) {
647 return Err(CoreError::AlreadyExists(format!("collection {name}")));
648 }
649 if descriptor.dim == 0 {
650 return Err(CoreError::InvalidArgument(
651 "dim must be non-zero".to_owned(),
652 ));
653 }
654 Ok(WalOp::CreateCollection {
655 collection_id: CollectionId(self.next_collection_id),
656 name: name.to_owned(),
657 descriptor: postcard::to_allocvec(descriptor)?,
658 })
659 }
660
661 pub fn prepare_upsert(
667 &self,
668 collection: CollectionId,
669 external_id: &str,
670 vector: &[f32],
671 payload: &[u8],
672 ) -> Result<WalOp> {
673 let dim = self
674 .collections
675 .get(&collection)
676 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
677 .descriptor
678 .dim as usize;
679 if vector.len() != dim {
680 return Err(CoreError::InvalidArgument(format!(
681 "vector has {} dims, collection expects {dim}",
682 vector.len()
683 )));
684 }
685 Ok(WalOp::Upsert {
686 collection_id: collection,
687 external_id: external_id.to_owned(),
688 vector: f32_to_le_bytes(vector),
689 payload: payload.to_vec(),
690 })
691 }
692
693 pub fn prepare_delete(
696 &self,
697 collection: CollectionId,
698 external_id: &str,
699 ) -> Result<Option<WalOp>> {
700 let existed = self
701 .collections
702 .get(&collection)
703 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
704 .primary
705 .contains_key(external_id);
706 Ok(existed.then(|| WalOp::Delete {
707 collection_id: collection,
708 external_id: external_id.to_owned(),
709 }))
710 }
711
712 pub fn get(&self, collection: CollectionId, external_id: &str) -> Result<Option<Record>> {
714 let state = self
715 .collections
716 .get(&collection)
717 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
718 match state.primary.get(external_id).copied() {
719 Some(loc) => Ok(Some(self.record_at(state, loc)?)),
720 None => Ok(None),
721 }
722 }
723
724 pub fn scan(&self, collection: CollectionId) -> Result<Vec<(String, Record)>> {
727 let state = self
728 .collections
729 .get(&collection)
730 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
731 let mut out = Vec::with_capacity(state.primary.len());
732 for (id, &loc) in &state.primary {
733 out.push((id.clone(), self.record_at(state, loc)?));
734 }
735 Ok(out)
736 }
737
738 fn record_at(&self, state: &CollectionState, loc: Loc) -> Result<Record> {
741 match loc {
742 Loc::Active(r) => {
743 let row = state
744 .active
745 .get(r as usize)
746 .ok_or_else(|| CoreError::MalformedPage(format!("dangling active row {r}")))?;
747 Ok(Record {
748 vector: le_bytes_to_f32(&row.vector),
749 payload: row.payload.clone(),
750 })
751 }
752 Loc::Sealed { seg, row } => {
753 let segment = state.sealed.get(seg as usize).ok_or_else(|| {
754 CoreError::MalformedPage(format!("dangling segment index {seg}"))
755 })?;
756 let vector_bytes = segment.read_vector(state.codec.as_ref(), row, state.stride)?;
757 let payload = segment.read_payload(state.codec.as_ref(), row)?;
758 Ok(Record {
759 vector: le_bytes_to_f32(&vector_bytes),
760 payload,
761 })
762 }
763 }
764 }
765
766 #[must_use]
768 pub fn collection_id(&self, name: &str) -> Option<CollectionId> {
769 self.name_index.get(name).copied()
770 }
771
772 #[must_use]
774 pub fn descriptor(&self, collection: CollectionId) -> Option<&Descriptor> {
775 self.collections.get(&collection).map(|s| &s.descriptor)
776 }
777
778 pub fn collection_codec_clone(&self, collection: CollectionId) -> Result<Box<dyn PageCodec>> {
786 self.collections
787 .get(&collection)
788 .map(|s| s.codec.clone_box())
789 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))
790 }
791
792 #[must_use]
794 pub fn dir(&self) -> &Path {
795 &self.dir
796 }
797
/// Version of the manifest this store most recently loaded or wrote
/// (incremented by each checkpoint and compaction that writes a manifest).
#[must_use]
pub fn manifest_version(&self) -> u64 {
    self.manifest_version
}
804
805 #[must_use]
808 pub fn index_dir(&self, collection: CollectionId) -> PathBuf {
809 collection_dir(&self.dir, collection).join("index")
810 }
811
812 pub fn read_index_snapshot(&self, collection: CollectionId) -> Result<Option<Vec<u8>>> {
821 let state = self
822 .collections
823 .get(&collection)
824 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
825 let Some(snap) = &state.index_snapshot else {
826 return Ok(None);
827 };
828 let path = self
829 .index_dir(collection)
830 .join(index_snapshot_file_name(snap.id));
831 let body = read_paged(&path, state.codec.as_ref(), PageType::IndexBlock)?;
832 Ok(Some(body))
833 }
834
835 pub fn recovery_tail(&self, collection: CollectionId) -> Result<RecoveryTail> {
843 let state = self
844 .collections
845 .get(&collection)
846 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
847 let mut upserts = Vec::with_capacity(state.active_index.len());
848 for (ext_id, &row) in &state.active_index {
849 let ar = &state.active[row as usize];
850 upserts.push((
851 ext_id.clone(),
852 Record {
853 vector: le_bytes_to_f32(&ar.vector),
854 payload: ar.payload.clone(),
855 },
856 ));
857 }
858 let mut deleted = Vec::new();
859 for (&seg_idx, bitmap) in &state.dead_this_window {
860 if let Some(seg) = state.sealed.get(seg_idx as usize) {
861 let row_ids = seg.row_ids();
862 for row in bitmap.iter() {
863 if let Some(ext) = row_ids.get(row as usize) {
864 deleted.push(ext.clone());
865 }
866 }
867 }
868 }
869 Ok(RecoveryTail { upserts, deleted })
870 }
871
872 pub fn len(&self, collection: CollectionId) -> Result<usize> {
874 Ok(self
875 .collections
876 .get(&collection)
877 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
878 .primary
879 .len())
880 }
881
882 pub fn is_empty(&self, collection: CollectionId) -> Result<bool> {
884 Ok(self.len(collection)? == 0)
885 }
886
887 #[must_use]
889 pub fn collection_names(&self) -> Vec<String> {
890 let mut names: Vec<String> = self.name_index.keys().cloned().collect();
891 names.sort();
892 names
893 }
894
895 pub fn matching_ids(
905 &self,
906 collection: CollectionId,
907 predicate: &SecPredicate,
908 ) -> Result<Vec<String>> {
909 let state = self
910 .collections
911 .get(&collection)
912 .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
913 let field_type = state
914 .descriptor
915 .filterable
916 .iter()
917 .find(|f| f.path == predicate.field())
918 .map(|f| f.field_type)
919 .ok_or_else(|| {
920 CoreError::InvalidArgument(format!("field {} is not filterable", predicate.field()))
921 })?;
922
923 let mut out: Vec<String> = Vec::new();
924 for (seg_idx, segment) in state.sealed.iter().enumerate() {
927 let seg_idx = seg_idx as u32;
928 let Some(rows) = segment.sec_query(predicate)? else {
929 continue;
930 };
931 for row in rows {
932 if segment.is_dead(row) {
933 continue;
934 }
935 let Some(ext_id) = segment.row_ids().get(row as usize) else {
936 continue;
937 };
938 if matches!(
939 state.primary.get(ext_id),
940 Some(Loc::Sealed { seg: s, row: r }) if *s == seg_idx && *r == row
941 ) {
942 out.push(ext_id.clone());
943 }
944 }
945 }
946 for (ext_id, &row) in &state.active_index {
948 if let Some(active) = state.active.get(row as usize)
949 && sec::payload_matches(predicate, field_type, &active.payload)
950 {
951 out.push(ext_id.clone());
952 }
953 }
954 out.sort();
955 out.dedup();
956 Ok(out)
957 }
958
959 pub fn checkpoint(&mut self) -> Result<()> {
967 self.checkpoint_with_index_snapshots(&HashMap::new())
968 }
969
/// Writes a checkpoint covering every WAL entry written so far.
///
/// Effects, in order:
/// 1. for each collection with pending changes: rewrite the deletion bitmaps of touched
///    sealed segments, seal the live active rows into a new segment, and fsync the
///    directory chain up to the store root;
/// 2. write each blob in `index_snapshots` as that collection's index snapshot, named
///    after the new manifest version;
/// 3. write the new manifest. It records `last_checkpointed_lsn = last_lsn`, appends the
///    new segments, and sets each collection's index snapshot to the one just written,
///    or to none. Collections without an entry in `index_snapshots` therefore lose any
///    previous snapshot reference;
/// 4. only after the manifest write succeeds, update in-memory state, rotate the WAL,
///    garbage-collect unreferenced segments/snapshots and run `auto_compact`.
///
/// Returns immediately without writing anything (and ignoring `index_snapshots`) when no
/// WAL entry has been written since the last checkpoint.
///
/// # Errors
/// Propagates I/O, codec and serialization errors. If one occurs before the manifest is
/// written, collection state is untouched, though segment ids already allocated are
/// not reused.
pub fn checkpoint_with_index_snapshots(
    &mut self,
    index_snapshots: &HashMap<CollectionId, Vec<u8>>,
) -> Result<()> {
    // Highest LSN assigned so far; nothing to do if the manifest already covers it.
    let last_lsn = Lsn(self.next_lsn.value().saturating_sub(1));
    if last_lsn.value() <= self.last_checkpointed_lsn.value() {
        return Ok(()); }
    let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
    cids.sort();
    // Segments sealed now cover every LSN after the previous checkpoint up to `last_lsn`.
    let segment_lsn_low = self.last_checkpointed_lsn.next();
    let new_version = self.manifest_version + 1;

    // Phase 1: persist tombstones and seal active rows, per collection with changes.
    let mut pending: HashMap<CollectionId, PendingSegment> = HashMap::new();
    for &cid in &cids {
        if !self.collections[&cid].has_pending() {
            continue;
        }
        let seg_dir = segments_dir(&self.dir, cid);
        fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();

        {
            // Rewrite each touched segment's deletion bitmap as (already dead | newly dead).
            // NOTE(review): this updates a segment that the current manifest already
            // references, before the new manifest exists. After a crash here, reopening
            // sees these rows as dead while replayed deletes find nothing to remove, so
            // `recovery_tail` would not report them. Confirm that is acceptable.
            let state = &self.collections[&cid];
            for (&seg_idx, newly_dead) in &state.dead_this_window {
                if let Some(seg) = state.sealed.get(seg_idx as usize) {
                    let mut merged = seg.dead_bitmap();
                    merged |= newly_dead;
                    segment::write_del(&seg_dir, seg.seg_id, codec.as_ref(), &merged)?;
                }
            }
        }

        let new_seg = if self.collections[&cid].active_index.is_empty() {
            // Only tombstones changed: no new segment for this collection.
            None
        } else {
            let seg_id = self.next_segment_id;
            self.next_segment_id += 1;
            let row_count = {
                let state = &self.collections[&cid];
                // One row per live active id, in `active_index` (ascending id) order;
                // superseded entries in `active` are not sealed.
                let seal_rows: Vec<SealRow<'_>> = state
                    .active_index
                    .iter()
                    .map(|(id, &row)| SealRow {
                        external_id: id,
                        vector: &state.active[row as usize].vector,
                        payload: &state.active[row as usize].payload,
                    })
                    .collect();
                segment::write_segment(
                    &seg_dir,
                    seg_id,
                    codec.as_ref(),
                    &seal_rows,
                    &state.descriptor.filterable,
                )?;
                seal_rows.len() as u64
            };
            Some((seg_id, row_count))
        };

        // Make the new/updated files and their directory entries durable up to the root.
        fsync_dir(&seg_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        fsync_dir(&self.dir.join("collections"))?;
        fsync_dir(&self.dir)?;

        if let Some((seg_id, row_count)) = new_seg {
            let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
            pending.insert(
                cid,
                PendingSegment {
                    seg_ref: SegmentRef {
                        id: seg_id,
                        row_count,
                        lsn_low: segment_lsn_low,
                        lsn_high: last_lsn,
                    },
                    sealed,
                },
            );
        }
    }

    // Phase 2: write the supplied index snapshots, named after the new manifest version.
    let mut new_index_refs: HashMap<CollectionId, IndexSnapshotRef> = HashMap::new();
    for &cid in &cids {
        let Some(blob) = index_snapshots.get(&cid) else {
            continue;
        };
        let index_dir = self.index_dir(cid);
        fs::create_dir_all(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();
        let path = index_dir.join(index_snapshot_file_name(new_version));
        write_paged(
            &path,
            codec.as_ref(),
            PageType::IndexBlock,
            new_version,
            blob,
        )?;
        fsync_dir(&index_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        new_index_refs.insert(
            cid,
            IndexSnapshotRef {
                id: new_version,
                lsn: last_lsn,
            },
        );
    }

    // Phase 3: build and write the new manifest (the commit point).
    let mut entries = Vec::with_capacity(cids.len());
    for &cid in &cids {
        let state = &self.collections[&cid];
        let mut segs = state.segments_meta.clone();
        if let Some(p) = pending.get(&cid) {
            segs.push(p.seg_ref.clone());
        }
        entries.push(CollectionEntry {
            id: state.id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
            segments: segs,
            // Collections without a freshly written blob get no snapshot reference.
            index_snapshot: new_index_refs.get(&cid).cloned(),
        });
    }
    let new_manifest = Manifest {
        format_version: MANIFEST_FORMAT_VERSION,
        version: new_version,
        last_checkpointed_lsn: last_lsn,
        next_collection_id: self.next_collection_id,
        next_segment_id: self.next_segment_id,
        collections: entries,
    };
    manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;

    // Phase 4: the manifest is committed, so mirror it in memory.
    self.manifest_version = new_version;
    self.last_checkpointed_lsn = last_lsn;
    for &cid in &cids {
        let Some(state) = self.collections.get_mut(&cid) else {
            continue;
        };
        // Fold this window's tombstones into the opened segments' dead bitmaps.
        let dead_window = std::mem::take(&mut state.dead_this_window);
        for (seg_idx, bitmap) in dead_window {
            if let Some(seg) = state.sealed.get_mut(seg_idx as usize) {
                seg.mark_dead(&bitmap);
            }
        }
        if let Some(p) = pending.remove(&cid) {
            // The new segment takes the next position; repoint its ids from Active to it.
            let seg_idx = state.sealed.len() as u32;
            for (row, ext_id) in p.sealed.row_ids().iter().enumerate() {
                state.primary.insert(
                    ext_id.clone(),
                    Loc::Sealed {
                        seg: seg_idx,
                        row: row as u32,
                    },
                );
            }
            state.sealed.push(p.sealed);
            state.segments_meta.push(p.seg_ref);
        }
        state.active.clear();
        state.active_index.clear();
        state.index_snapshot = new_index_refs.get(&cid).cloned();
    }
    // Every WAL entry written so far is now covered by the manifest.
    self.rotate_wal()?;
    gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
    gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
    self.auto_compact()?;
    Ok(())
}
1169
1170 pub fn compact(&mut self) -> Result<()> {
1176 for cid in self.sorted_cids() {
1177 if self.reclaimable(cid) {
1178 self.compact_collection(cid)?;
1179 }
1180 }
1181 Ok(())
1182 }
1183
1184 fn auto_compact(&mut self) -> Result<()> {
1187 for cid in self.sorted_cids() {
1188 if self.needs_compaction(cid) {
1189 self.compact_collection(cid)?;
1190 }
1191 }
1192 Ok(())
1193 }
1194
1195 fn sorted_cids(&self) -> Vec<CollectionId> {
1196 let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
1197 cids.sort();
1198 cids
1199 }
1200
1201 fn reclaimable(&self, cid: CollectionId) -> bool {
1204 self.collections.get(&cid).is_some_and(|s| {
1205 s.sealed.len() > 1
1206 || s.sealed
1207 .iter()
1208 .any(|seg| seg.live_count() < u64::from(seg.row_count()))
1209 })
1210 }
1211
1212 fn needs_compaction(&self, cid: CollectionId) -> bool {
1215 let Some(s) = self.collections.get(&cid) else {
1216 return false;
1217 };
1218 if s.sealed.is_empty() {
1219 return false;
1220 }
1221 let total: u64 = s.sealed.iter().map(|seg| u64::from(seg.row_count())).sum();
1222 let live: u64 = s.sealed.iter().map(SealedSegment::live_count).sum();
1223 s.sealed.len() >= COMPACT_MIN_SEGMENTS || (total > 0 && (total - live) * 2 >= total)
1224 }
1225
/// Rewrites every live sealed row of collection `cid` into one new segment, then writes
/// a manifest in which that segment replaces all of the collection's previous segments.
///
/// Only ids whose current location in `primary` is `Loc::Sealed` are copied, in ascending
/// external-id order. Active (un-checkpointed) rows are untouched. The new segment's LSN
/// range runs from the minimum `lsn_low` to the maximum `lsn_high` of the replaced
/// segments (defaulting to `Lsn::ZERO` / `last_checkpointed_lsn` when there are none).
/// Other collections, every collection's index snapshot reference and
/// `last_checkpointed_lsn` are carried over unchanged.
///
/// After the manifest write, the in-memory segment list is replaced, `dead_this_window`
/// is cleared, and unreferenced segment/snapshot files are garbage-collected.
///
/// # Errors
/// `NotFound` for an unknown collection, `MalformedPage` for a dangling segment index,
/// and propagated I/O, codec and serialization errors.
fn compact_collection(&mut self, cid: CollectionId) -> Result<()> {
    let codec = self
        .collections
        .get(&cid)
        .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?
        .codec
        .clone_box();
    // Collect (external id, vector bytes, payload) for each live sealed row.
    let live: Vec<(String, Vec<u8>, Vec<u8>)> = {
        let state = self
            .collections
            .get(&cid)
            .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?;
        let mut out = Vec::with_capacity(state.primary.len());
        for (ext_id, &loc) in &state.primary {
            if let Loc::Sealed { seg, row } = loc {
                let segment = state.sealed.get(seg as usize).ok_or_else(|| {
                    CoreError::MalformedPage(format!("dangling segment index {seg}"))
                })?;
                let vector = segment.read_vector(codec.as_ref(), row, state.stride)?;
                let payload = segment.read_payload(codec.as_ref(), row)?;
                out.push((ext_id.clone(), vector, payload));
            }
        }
        out
    };

    // The merged segment spans the union of the replaced segments' LSN ranges.
    let (lsn_low, lsn_high) = {
        let state = &self.collections[&cid];
        let low = state
            .segments_meta
            .iter()
            .map(|s| s.lsn_low.value())
            .min()
            .map(Lsn)
            .unwrap_or(Lsn::ZERO);
        let high = state
            .segments_meta
            .iter()
            .map(|s| s.lsn_high.value())
            .max()
            .map(Lsn)
            .unwrap_or(self.last_checkpointed_lsn);
        (low, high)
    };

    let seg_id = self.next_segment_id;
    self.next_segment_id += 1;
    let seg_dir = segments_dir(&self.dir, cid);
    fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
    let seal_rows: Vec<SealRow<'_>> = live
        .iter()
        .map(|(id, v, p)| SealRow {
            external_id: id,
            vector: v,
            payload: p,
        })
        .collect();
    segment::write_segment(
        &seg_dir,
        seg_id,
        codec.as_ref(),
        &seal_rows,
        &self.collections[&cid].descriptor.filterable,
    )?;
    // Make the new segment and its directory entries durable before the manifest
    // refers to it.
    fsync_dir(&seg_dir)?;
    fsync_dir(&collection_dir(&self.dir, cid))?;
    fsync_dir(&self.dir.join("collections"))?;
    fsync_dir(&self.dir)?;
    let new_ref = SegmentRef {
        id: seg_id,
        row_count: seal_rows.len() as u64,
        lsn_low,
        lsn_high,
    };
    let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;

    // Build a manifest that differs from the current one only in `cid`'s segment list.
    let new_version = self.manifest_version + 1;
    let mut entries = Vec::with_capacity(self.collections.len());
    for &other in &self.sorted_cids() {
        let state = &self.collections[&other];
        let segs = if other == cid {
            vec![new_ref.clone()]
        } else {
            state.segments_meta.clone()
        };
        entries.push(CollectionEntry {
            id: state.id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
            segments: segs,
            index_snapshot: state.index_snapshot.clone(),
        });
    }
    let new_manifest = Manifest {
        format_version: MANIFEST_FORMAT_VERSION,
        version: new_version,
        last_checkpointed_lsn: self.last_checkpointed_lsn,
        next_collection_id: self.next_collection_id,
        next_segment_id: self.next_segment_id,
        collections: entries,
    };
    manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;

    // Manifest committed: swap the in-memory segment list for the single new segment.
    self.manifest_version = new_version;
    let row_ids: Vec<String> = sealed.row_ids().to_vec();
    if let Some(state) = self.collections.get_mut(&cid) {
        state.sealed = vec![sealed];
        state.segments_meta = vec![new_ref];
        // Window tombstones referenced the old segment positions. NOTE(review): any such
        // tombstones are discarded here; their ids are already absent from `primary`, so
        // they were not copied, but `recovery_tail` will no longer report them.
        state.dead_this_window.clear();
        // Every copied id now lives in the only segment (position 0).
        for (row, ext_id) in row_ids.into_iter().enumerate() {
            state.primary.insert(
                ext_id,
                Loc::Sealed {
                    seg: 0,
                    row: row as u32,
                },
            );
        }
    }
    gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
    gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
    Ok(())
}
1361
1362 fn rotate_wal(&mut self) -> Result<()> {
1365 let wal_dir = self.dir.join("wal");
1366 let old_seq = self.wal_seq;
1367 let new_seq = old_seq + 1;
1368 let new_wal = WalWriter::create(&wal_file_path(&wal_dir, new_seq), self.next_lsn)?;
1369 fsync_dir(&wal_dir)?;
1370 self.wal = new_wal;
1371 self.wal_seq = new_seq;
1372 for (seq, path) in list_wal_files(&wal_dir)? {
1373 if seq <= old_seq {
1374 remove_file_if_present(&path)?;
1375 }
1376 }
1377 fsync_dir(&wal_dir)?;
1378 Ok(())
1379 }
1380}
1381
1382fn apply_wal_entry(
1386 collections: &mut HashMap<CollectionId, CollectionState>,
1387 name_index: &mut HashMap<String, CollectionId>,
1388 entry: &WalEntry,
1389 keyring: &dyn KeyRing,
1390) -> Result<()> {
1391 match &entry.op {
1392 WalOp::CreateCollection {
1393 collection_id,
1394 name,
1395 descriptor,
1396 } => {
1397 let descriptor = Descriptor::decode(descriptor)?;
1398 let codec = keyring.collection_codec(*collection_id)?;
1401 name_index.insert(name.clone(), *collection_id);
1402 collections.insert(
1403 *collection_id,
1404 CollectionState::new(*collection_id, name.clone(), descriptor, codec),
1405 );
1406 }
1407 WalOp::DropCollection { collection_id } => {
1408 if let Some(state) = collections.remove(collection_id) {
1409 name_index.remove(&state.name);
1410 }
1411 }
1412 WalOp::Upsert {
1413 collection_id,
1414 external_id,
1415 vector,
1416 payload,
1417 } => {
1418 if let Some(state) = collections.get_mut(collection_id) {
1419 state.apply_upsert(external_id, vector.clone(), payload.clone());
1420 }
1421 }
1422 WalOp::Delete {
1423 collection_id,
1424 external_id,
1425 } => {
1426 if let Some(state) = collections.get_mut(collection_id) {
1427 state.apply_delete(external_id);
1428 }
1429 }
1430 WalOp::Checkpoint { .. } => {}
1433 }
1434 Ok(())
1435}
1436
1437fn gc_orphan_segments(dir: &Path, mfst: &Manifest, keyring: &dyn KeyRing) -> Result<()> {
1440 let collections_dir = dir.join("collections");
1441 if !collections_dir.exists() {
1442 return Ok(());
1443 }
1444 let mut referenced: HashSet<(u64, u64)> = HashSet::new();
1445 let mut live_collections: HashSet<u64> = HashSet::new();
1446 for c in &mfst.collections {
1447 live_collections.insert(c.id.value());
1448 for s in &c.segments {
1449 referenced.insert((c.id.value(), s.id));
1450 }
1451 }
1452 for entry in fs::read_dir(&collections_dir).map_err(|e| CoreError::io(&collections_dir, e))? {
1453 let entry = entry.map_err(|e| CoreError::io(&collections_dir, e))?;
1454 let cdir = entry.path();
1455 let Some(cid) = entry
1456 .file_name()
1457 .to_str()
1458 .and_then(|n| n.parse::<u64>().ok())
1459 else {
1460 continue;
1461 };
1462 if !live_collections.contains(&cid) {
1463 keyring.shred_collection(CollectionId(cid))?;
1467 if cdir.is_dir() {
1468 fs::remove_dir_all(&cdir).map_err(|e| CoreError::io(&cdir, e))?;
1469 }
1470 continue;
1471 }
1472 let seg_dir = cdir.join("segments");
1473 if !seg_dir.is_dir() {
1474 continue;
1475 }
1476 for seg in fs::read_dir(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))? {
1477 let seg = seg.map_err(|e| CoreError::io(&seg_dir, e))?;
1478 let path = seg.path();
1479 let Some(name) = seg.file_name().to_str().map(str::to_owned) else {
1480 continue;
1481 };
1482 if segment::is_temp_file(&name) {
1484 remove_file_if_present(&path)?;
1485 continue;
1486 }
1487 let Some(seg_id) = segment::seg_id_of_file(&name) else {
1488 continue;
1489 };
1490 if !referenced.contains(&(cid, seg_id)) {
1491 remove_file_if_present(&path)?;
1492 }
1493 }
1494 }
1495 Ok(())
1496}
1497
1498fn gc_orphan_index_snapshots(dir: &Path, mfst: &Manifest) -> Result<()> {
1504 for c in &mfst.collections {
1505 let index_dir = collection_dir(dir, c.id).join("index");
1506 if !index_dir.is_dir() {
1507 continue;
1508 }
1509 let keep = c.index_snapshot.as_ref().map(|r| r.id);
1510 for entry in fs::read_dir(&index_dir).map_err(|e| CoreError::io(&index_dir, e))? {
1511 let entry = entry.map_err(|e| CoreError::io(&index_dir, e))?;
1512 let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
1513 continue;
1514 };
1515 let Some(id) = index_snapshot_id_of_file(&name) else {
1516 continue; };
1518 if Some(id) != keep {
1519 remove_file_if_present(&entry.path())?;
1520 }
1521 }
1522 }
1523 Ok(())
1524}
1525
1526fn remove_file_if_present(path: &Path) -> Result<()> {
1527 match fs::remove_file(path) {
1528 Ok(()) => Ok(()),
1529 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1530 Err(e) => Err(CoreError::io(path, e)),
1531 }
1532}
1533
1534fn collection_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1535 dir.join("collections").join(format!("{:010}", cid.value()))
1536}
1537
1538fn segments_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1539 collection_dir(dir, cid).join("segments")
1540}
1541
/// File name for index snapshot `id`: `idx-` followed by the id zero-padded to 10 digits.
fn index_snapshot_file_name(id: u64) -> String {
    format!("idx-{:010}", id)
}
1547
/// Parses the snapshot id out of an `idx-<digits>` file name.
///
/// Returns `None` when the `idx-` prefix is missing or the remainder is not a `u64`; for
/// example, `idx-1.tmp` is rejected. Zero-padding is not required.
fn index_snapshot_id_of_file(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("idx-")?;
    digits.parse::<u64>().ok()
}
1554
/// Path of WAL file number `seq` inside `wal_dir`: `wal-<seq zero-padded to 10>.log`.
fn wal_file_path(wal_dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("wal-{:010}.log", seq);
    wal_dir.join(file_name)
}
1558
1559fn list_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1560 let mut out = Vec::new();
1561 for entry in fs::read_dir(wal_dir).map_err(|e| CoreError::io(wal_dir, e))? {
1562 let entry = entry.map_err(|e| CoreError::io(wal_dir, e))?;
1563 if let Some(seq) = entry.file_name().to_str().and_then(parse_wal_file_name) {
1564 out.push((seq, entry.path()));
1565 }
1566 }
1567 out.sort_by_key(|(seq, _)| *seq);
1568 Ok(out)
1569}
1570
/// Parses the sequence number out of a `wal-<digits>.log` file name, or returns `None`.
fn parse_wal_file_name(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("wal-")?.strip_suffix(".log")?;
    digits.parse().ok()
}
1576
/// Serialises `v` as consecutive 4-byte little-endian IEEE-754 values.
fn f32_to_le_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|x| x.to_le_bytes()));
    bytes
}
1584
/// Decodes consecutive 4-byte little-endian IEEE-754 values from `bytes`.
///
/// Trailing bytes that do not form a whole 4-byte group are ignored.
fn le_bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    let mut floats = Vec::with_capacity(bytes.len() / 4);
    for quad in bytes.chunks_exact(4) {
        let bits = u32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]);
        floats.push(f32::from_bits(bits));
    }
    floats
}
1591
#[cfg(test)]
mod tests {
    // Every "reopen" in these tests must happen after the previous `Store` is gone. This is
    // done either via block scope or an explicit `drop`, so the second handle models a
    // process restart rather than two live owners of the same directory. Note that
    // shadowing with `let s = ...` does NOT drop the earlier binding.
    use super::*;
    use crate::descriptor::{DistanceMetric, Dtype};

    /// Four-dimensional f32 / L2 descriptor shared by most tests.
    fn desc() -> Descriptor {
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
    }

    /// Opens (or creates) a store rooted at `dir`, panicking on failure.
    fn open(dir: &Path) -> Store {
        Store::open(dir).unwrap()
    }

    /// Path of the `seg-<id>.dir` file for segment `seg_id` of collection `cid`.
    fn seg_dir_file(dir: &Path, cid: CollectionId, seg_id: u64) -> PathBuf {
        segments_dir(dir, cid).join(format!("seg-{seg_id:010}.dir"))
    }

    #[test]
    fn upsert_get_delete_in_memory() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(got.payload, b"{}");
        assert!(s.delete(c, "a").unwrap());
        assert!(s.get(c, "a").unwrap().is_none());
        // A second delete of the same id reports that nothing was removed.
        assert!(!s.delete(c, "a").unwrap());
    }

    #[test]
    fn dim_mismatch_is_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        assert!(matches!(
            s.upsert(c, "a", &[1.0, 2.0], b"{}"),
            Err(CoreError::InvalidArgument(_))
        ));
    }

    #[test]
    fn upsert_batch_commits_all_on_sync() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            let vecs: Vec<([f32; 4], String)> = (0..8u32)
                .map(|i| ([i as f32; 4], format!("k{i}")))
                .collect();
            let payload = b"{}";
            let records: Vec<(&str, &[f32], &[u8])> = vecs
                .iter()
                .map(|(v, id)| (id.as_str(), v.as_slice(), payload.as_slice()))
                .collect();
            let n = s.upsert_batch(c, &records).unwrap();
            assert_eq!(n, 8);
            for (_, id) in &vecs {
                assert!(s.get(c, id).unwrap().is_some(), "missing {id}");
            }
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 8);
        for i in 0..8u32 {
            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
            assert_eq!(got.vector, vec![i as f32; 4]);
        }
    }

    #[test]
    fn upsert_batch_dim_mismatch_writes_nothing() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        // The bad record comes second, so a partial write would leave "a" behind.
        let bad: &[(&str, &[f32], &[u8])] = &[
            ("a", &[1.0, 2.0, 3.0, 4.0], b"{}"),
            ("b", &[1.0, 2.0], b"{}"),
        ];
        assert!(matches!(
            s.upsert_batch(c, bad),
            Err(CoreError::InvalidArgument(_))
        ));
        assert!(s.get(c, "a").unwrap().is_none());
    }

    #[test]
    fn duplicate_collection_is_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        s.create_collection("c", desc()).unwrap();
        assert!(matches!(
            s.create_collection("c", desc()),
            Err(CoreError::AlreadyExists(_))
        ));
    }

    #[test]
    fn recovers_without_checkpoint_via_wal_replay() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..10u32 {
                let v = [i as f32; 4];
                s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
            }
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 10);
        let got = s.get(c, "k7").unwrap().unwrap();
        assert_eq!(got.vector, vec![7.0; 4]);
    }

    #[test]
    fn recovers_across_checkpoint_and_wal_tail() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            for i in 5..8u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.delete(c, "k0").unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        // 8 upserts minus 1 delete.
        assert_eq!(s.len(c).unwrap(), 7);
        assert!(s.get(c, "k0").unwrap().is_none());
        assert_eq!(s.get(c, "k6").unwrap().unwrap().vector, vec![6.0; 4]);
    }

    #[test]
    fn open_with_keyring_round_trips_through_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s =
                Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
                    .unwrap();
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "b", &[5.0; 4], b"{}").unwrap();
        }
        let s = Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
            .unwrap();
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 2);
        assert_eq!(
            s.get(c, "a").unwrap().unwrap().vector,
            vec![1.0, 2.0, 3.0, 4.0]
        );
        assert_eq!(s.get(c, "b").unwrap().unwrap().vector, vec![5.0; 4]);
    }

    #[test]
    fn delete_survives_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.delete(c, "a").unwrap();
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert!(s.get(c, "a").unwrap().is_none());
        assert!(s.get(c, "b").unwrap().is_some());
        assert_eq!(s.len(c).unwrap(), 1);
    }

    #[test]
    fn reopen_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
        }
        // Each call opens and drops its own store, so the two opens are sequential.
        let snapshot = |dir: &Path| {
            let s = open(dir);
            let c = s.collection_id("c").unwrap();
            s.scan(c).unwrap()
        };
        assert_eq!(snapshot(tmp.path()), snapshot(tmp.path()));
    }

    #[test]
    fn update_then_checkpoint_keeps_latest_value() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "a", &[9.0; 4], b"v2").unwrap();
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![9.0; 4]);
        assert_eq!(got.payload, b"v2");
        assert_eq!(s.len(c).unwrap(), 1);
    }

    #[test]
    fn update_within_one_window_seals_latest() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
            s.upsert(c, "a", &[2.0; 4], b"v2").unwrap();
            s.upsert(c, "a", &[3.0; 4], b"v3").unwrap();
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 1);
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![3.0; 4]);
        assert_eq!(got.payload, b"v3");
    }

    #[test]
    fn dropped_collection_is_gone_after_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            assert!(s.drop_collection("c").unwrap());
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        assert!(s.collection_id("c").is_none());
        assert!(s.collection_names().is_empty());
    }

    #[test]
    fn orphan_segment_is_garbage_collected() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
        }
        let stray = segments_dir(tmp.path(), cid).join("seg-0000009999.vec");
        fs::write(&stray, b"junk").unwrap();
        assert!(stray.exists());
        let _s = open(tmp.path());
        assert!(!stray.exists(), "orphan segment should be GC'd on open");
    }

    #[test]
    fn corrupt_segment_is_detected_not_served() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
        }
        let path = seg_dir_file(tmp.path(), cid, 0);
        // NOTE(review): offset 33 is assumed to land in checksummed page content;
        // confirm against the page layout if the format changes.
        let mut bytes = fs::read(&path).unwrap();
        bytes[33] ^= 0xFF;
        fs::write(&path, &bytes).unwrap();
        assert!(matches!(
            Store::open(tmp.path()),
            Err(CoreError::PageCorrupt { .. })
        ));
    }

    #[test]
    fn torn_wal_tail_drops_only_unacked_record() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..3u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            wal_path = wal_file_path(&tmp.path().join("wal"), s.wal_seq);
        }
        {
            // Simulate a torn append: a few garbage bytes after the last complete record.
            use std::io::Write as _;
            let mut f = fs::OpenOptions::new().append(true).open(&wal_path).unwrap();
            f.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
            f.sync_data().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 3);
    }

    #[test]
    fn reads_served_from_disk_after_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], br#"{"k":1}"#)
            .unwrap();
        s.checkpoint().unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(got.payload, br#"{"k":1}"#);
    }

    #[test]
    fn high_dim_vectors_straddle_pages() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let dim = 1000usize;
        let c = s
            .create_collection(
                "c",
                Descriptor::new(dim as u32, Dtype::F32, DistanceMetric::L2),
            )
            .unwrap();
        for i in 0..20u32 {
            let v: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
            s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
        }
        s.checkpoint().unwrap();
        // Close the first handle so the reopen below models a restart.
        drop(s);
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        for i in 0..20u32 {
            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
            let want: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
            assert_eq!(
                got.vector, want,
                "vector k{i} mismatch after straddling read"
            );
        }
    }

    #[test]
    fn delete_persists_via_del_bitmap_across_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            s.delete(c, "k2").unwrap();
            s.checkpoint().unwrap();
            assert_eq!(
                s.collections[&c].sealed.len(),
                1,
                "no new segment for a delete-only window"
            );
        }
        assert!(
            segments_dir(tmp.path(), cid)
                .join("seg-0000000000.del")
                .exists(),
            ".del must be persisted for the deleted row"
        );
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert!(s.get(c, "k2").unwrap().is_none());
        assert_eq!(s.len(c).unwrap(), 4);
        for i in [0u32, 1, 3, 4] {
            assert!(s.get(c, &format!("k{i}")).unwrap().is_some());
        }
    }

    #[test]
    fn shadowed_row_is_tombstoned_and_latest_wins() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"v1")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            // Re-upsert an id that already lives in a sealed segment.
            s.upsert(c, "k2", &[99.0; 4], b"v2").unwrap();
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        // The shadowed sealed row must not be counted twice.
        assert_eq!(s.len(c).unwrap(), 5);
        let got = s.get(c, "k2").unwrap().unwrap();
        assert_eq!(got.vector, vec![99.0; 4]);
        assert_eq!(got.payload, b"v2");
    }

    #[test]
    fn compaction_merges_segments_reclaims_and_keeps_active_rows() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            for i in 0..6u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            for i in 6..12u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            s.delete(c, "k0").unwrap();
            s.delete(c, "k6").unwrap();
            s.checkpoint().unwrap();
            assert_eq!(s.collections[&c].sealed.len(), 2);

            // An uncheckpointed row that compaction must not lose.
            s.upsert(c, "fresh", &[7.0; 4], b"new").unwrap();
            s.compact().unwrap();
            assert_eq!(s.collections[&c].sealed.len(), 1, "segments merged to one");
            assert!(
                !segments_dir(tmp.path(), cid)
                    .join("seg-0000000000.dir")
                    .exists(),
                "old segment files reclaimed"
            );
            // 12 upserts - 2 deletes + 1 fresh row.
            assert_eq!(s.len(c).unwrap(), 11);
            assert!(s.get(c, "k0").unwrap().is_none());
            assert!(s.get(c, "k6").unwrap().is_none());
            assert_eq!(s.get(c, "k5").unwrap().unwrap().vector, vec![5.0; 4]);
            assert_eq!(s.get(c, "fresh").unwrap().unwrap().payload, b"new");
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.collections[&c].sealed.len(), 1);
        assert_eq!(s.len(c).unwrap(), 11);
        assert!(s.get(c, "k0").unwrap().is_none());
        assert_eq!(s.get(c, "fresh").unwrap().unwrap().vector, vec![7.0; 4]);
        assert_eq!(s.get(c, "k11").unwrap().unwrap().vector, vec![11.0; 4]);
    }

    #[test]
    fn auto_compaction_merges_many_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        // One checkpoint per batch yields one sealed segment each, enough to trigger
        // automatic compaction.
        for ck in 0..8u32 {
            for i in 0..3u32 {
                let n = ck * 3 + i;
                s.upsert(c, &format!("k{n}"), &[n as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
        }
        assert!(
            s.collections[&c].sealed.len() < COMPACT_MIN_SEGMENTS,
            "auto-compaction should have merged the segments"
        );
        assert_eq!(s.len(c).unwrap(), 24);
        assert_eq!(s.get(c, "k0").unwrap().unwrap().vector, vec![0.0; 4]);
        assert_eq!(s.get(c, "k23").unwrap().unwrap().vector, vec![23.0; 4]);
    }

    #[test]
    fn matching_ids_spans_secondary_index_and_active_buffer() {
        use crate::descriptor::FilterableField;
        use crate::sec::SecValue;

        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
            FilterableField::keyword("city"),
            FilterableField::numeric("age"),
        ]);
        let c = s.create_collection("c", descriptor).unwrap();
        s.upsert(c, "a", &[0.0; 4], br#"{"city":"paris","age":30}"#)
            .unwrap();
        s.upsert(c, "b", &[0.0; 4], br#"{"city":"lyon","age":25}"#)
            .unwrap();
        s.upsert(c, "d", &[0.0; 4], br#"{"city":"paris","age":40}"#)
            .unwrap();
        s.checkpoint().unwrap();
        // "e" stays in the active buffer, so matches must merge sealed and active rows.
        s.upsert(c, "e", &[0.0; 4], br#"{"city":"paris","age":22}"#)
            .unwrap();

        let paris = || SecPredicate::Eq {
            field: "city".into(),
            value: SecValue::Keyword("paris".into()),
        };
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["a", "d", "e"]);

        // Inclusive numeric range [25, 35].
        assert_eq!(
            s.matching_ids(
                c,
                &SecPredicate::Range {
                    field: "age".into(),
                    lo: Some(SecValue::Numeric(25.0)),
                    hi: Some(SecValue::Numeric(35.0)),
                    lo_inclusive: true,
                    hi_inclusive: true,
                }
            )
            .unwrap(),
            ["a", "b"]
        );

        // Deleting a sealed row must hide it from subsequent matches.
        s.delete(c, "a").unwrap();
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);

        // Filtering on a field that was not declared filterable is rejected.
        assert!(matches!(
            s.matching_ids(
                c,
                &SecPredicate::Eq {
                    field: "country".into(),
                    value: SecValue::Keyword("fr".into()),
                }
            ),
            Err(CoreError::InvalidArgument(_))
        ));

        s.checkpoint().unwrap();
        // Close the first handle so the reopen below models a restart.
        drop(s);
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
    }

    /// Sorted names of the `idx-*` files in collection `cid`'s index directory.
    /// Returns an empty list when the directory is missing.
    fn index_snapshot_files(dir: &Path, cid: CollectionId) -> Vec<String> {
        let idx = collection_dir(dir, cid).join("index");
        let mut names: Vec<String> = fs::read_dir(&idx)
            .map(|rd| {
                rd.filter_map(std::result::Result::ok)
                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
                    .filter(|n| n.starts_with("idx-"))
                    .collect()
            })
            .unwrap_or_default();
        names.sort();
        names
    }

    #[test]
    fn index_snapshot_round_trips_through_checkpoint_and_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let blob = b"opaque-index-bytes".to_vec();
        let cid = {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint_with_index_snapshots(&HashMap::from([(c, blob.clone())]))
                .unwrap();
            assert_eq!(s.read_index_snapshot(c).unwrap(), Some(blob.clone()));
            assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
            c
        };
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(blob));
    }

    #[test]
    fn checkpoint_without_a_snapshot_clears_and_reclaims_it() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"blob".to_vec())]))
            .unwrap();
        assert!(s.read_index_snapshot(c).unwrap().is_some());

        // A plain checkpoint supplies no snapshot, so the previous one is retired.
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.checkpoint().unwrap();
        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
        assert!(index_snapshot_files(tmp.path(), c).is_empty());

        // Close the first handle so the reopen below models a restart.
        drop(s);
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
    }

    #[test]
    fn a_new_snapshot_supersedes_and_reclaims_the_old_one() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"first".to_vec())]))
            .unwrap();
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"second".to_vec())]))
            .unwrap();

        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"second".to_vec()));
        assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
    }

    #[test]
    fn compaction_preserves_the_index_snapshot() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
            .unwrap();
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.delete(c, "a").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
            .unwrap();
        s.compact().unwrap();
        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));

        // Close the first handle so the reopen below models a restart.
        drop(s);
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));
    }

    #[test]
    fn orphan_index_snapshot_is_reclaimed_on_open() {
        let tmp = tempfile::tempdir().unwrap();
        let cid = {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"live".to_vec())]))
                .unwrap();
            // A snapshot file the manifest does not reference.
            let stray = s.index_dir(c).join("idx-9999999999");
            fs::write(&stray, b"orphan").unwrap();
            c
        };
        let s = open(tmp.path());
        assert!(!s.index_dir(cid).join("idx-9999999999").exists());
        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(b"live".to_vec()));
    }
}