use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use roaring::RoaringBitmap;
use crate::descriptor::Descriptor;
use crate::error::{CoreError, Result};
use crate::ids::{CollectionId, Lsn};
use crate::keyring::{KeyRing, SingleCodecKeyRing};
use crate::manifest::{
self, CollectionEntry, IndexSnapshotRef, MANIFEST_FORMAT_VERSION, Manifest, SegmentRef,
};
use crate::page::{PageCodec, PageType};
use crate::paged::{fsync_dir, read_paged, write_paged};
use crate::sec::{self, SecPredicate};
use crate::segment::{self, SealRow, SealedSegment};
use crate::wal::{self, WalEntry, WalOp, WalWriter};
/// Sealed-segment count at or above which `Store::needs_compaction` flags a
/// collection for compaction, regardless of how many of its rows are dead.
const COMPACT_MIN_SEGMENTS: usize = 8;
/// A record as handed back to callers by `Store::get` / `Store::scan`.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
/// Vector components, decoded from the little-endian `f32` bytes held by the store.
pub vector: Vec<f32>,
/// Payload bytes associated with the record.
pub payload: Vec<u8>,
}
/// Changes a collection has accumulated since its last checkpoint, as produced
/// by `Store::recovery_tail`.
#[derive(Debug, Default)]
pub struct RecoveryTail {
/// Every row currently in the active (not yet sealed) set, keyed by external id.
pub upserts: Vec<(String, Record)>,
/// External ids of sealed rows marked dead in the current window. A row is
/// marked dead both by a delete and by an upsert that supersedes it, so an id
/// can appear here and in `upserts` at the same time.
pub deleted: Vec<String>,
}
/// Where the live version of a record currently resides.
#[derive(Debug, Clone, Copy)]
enum Loc {
/// Index into `CollectionState::active`.
Active(u32),
/// `seg` indexes `CollectionState::sealed`; `row` is the row within that segment.
Sealed { seg: u32, row: u32 },
}
/// One unsealed row held in memory until the next checkpoint seals it.
#[derive(Debug, Clone)]
struct ActiveRow {
/// Vector as raw bytes (little-endian `f32`s when written through `Store::upsert`).
vector: Vec<u8>,
/// Payload bytes as supplied by the writer.
payload: Vec<u8>,
}
/// In-memory state of a single collection.
struct CollectionState {
/// Identifier of the collection.
id: CollectionId,
/// Collection name (the key used in `Store::name_index`).
name: String,
/// Descriptor the collection was created with.
descriptor: Descriptor,
/// Page codec used for this collection's segment and index files.
codec: Box<dyn PageCodec>,
/// Cached `descriptor.stride()`, passed to `SealedSegment::read_vector`.
stride: usize,
/// External id -> location of the live version of that record.
primary: BTreeMap<String, Loc>,
/// Opened sealed segments; `Loc::Sealed::seg` indexes this vector.
sealed: Vec<SealedSegment>,
/// Manifest references for the sealed segments, kept parallel to `sealed`.
segments_meta: Vec<SegmentRef>,
/// Rows written since the last checkpoint; `Loc::Active` indexes this vector.
active: Vec<ActiveRow>,
/// External id -> index into `active` for rows that are still live.
active_index: BTreeMap<String, u32>,
/// Per sealed-segment index: rows superseded or deleted since the last checkpoint.
dead_this_window: BTreeMap<u32, RoaringBitmap>,
/// Reference to the index snapshot recorded by the last checkpoint, if any.
index_snapshot: Option<IndexSnapshotRef>,
}
impl CollectionState {
    /// Builds an empty collection state; `stride` is cached from the descriptor.
    fn new(
        id: CollectionId,
        name: String,
        descriptor: Descriptor,
        codec: Box<dyn PageCodec>,
    ) -> Self {
        let stride = descriptor.stride();
        Self {
            id,
            name,
            descriptor,
            codec,
            stride,
            primary: BTreeMap::new(),
            sealed: Vec::new(),
            segments_meta: Vec::new(),
            active: Vec::new(),
            active_index: BTreeMap::new(),
            dead_this_window: BTreeMap::new(),
            index_snapshot: None,
        }
    }

    /// Returns `true` when there are live active rows or sealed rows marked
    /// dead that the next checkpoint has to persist.
    fn has_pending(&self) -> bool {
        !self.active_index.is_empty() || !self.dead_this_window.is_empty()
    }

    /// Records `external_id` as an active row holding `vector` / `payload`.
    ///
    /// A sealed predecessor is marked dead for the current window. When the id
    /// is already active its existing slot is overwritten in place, so repeated
    /// upserts of one id do not grow `active` between checkpoints.
    fn apply_upsert(&mut self, external_id: &str, vector: Vec<u8>, payload: Vec<u8>) {
        let previous = self.primary.get(external_id).copied();
        if let Some(Loc::Sealed { seg, row }) = previous {
            self.dead_this_window.entry(seg).or_default().insert(row);
        }
        if let Some(Loc::Active(row)) = previous {
            // Reuse the slot only when every index agrees it belongs to this id;
            // otherwise fall through and append a fresh row.
            if self.active_index.get(external_id) == Some(&row) {
                if let Some(slot) = self.active.get_mut(row as usize) {
                    *slot = ActiveRow { vector, payload };
                    return;
                }
            }
        }
        let row = self.active.len() as u32;
        self.active.push(ActiveRow { vector, payload });
        self.active_index.insert(external_id.to_owned(), row);
        self.primary
            .insert(external_id.to_owned(), Loc::Active(row));
    }

    /// Removes `external_id` from the collection.
    ///
    /// Returns `true` when the id was present. A sealed row is marked dead for
    /// the current window; an active row is merely unlinked from the indexes
    /// (its slot in `active` stays until the next checkpoint clears the vector).
    fn apply_delete(&mut self, external_id: &str) -> bool {
        match self.primary.remove(external_id) {
            Some(Loc::Sealed { seg, row }) => {
                self.dead_this_window.entry(seg).or_default().insert(row);
                self.active_index.remove(external_id);
                true
            }
            Some(Loc::Active(_)) => {
                self.active_index.remove(external_id);
                true
            }
            None => false,
        }
    }
}
/// A segment written during a checkpoint that is installed in memory only
/// after the new manifest has been written.
struct PendingSegment {
/// Manifest reference describing the new segment.
seg_ref: SegmentRef,
/// The segment as re-opened from disk after being written.
sealed: SealedSegment,
}
/// Callback invoked with each WAL entry after it has been appended by one of
/// the `Store` write paths (see `Store::set_commit_observer`).
pub type CommitObserver = Arc<dyn Fn(&WalEntry) + Send + Sync>;
/// A directory-backed store of collections: in-memory state rebuilt from the
/// manifest and WAL on open, mutated through WAL-logged operations.
pub struct Store {
/// Root directory of the store.
dir: PathBuf,
/// Source of the catalog codec and the per-collection codecs.
keyring: Box<dyn KeyRing>,
/// Live collections keyed by id.
collections: HashMap<CollectionId, CollectionState>,
/// Collection name -> id.
name_index: HashMap<String, CollectionId>,
/// LSN that the next WAL entry will receive.
next_lsn: Lsn,
/// Id that the next locally created collection will receive.
next_collection_id: u64,
/// Id that the next written segment will receive.
next_segment_id: u64,
/// Version of the most recently written (or loaded) manifest.
manifest_version: u64,
/// Highest LSN covered by the current manifest; WAL entries at or below it
/// are skipped during replay.
last_checkpointed_lsn: Lsn,
/// Writer for the current WAL file.
wal: WalWriter,
/// Sequence number of the current WAL file.
wal_seq: u64,
/// Optional observer notified of each appended entry.
commit_observer: Option<CommitObserver>,
}
impl Store {
/// Opens (creating if necessary) the store rooted at `dir`, using the key ring
/// built by `SingleCodecKeyRing::plaintext()`.
///
/// # Errors
/// Propagates any error from [`Store::open_with_keyring`].
pub fn open(dir: &Path) -> Result<Self> {
    let keyring = Box::new(SingleCodecKeyRing::plaintext());
    Self::open_with_keyring(dir, keyring)
}
/// Opens the store rooted at `dir`, wrapping `codec` in a `SingleCodecKeyRing`.
///
/// # Errors
/// Propagates any error from [`Store::open_with_keyring`].
pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
    let keyring = Box::new(SingleCodecKeyRing::new(codec));
    Self::open_with_keyring(dir, keyring)
}
/// Opens (creating if necessary) the store rooted at `dir` with the given key ring.
///
/// Recovery proceeds in this order:
/// 1. ensure `dir` and `dir/wal` exist and fsync both;
/// 2. load the current manifest (an empty default when none exists) and open
///    every segment it references, rebuilding each collection's primary index
///    from rows that are not marked dead;
/// 3. replay every WAL file, applying only entries above the manifest's
///    `last_checkpointed_lsn`;
/// 4. garbage-collect files the manifest does not reference;
/// 5. start a fresh WAL file and delete old WAL files that held no live entry.
///
/// # Errors
/// Returns an error on any I/O, decoding, key-ring or segment failure.
pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
fs::create_dir_all(dir).map_err(|e| CoreError::io(dir, e))?;
let wal_dir = dir.join("wal");
fs::create_dir_all(&wal_dir).map_err(|e| CoreError::io(&wal_dir, e))?;
fsync_dir(dir)?;
fsync_dir(&wal_dir)?;
let mfst = manifest::read_current(dir, keyring.catalog_codec())?.unwrap_or_default();
let mut collections: HashMap<CollectionId, CollectionState> = HashMap::new();
let mut name_index: HashMap<String, CollectionId> = HashMap::new();
// Rebuild checkpointed state from the manifest.
for entry in &mfst.collections {
let descriptor = Descriptor::decode(&entry.descriptor)?;
let codec = keyring.collection_codec(entry.id)?;
let mut state = CollectionState::new(entry.id, entry.name.clone(), descriptor, codec);
state.segments_meta = entry.segments.clone();
state.index_snapshot = entry.index_snapshot.clone();
let seg_dir = segments_dir(dir, entry.id);
for seg in &entry.segments {
let sealed = segment::open_segment(&seg_dir, seg.id, state.codec.as_ref())?;
let seg_idx = state.sealed.len() as u32;
// Segments are visited in manifest order, so a later segment's row
// replaces an earlier mapping for the same external id.
for (row, ext_id) in sealed.row_ids().iter().enumerate() {
let row = row as u32;
if !sealed.is_dead(row) {
state
.primary
.insert(ext_id.clone(), Loc::Sealed { seg: seg_idx, row });
}
}
state.sealed.push(sealed);
}
name_index.insert(state.name.clone(), state.id);
collections.insert(state.id, state);
}
// Replay the WAL on top of the checkpointed state.
let floor = mfst.last_checkpointed_lsn;
let mut max_lsn = floor;
let wal_files = list_wal_files(&wal_dir)?;
let mut max_seq = 0u64;
// Sequence numbers of WAL files that contributed at least one replayed entry.
let mut keep_seqs: HashSet<u64> = HashSet::new();
for (seq, path) in &wal_files {
max_seq = (*seq).max(max_seq);
let replay = wal::read_all(path, keyring.catalog_codec())?;
let mut had_live = false;
for entry in replay.entries {
// Entries at or below the floor are already reflected in the manifest.
if entry.lsn.value() <= floor.value() {
continue; }
had_live = true;
if entry.lsn > max_lsn {
max_lsn = entry.lsn;
}
apply_wal_entry(&mut collections, &mut name_index, &entry, keyring.as_ref())?;
}
if had_live {
keep_seqs.insert(*seq);
}
}
let next_lsn = max_lsn.next();
// NOTE(review): both GC passes are driven by the on-disk manifest, not by the
// replayed state. `gc_orphan_segments` shreds the key of any collection
// directory whose id is absent from `mfst`; if a checkpoint can crash after
// creating a directory for a collection that so far exists only in the WAL,
// that live collection's key would be shredded here — confirm.
gc_orphan_segments(dir, &mfst, keyring.as_ref())?;
gc_orphan_index_snapshots(dir, &mfst)?;
// Create the new WAL file and make it durable before deleting any old file.
let wal_seq = max_seq + 1;
let wal = WalWriter::create(&wal_file_path(&wal_dir, wal_seq), next_lsn)?;
fsync_dir(&wal_dir)?;
for (seq, path) in &wal_files {
if !keep_seqs.contains(seq) {
remove_file_if_present(path)?;
}
}
fsync_dir(&wal_dir)?;
Ok(Self {
dir: dir.to_path_buf(),
keyring,
collections,
name_index,
next_lsn,
next_collection_id: mfst.next_collection_id,
next_segment_id: mfst.next_segment_id,
manifest_version: mfst.version,
last_checkpointed_lsn: floor,
wal,
wal_seq,
commit_observer: None,
})
}
/// Installs `observer`, replacing any previously installed one. It is called
/// with every WAL entry the store appends from then on.
pub fn set_commit_observer(&mut self, observer: CommitObserver) {
    self.commit_observer = observer.into();
}
/// Hands `entry` to the commit observer, if one is installed.
fn publish(&self, entry: &WalEntry) {
    match self.commit_observer.as_ref() {
        Some(notify) => notify(entry),
        None => {}
    }
}
/// Produces a list of operations that recreates the current logical state:
/// for every collection a `CreateCollection` followed by one `Upsert` per
/// live record.
///
/// Collections are emitted in ascending id order (records within a collection
/// follow `scan` order), so the result is deterministic for a given state
/// rather than following `HashMap` iteration order.
///
/// # Errors
/// Fails when a descriptor cannot be serialized or a record cannot be read.
pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
    let mut ops = Vec::new();
    for id in self.sorted_cids() {
        let Some(state) = self.collections.get(&id) else {
            continue;
        };
        ops.push(WalOp::CreateCollection {
            collection_id: id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
        });
        ops.extend(
            self.scan(id)?
                .into_iter()
                .map(|(external_id, record)| WalOp::Upsert {
                    collection_id: id,
                    external_id,
                    vector: f32_to_le_bytes(&record.vector),
                    payload: record.payload,
                }),
        );
    }
    Ok(ops)
}
/// Applies an operation received from another store: logs it to the local WAL
/// under a locally assigned LSN, applies it in memory, then notifies the
/// commit observer.
///
/// `Checkpoint` operations are ignored. For `CreateCollection` the key ring is
/// asked to provision the collection first and `next_collection_id` is raised
/// past the replicated id.
///
/// # Errors
/// Fails when provisioning, the WAL append, or applying the entry fails.
pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
    if matches!(op, WalOp::Checkpoint { .. }) {
        return Ok(());
    }
    if let WalOp::CreateCollection { collection_id, .. } = &op {
        self.keyring.provision_collection(*collection_id)?;
        let floor = collection_id.0 + 1;
        self.next_collection_id = self.next_collection_id.max(floor);
    }
    let lsn = self.next_lsn;
    let entry = WalEntry { lsn, op };
    self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
    self.next_lsn = lsn.next();
    let keyring = self.keyring.as_ref();
    apply_wal_entry(&mut self.collections, &mut self.name_index, &entry, keyring)?;
    self.publish(&entry);
    Ok(())
}
/// Creates a new, empty collection called `name` and returns its id.
///
/// The key ring provisions the collection and its codec is fetched *before*
/// the WAL entry is written, so a key-ring failure cannot leave a logged and
/// published `CreateCollection` that is missing from memory.
///
/// # Errors
/// * `CoreError::AlreadyExists` when `name` is already in use.
/// * `CoreError::InvalidArgument` when `descriptor.dim` is zero.
/// * Any key-ring, serialization or WAL error.
pub fn create_collection(
    &mut self,
    name: &str,
    descriptor: Descriptor,
) -> Result<CollectionId> {
    if self.name_index.contains_key(name) {
        return Err(CoreError::AlreadyExists(format!("collection {name}")));
    }
    if descriptor.dim == 0 {
        return Err(CoreError::InvalidArgument(
            "dim must be non-zero".to_owned(),
        ));
    }
    let id = CollectionId(self.next_collection_id);
    self.keyring.provision_collection(id)?;
    // Every fallible step that does not depend on the WAL happens up front.
    let codec = self.keyring.collection_codec(id)?;
    let descriptor_bytes = postcard::to_allocvec(&descriptor)?;
    let lsn = self.next_lsn;
    let entry = WalEntry {
        lsn,
        op: WalOp::CreateCollection {
            collection_id: id,
            name: name.to_owned(),
            descriptor: descriptor_bytes,
        },
    };
    self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
    self.next_lsn = lsn.next();
    self.publish(&entry);
    self.next_collection_id += 1;
    self.collections.insert(
        id,
        CollectionState::new(id, name.to_owned(), descriptor, codec),
    );
    self.name_index.insert(name.to_owned(), id);
    Ok(id)
}
/// Drops the collection called `name`.
///
/// Returns `Ok(false)` when no such collection exists. Otherwise a
/// `DropCollection` entry is logged, the observer is notified and the
/// collection is removed from memory.
///
/// # Errors
/// Fails when the WAL append fails.
pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
    let id = match self.name_index.get(name) {
        Some(found) => *found,
        None => return Ok(false),
    };
    let lsn = self.next_lsn;
    let op = WalOp::DropCollection { collection_id: id };
    let entry = WalEntry { lsn, op };
    self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
    self.next_lsn = lsn.next();
    self.publish(&entry);
    self.collections.remove(&id);
    self.name_index.remove(name);
    Ok(true)
}
/// Drops the collection called `name`, checkpoints, and then asks the key ring
/// to shred the collection's key material.
///
/// Returns `Ok(false)` when no such collection exists.
///
/// # Errors
/// Propagates errors from the drop, the checkpoint and the key ring.
pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
    match self.collection_id(name) {
        None => Ok(false),
        Some(id) => {
            self.drop_collection(name)?;
            self.checkpoint()?;
            self.keyring.shred_collection(id)?;
            Ok(true)
        }
    }
}
/// Inserts or replaces the record `external_id` in `collection` and returns
/// the LSN assigned to the write.
///
/// The write is appended to the WAL and synced before it becomes visible in
/// memory.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * `CoreError::InvalidArgument` when `vector.len()` differs from the
///   collection's `dim`.
/// * Any WAL error.
pub fn upsert(
    &mut self,
    collection: CollectionId,
    external_id: &str,
    vector: &[f32],
    payload: &[u8],
) -> Result<Lsn> {
    let Some(existing) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    let dim = existing.descriptor.dim as usize;
    if vector.len() != dim {
        return Err(CoreError::InvalidArgument(format!(
            "vector has {} dims, collection expects {dim}",
            vector.len()
        )));
    }
    let vector_bytes = f32_to_le_bytes(vector);
    let lsn = self.next_lsn;
    let op = WalOp::Upsert {
        collection_id: collection,
        external_id: external_id.to_owned(),
        vector: vector_bytes.clone(),
        payload: payload.to_vec(),
    };
    let entry = WalEntry { lsn, op };
    self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
    self.next_lsn = lsn.next();
    self.publish(&entry);
    match self.collections.get_mut(&collection) {
        Some(state) => {
            state.apply_upsert(external_id, vector_bytes, payload.to_vec());
            Ok(lsn)
        }
        None => Err(CoreError::NotFound(format!("collection {collection}"))),
    }
}
/// Upserts all `records` (`(external_id, vector, payload)`) into `collection`
/// with a single WAL sync, returning the number of records written.
///
/// All vectors are validated before anything is logged, so a dimension
/// mismatch writes nothing. Each record gets its own consecutive LSN. After
/// the sync, each entry is published and then applied by value, so vectors
/// and payloads are not copied a second time.
///
/// NOTE(review): LSNs are reserved before the WAL append, so a failed append
/// or sync leaves `next_lsn` advanced although nothing was applied in memory.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * `CoreError::InvalidArgument` when any vector has the wrong length.
/// * Any WAL error.
pub fn upsert_batch(
    &mut self,
    collection: CollectionId,
    records: &[(&str, &[f32], &[u8])],
) -> Result<u64> {
    if records.is_empty() {
        return Ok(0);
    }
    let dim = self
        .collections
        .get(&collection)
        .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
        .descriptor
        .dim as usize;
    for (_, vector, _) in records {
        if vector.len() != dim {
            return Err(CoreError::InvalidArgument(format!(
                "vector has {} dims, collection expects {dim}",
                vector.len()
            )));
        }
    }
    let mut entries: Vec<WalEntry> = Vec::with_capacity(records.len());
    for (ext_id, vector, payload) in records {
        let lsn = self.next_lsn;
        self.next_lsn = lsn.next();
        entries.push(WalEntry {
            lsn,
            op: WalOp::Upsert {
                collection_id: collection,
                external_id: (*ext_id).to_owned(),
                vector: f32_to_le_bytes(vector),
                payload: payload.to_vec(),
            },
        });
    }
    for entry in &entries {
        self.wal.append(self.keyring.catalog_codec(), entry)?;
    }
    self.wal.sync()?;
    // Consume the entries: the observer sees each one by reference, then its
    // buffers are moved into the collection instead of being cloned.
    for entry in entries {
        self.publish(&entry);
        if let WalOp::Upsert {
            external_id,
            vector,
            payload,
            ..
        } = entry.op
        {
            let state = self
                .collections
                .get_mut(&collection)
                .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
            state.apply_upsert(&external_id, vector, payload);
        }
    }
    Ok(records.len() as u64)
}
/// Deletes the record `external_id` from `collection`.
///
/// Returns `Ok(false)` without touching the WAL when the id is not present;
/// otherwise logs a `Delete`, notifies the observer, applies it and returns
/// `Ok(true)`.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * Any WAL error.
pub fn delete(&mut self, collection: CollectionId, external_id: &str) -> Result<bool> {
    let Some(current) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    if !current.primary.contains_key(external_id) {
        return Ok(false);
    }
    let lsn = self.next_lsn;
    let op = WalOp::Delete {
        collection_id: collection,
        external_id: external_id.to_owned(),
    };
    let entry = WalEntry { lsn, op };
    self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
    self.next_lsn = lsn.next();
    self.publish(&entry);
    match self.collections.get_mut(&collection) {
        Some(state) => {
            state.apply_delete(external_id);
            Ok(true)
        }
        None => Err(CoreError::NotFound(format!("collection {collection}"))),
    }
}
/// Validates a collection creation and returns the `CreateCollection`
/// operation for it without logging or applying anything.
///
/// The operation carries the id the store would assign next.
///
/// # Errors
/// * `CoreError::AlreadyExists` when `name` is already in use.
/// * `CoreError::InvalidArgument` when `descriptor.dim` is zero.
/// * A serialization error when the descriptor cannot be encoded.
pub fn prepare_create_collection(&self, name: &str, descriptor: &Descriptor) -> Result<WalOp> {
    let name_taken = self.name_index.contains_key(name);
    if name_taken {
        return Err(CoreError::AlreadyExists(format!("collection {name}")));
    }
    if descriptor.dim == 0 {
        let message = "dim must be non-zero".to_owned();
        return Err(CoreError::InvalidArgument(message));
    }
    let encoded = postcard::to_allocvec(descriptor)?;
    Ok(WalOp::CreateCollection {
        collection_id: CollectionId(self.next_collection_id),
        name: name.to_owned(),
        descriptor: encoded,
    })
}
/// Validates an upsert and returns the `Upsert` operation for it without
/// logging or applying anything.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * `CoreError::InvalidArgument` when `vector.len()` differs from the
///   collection's `dim`.
pub fn prepare_upsert(
    &self,
    collection: CollectionId,
    external_id: &str,
    vector: &[f32],
    payload: &[u8],
) -> Result<WalOp> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    let dim = state.descriptor.dim as usize;
    if vector.len() != dim {
        return Err(CoreError::InvalidArgument(format!(
            "vector has {} dims, collection expects {dim}",
            vector.len()
        )));
    }
    let op = WalOp::Upsert {
        collection_id: collection,
        external_id: external_id.to_owned(),
        vector: f32_to_le_bytes(vector),
        payload: payload.to_vec(),
    };
    Ok(op)
}
/// Returns the `Delete` operation for `external_id`, or `None` when the id is
/// not present in `collection`. Nothing is logged or applied.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist.
pub fn prepare_delete(
    &self,
    collection: CollectionId,
    external_id: &str,
) -> Result<Option<WalOp>> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    if !state.primary.contains_key(external_id) {
        return Ok(None);
    }
    Ok(Some(WalOp::Delete {
        collection_id: collection,
        external_id: external_id.to_owned(),
    }))
}
/// Looks up the live record stored under `external_id`.
///
/// Returns `Ok(None)` when the id is not present.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist, or any error
/// raised while reading the record.
pub fn get(&self, collection: CollectionId, external_id: &str) -> Result<Option<Record>> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    let Some(loc) = state.primary.get(external_id).copied() else {
        return Ok(None);
    };
    let record = self.record_at(state, loc)?;
    Ok(Some(record))
}
/// Returns every live record of `collection` together with its external id,
/// in the iteration order of the primary index (a `BTreeMap` keyed by id).
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist, or the first
/// error raised while reading a record.
pub fn scan(&self, collection: CollectionId) -> Result<Vec<(String, Record)>> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    let mut records = Vec::with_capacity(state.primary.len());
    for (external_id, loc) in state.primary.iter() {
        let record = self.record_at(state, *loc)?;
        records.push((external_id.clone(), record));
    }
    Ok(records)
}
/// Materializes the record stored at `loc` within `state`.
///
/// Active rows are read from memory; sealed rows are read from their segment
/// using the collection codec and stride.
///
/// # Errors
/// `CoreError::MalformedPage` when `loc` points at a missing active row or
/// segment, or any error raised by the segment reads.
fn record_at(&self, state: &CollectionState, loc: Loc) -> Result<Record> {
    let codec = state.codec.as_ref();
    let (vector_bytes, payload) = match loc {
        Loc::Active(r) => {
            let Some(active) = state.active.get(r as usize) else {
                return Err(CoreError::MalformedPage(format!("dangling active row {r}")));
            };
            return Ok(Record {
                vector: le_bytes_to_f32(&active.vector),
                payload: active.payload.clone(),
            });
        }
        Loc::Sealed { seg, row } => {
            let Some(segment) = state.sealed.get(seg as usize) else {
                return Err(CoreError::MalformedPage(format!(
                    "dangling segment index {seg}"
                )));
            };
            let vector_bytes = segment.read_vector(codec, row, state.stride)?;
            let payload = segment.read_payload(codec, row)?;
            (vector_bytes, payload)
        }
    };
    Ok(Record {
        vector: le_bytes_to_f32(&vector_bytes),
        payload,
    })
}
/// Returns the id of the collection called `name`, if it exists.
#[must_use]
pub fn collection_id(&self, name: &str) -> Option<CollectionId> {
    let id = self.name_index.get(name)?;
    Some(*id)
}
/// Returns the descriptor of `collection`, or `None` when it does not exist.
#[must_use]
pub fn descriptor(&self, collection: CollectionId) -> Option<&Descriptor> {
    let state = self.collections.get(&collection)?;
    Some(&state.descriptor)
}
/// Returns a boxed clone of the page codec used by `collection`.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist.
pub fn collection_codec_clone(&self, collection: CollectionId) -> Result<Box<dyn PageCodec>> {
    match self.collections.get(&collection) {
        Some(state) => Ok(state.codec.clone_box()),
        None => Err(CoreError::NotFound(format!("collection {collection}"))),
    }
}
/// Root directory this store was opened on.
#[must_use]
pub fn dir(&self) -> &Path {
    self.dir.as_path()
}
#[must_use]
pub fn manifest_version(&self) -> u64 {
self.manifest_version
}
/// Directory holding the index snapshots of `collection`
/// (`<collection dir>/index`). The directory is not created here.
#[must_use]
pub fn index_dir(&self, collection: CollectionId) -> PathBuf {
    let mut path = collection_dir(&self.dir, collection);
    path.push("index");
    path
}
/// Reads the index snapshot recorded for `collection` by the last checkpoint.
///
/// Returns `Ok(None)` when the collection has no snapshot reference.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist, or any error
/// raised while reading the snapshot file.
pub fn read_index_snapshot(&self, collection: CollectionId) -> Result<Option<Vec<u8>>> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    match &state.index_snapshot {
        None => Ok(None),
        Some(snap) => {
            let file_name = index_snapshot_file_name(snap.id);
            let path = self.index_dir(collection).join(file_name);
            let body = read_paged(&path, state.codec.as_ref(), PageType::IndexBlock)?;
            Ok(Some(body))
        }
    }
}
/// Collects what `collection` has accumulated since its last checkpoint.
///
/// `upserts` holds every live active row. `deleted` holds the external ids of
/// sealed rows marked dead in the current window — rows removed by a delete
/// as well as rows superseded by an upsert, so an id may appear in both lists.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * `CoreError::MalformedPage` when the active index points at a missing
///   row (reported as an error instead of panicking, matching `record_at`).
pub fn recovery_tail(&self, collection: CollectionId) -> Result<RecoveryTail> {
    let state = self
        .collections
        .get(&collection)
        .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
    let mut upserts = Vec::with_capacity(state.active_index.len());
    for (ext_id, &row) in &state.active_index {
        let ar = state
            .active
            .get(row as usize)
            .ok_or_else(|| CoreError::MalformedPage(format!("dangling active row {row}")))?;
        upserts.push((
            ext_id.clone(),
            Record {
                vector: le_bytes_to_f32(&ar.vector),
                payload: ar.payload.clone(),
            },
        ));
    }
    let mut deleted = Vec::new();
    for (&seg_idx, bitmap) in &state.dead_this_window {
        // Entries whose segment or row can no longer be resolved are skipped.
        if let Some(seg) = state.sealed.get(seg_idx as usize) {
            let row_ids = seg.row_ids();
            deleted.extend(
                bitmap
                    .iter()
                    .filter_map(|row| row_ids.get(row as usize).cloned()),
            );
        }
    }
    Ok(RecoveryTail { upserts, deleted })
}
/// Number of live records in `collection`.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist.
pub fn len(&self, collection: CollectionId) -> Result<usize> {
    match self.collections.get(&collection) {
        Some(state) => Ok(state.primary.len()),
        None => Err(CoreError::NotFound(format!("collection {collection}"))),
    }
}
/// Whether `collection` holds no live records.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist.
pub fn is_empty(&self, collection: CollectionId) -> Result<bool> {
    let count = self.len(collection)?;
    Ok(count == 0)
}
/// Names of all collections, sorted ascending.
#[must_use]
pub fn collection_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.name_index.len());
    names.extend(self.name_index.keys().cloned());
    names.sort();
    names
}
/// Returns the sorted, de-duplicated external ids of live records whose
/// payload satisfies `predicate`.
///
/// Sealed segments are queried through `sec_query`; a hit counts only when
/// the row is not dead and the primary index still points at exactly that
/// segment row. Active rows are tested directly with `sec::payload_matches`.
///
/// # Errors
/// * `CoreError::NotFound` when the collection does not exist.
/// * `CoreError::InvalidArgument` when the predicate's field is not listed
///   in the descriptor's `filterable` fields.
/// * Any error raised by a segment query.
pub fn matching_ids(
    &self,
    collection: CollectionId,
    predicate: &SecPredicate,
) -> Result<Vec<String>> {
    let Some(state) = self.collections.get(&collection) else {
        return Err(CoreError::NotFound(format!("collection {collection}")));
    };
    let declared = state
        .descriptor
        .filterable
        .iter()
        .find(|f| f.path == predicate.field());
    let Some(declared) = declared else {
        return Err(CoreError::InvalidArgument(format!(
            "field {} is not filterable",
            predicate.field()
        )));
    };
    let field_type = declared.field_type;

    let mut out: Vec<String> = Vec::new();
    for (position, segment) in state.sealed.iter().enumerate() {
        let seg_idx = position as u32;
        let Some(rows) = segment.sec_query(predicate)? else {
            continue;
        };
        for row in rows {
            if segment.is_dead(row) {
                continue;
            }
            let Some(ext_id) = segment.row_ids().get(row as usize) else {
                continue;
            };
            // Only report the row the primary index currently resolves to.
            let is_current = match state.primary.get(ext_id) {
                Some(Loc::Sealed { seg, row: live_row }) => {
                    *seg == seg_idx && *live_row == row
                }
                _ => false,
            };
            if is_current {
                out.push(ext_id.clone());
            }
        }
    }
    for (ext_id, &row) in &state.active_index {
        if let Some(active) = state.active.get(row as usize)
            && sec::payload_matches(predicate, field_type, &active.payload)
        {
            out.push(ext_id.clone());
        }
    }
    out.sort();
    out.dedup();
    Ok(out)
}
/// Runs a checkpoint without supplying any index snapshots.
///
/// # Errors
/// Propagates any error from [`Store::checkpoint_with_index_snapshots`].
pub fn checkpoint(&mut self) -> Result<()> {
    let no_snapshots = HashMap::new();
    self.checkpoint_with_index_snapshots(&no_snapshots)
}
/// Persists everything logged since the last checkpoint and advances the
/// manifest.
///
/// Does nothing when no entry has been logged since the last checkpoint.
/// Otherwise, in order:
/// 1. for every collection with pending changes, rewrite the deletion files
///    of affected sealed segments and seal the active rows into a new segment;
/// 2. write an index snapshot for every collection present in
///    `index_snapshots`;
/// 3. write the new manifest — the commit point;
/// 4. install the new segments and snapshot references in memory and clear
///    the per-window state;
/// 5. rotate the WAL, garbage-collect unreferenced files, and auto-compact.
///
/// A collection that gets no entry in `index_snapshots` ends up with no
/// snapshot reference in the new manifest.
///
/// The snapshot step fsyncs `collections/` and the store root as well, as the
/// segment step does: creating the index directory may create the
/// collection's directory for the first time, and those parent entries must
/// be durable before the manifest references the snapshot.
///
/// # Errors
/// Any I/O, serialization, segment or manifest error. Errors raised after the
/// manifest write (step 3) leave the checkpoint itself committed.
pub fn checkpoint_with_index_snapshots(
    &mut self,
    index_snapshots: &HashMap<CollectionId, Vec<u8>>,
) -> Result<()> {
    let last_lsn = Lsn(self.next_lsn.value().saturating_sub(1));
    if last_lsn.value() <= self.last_checkpointed_lsn.value() {
        return Ok(());
    }
    let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
    cids.sort();
    let segment_lsn_low = self.last_checkpointed_lsn.next();
    let new_version = self.manifest_version + 1;

    // Step 1: deletion files and new segments.
    let mut pending: HashMap<CollectionId, PendingSegment> = HashMap::new();
    for &cid in &cids {
        if !self.collections[&cid].has_pending() {
            continue;
        }
        let seg_dir = segments_dir(&self.dir, cid);
        fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();
        {
            let state = &self.collections[&cid];
            for (&seg_idx, newly_dead) in &state.dead_this_window {
                if let Some(seg) = state.sealed.get(seg_idx as usize) {
                    let mut merged = seg.dead_bitmap();
                    merged |= newly_dead;
                    segment::write_del(&seg_dir, seg.seg_id, codec.as_ref(), &merged)?;
                }
            }
        }
        let new_seg = if self.collections[&cid].active_index.is_empty() {
            None
        } else {
            let seg_id = self.next_segment_id;
            self.next_segment_id += 1;
            let row_count = {
                let state = &self.collections[&cid];
                // Rows are sealed in `active_index` (external id) order.
                let seal_rows: Vec<SealRow<'_>> = state
                    .active_index
                    .iter()
                    .map(|(id, &row)| SealRow {
                        external_id: id,
                        vector: &state.active[row as usize].vector,
                        payload: &state.active[row as usize].payload,
                    })
                    .collect();
                segment::write_segment(
                    &seg_dir,
                    seg_id,
                    codec.as_ref(),
                    &seal_rows,
                    &state.descriptor.filterable,
                )?;
                seal_rows.len() as u64
            };
            Some((seg_id, row_count))
        };
        fsync_dir(&seg_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        fsync_dir(&self.dir.join("collections"))?;
        fsync_dir(&self.dir)?;
        if let Some((seg_id, row_count)) = new_seg {
            let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
            pending.insert(
                cid,
                PendingSegment {
                    seg_ref: SegmentRef {
                        id: seg_id,
                        row_count,
                        lsn_low: segment_lsn_low,
                        lsn_high: last_lsn,
                    },
                    sealed,
                },
            );
        }
    }

    // Step 2: index snapshots, named after the new manifest version.
    let mut new_index_refs: HashMap<CollectionId, IndexSnapshotRef> = HashMap::new();
    for &cid in &cids {
        let Some(blob) = index_snapshots.get(&cid) else {
            continue;
        };
        let index_dir = self.index_dir(cid);
        fs::create_dir_all(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
        let codec = self.collections[&cid].codec.clone_box();
        let path = index_dir.join(index_snapshot_file_name(new_version));
        write_paged(
            &path,
            codec.as_ref(),
            PageType::IndexBlock,
            new_version,
            blob,
        )?;
        fsync_dir(&index_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        // `create_dir_all` above may have created the collection directory
        // (and `collections/`) itself, so make those entries durable too.
        fsync_dir(&self.dir.join("collections"))?;
        fsync_dir(&self.dir)?;
        new_index_refs.insert(
            cid,
            IndexSnapshotRef {
                id: new_version,
                lsn: last_lsn,
            },
        );
    }

    // Step 3: the manifest write commits the checkpoint.
    let mut entries = Vec::with_capacity(cids.len());
    for &cid in &cids {
        let state = &self.collections[&cid];
        let mut segs = state.segments_meta.clone();
        if let Some(p) = pending.get(&cid) {
            segs.push(p.seg_ref.clone());
        }
        entries.push(CollectionEntry {
            id: state.id,
            name: state.name.clone(),
            descriptor: postcard::to_allocvec(&state.descriptor)?,
            segments: segs,
            index_snapshot: new_index_refs.get(&cid).cloned(),
        });
    }
    let new_manifest = Manifest {
        format_version: MANIFEST_FORMAT_VERSION,
        version: new_version,
        last_checkpointed_lsn: last_lsn,
        next_collection_id: self.next_collection_id,
        next_segment_id: self.next_segment_id,
        collections: entries,
    };
    manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;
    self.manifest_version = new_version;
    self.last_checkpointed_lsn = last_lsn;

    // Step 4: bring the in-memory state in line with the new manifest.
    for &cid in &cids {
        let Some(state) = self.collections.get_mut(&cid) else {
            continue;
        };
        let dead_window = std::mem::take(&mut state.dead_this_window);
        for (seg_idx, bitmap) in dead_window {
            if let Some(seg) = state.sealed.get_mut(seg_idx as usize) {
                seg.mark_dead(&bitmap);
            }
        }
        if let Some(p) = pending.remove(&cid) {
            let seg_idx = state.sealed.len() as u32;
            for (row, ext_id) in p.sealed.row_ids().iter().enumerate() {
                state.primary.insert(
                    ext_id.clone(),
                    Loc::Sealed {
                        seg: seg_idx,
                        row: row as u32,
                    },
                );
            }
            state.sealed.push(p.sealed);
            state.segments_meta.push(p.seg_ref);
        }
        state.active.clear();
        state.active_index.clear();
        state.index_snapshot = new_index_refs.get(&cid).cloned();
    }

    // Step 5: housekeeping.
    self.rotate_wal()?;
    gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
    gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
    self.auto_compact()?;
    Ok(())
}
/// Compacts every collection that `reclaimable` reports as worth rewriting,
/// visiting collections in ascending id order.
///
/// # Errors
/// Stops at and returns the first compaction error.
pub fn compact(&mut self) -> Result<()> {
    for cid in self.sorted_cids() {
        if !self.reclaimable(cid) {
            continue;
        }
        self.compact_collection(cid)?;
    }
    Ok(())
}
/// Compacts every collection that `needs_compaction` flags, visiting
/// collections in ascending id order. Called at the end of a checkpoint.
fn auto_compact(&mut self) -> Result<()> {
    for cid in self.sorted_cids() {
        if !self.needs_compaction(cid) {
            continue;
        }
        self.compact_collection(cid)?;
    }
    Ok(())
}
/// Ids of all live collections in ascending order.
fn sorted_cids(&self) -> Vec<CollectionId> {
    let mut ids = Vec::with_capacity(self.collections.len());
    ids.extend(self.collections.keys().copied());
    ids.sort();
    ids
}
/// Whether compacting `cid` could reclaim anything: it has more than one
/// sealed segment, or some sealed segment has fewer live rows than rows.
/// Unknown collections are never reclaimable.
fn reclaimable(&self, cid: CollectionId) -> bool {
    let Some(state) = self.collections.get(&cid) else {
        return false;
    };
    if state.sealed.len() > 1 {
        return true;
    }
    state
        .sealed
        .iter()
        .any(|seg| seg.live_count() < u64::from(seg.row_count()))
}
/// Whether `cid` should be compacted automatically: it has at least
/// `COMPACT_MIN_SEGMENTS` sealed segments, or at least half of its sealed
/// rows are dead. Collections without sealed segments never qualify.
fn needs_compaction(&self, cid: CollectionId) -> bool {
    let state = match self.collections.get(&cid) {
        Some(state) if !state.sealed.is_empty() => state,
        _ => return false,
    };
    let mut total: u64 = 0;
    let mut live: u64 = 0;
    for seg in &state.sealed {
        total += u64::from(seg.row_count());
        live += seg.live_count();
    }
    if state.sealed.len() >= COMPACT_MIN_SEGMENTS {
        return true;
    }
    total > 0 && (total - live) * 2 >= total
}
/// Rewrites all sealed segments of `cid` into a single new segment holding
/// only the rows the primary index still resolves to a sealed location, then
/// publishes a new manifest that references just that segment.
///
/// Active (unsealed) rows are left untouched. The manifest's
/// `last_checkpointed_lsn` is carried over unchanged.
///
/// # Errors
/// `CoreError::NotFound` when the collection does not exist,
/// `CoreError::MalformedPage` on a dangling segment index, or any I/O,
/// serialization, segment or manifest error.
fn compact_collection(&mut self, cid: CollectionId) -> Result<()> {
let codec = self
.collections
.get(&cid)
.ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?
.codec
.clone_box();
// Gather (external id, vector bytes, payload) for every live sealed row.
let live: Vec<(String, Vec<u8>, Vec<u8>)> = {
let state = self
.collections
.get(&cid)
.ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?;
let mut out = Vec::with_capacity(state.primary.len());
for (ext_id, &loc) in &state.primary {
if let Loc::Sealed { seg, row } = loc {
let segment = state.sealed.get(seg as usize).ok_or_else(|| {
CoreError::MalformedPage(format!("dangling segment index {seg}"))
})?;
let vector = segment.read_vector(codec.as_ref(), row, state.stride)?;
let payload = segment.read_payload(codec.as_ref(), row)?;
out.push((ext_id.clone(), vector, payload));
}
}
out
};
// The merged segment spans the LSN range of the segments it replaces.
let (lsn_low, lsn_high) = {
let state = &self.collections[&cid];
let low = state
.segments_meta
.iter()
.map(|s| s.lsn_low.value())
.min()
.map(Lsn)
.unwrap_or(Lsn::ZERO);
let high = state
.segments_meta
.iter()
.map(|s| s.lsn_high.value())
.max()
.map(Lsn)
.unwrap_or(self.last_checkpointed_lsn);
(low, high)
};
let seg_id = self.next_segment_id;
self.next_segment_id += 1;
let seg_dir = segments_dir(&self.dir, cid);
fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
let seal_rows: Vec<SealRow<'_>> = live
.iter()
.map(|(id, v, p)| SealRow {
external_id: id,
vector: v,
payload: p,
})
.collect();
segment::write_segment(
&seg_dir,
seg_id,
codec.as_ref(),
&seal_rows,
&self.collections[&cid].descriptor.filterable,
)?;
// Make the new segment and its parent directory entries durable before
// the manifest references it.
fsync_dir(&seg_dir)?;
fsync_dir(&collection_dir(&self.dir, cid))?;
fsync_dir(&self.dir.join("collections"))?;
fsync_dir(&self.dir)?;
let new_ref = SegmentRef {
id: seg_id,
row_count: seal_rows.len() as u64,
lsn_low,
lsn_high,
};
let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
let new_version = self.manifest_version + 1;
// Build the new manifest: only `cid`'s segment list changes.
let mut entries = Vec::with_capacity(self.collections.len());
for &other in &self.sorted_cids() {
let state = &self.collections[&other];
let segs = if other == cid {
vec![new_ref.clone()]
} else {
state.segments_meta.clone()
};
entries.push(CollectionEntry {
id: state.id,
name: state.name.clone(),
descriptor: postcard::to_allocvec(&state.descriptor)?,
segments: segs,
index_snapshot: state.index_snapshot.clone(),
});
}
let new_manifest = Manifest {
format_version: MANIFEST_FORMAT_VERSION,
version: new_version,
last_checkpointed_lsn: self.last_checkpointed_lsn,
next_collection_id: self.next_collection_id,
next_segment_id: self.next_segment_id,
collections: entries,
};
// Writing the manifest is the commit point of the compaction.
manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;
self.manifest_version = new_version;
let row_ids: Vec<String> = sealed.row_ids().to_vec();
if let Some(state) = self.collections.get_mut(&cid) {
state.sealed = vec![sealed];
state.segments_meta = vec![new_ref];
// The window's keys are indices into the old segment list, which no longer
// exists; the rows they marked were already left out of the new segment.
// NOTE(review): after this, `recovery_tail` no longer reports deletions made
// since the last checkpoint while the old index snapshot reference is kept —
// confirm callers of the public `compact` tolerate that.
state.dead_this_window.clear();
// Re-point every compacted row at the single new segment (index 0).
for (row, ext_id) in row_ids.into_iter().enumerate() {
state.primary.insert(
ext_id,
Loc::Sealed {
seg: 0,
row: row as u32,
},
);
}
}
// The replaced segments are now unreferenced and can be removed.
gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
Ok(())
}
/// Switches to a fresh WAL file and removes the older ones.
///
/// The new file (next sequence number, starting at `next_lsn`) is created and
/// the WAL directory fsynced before the writer is swapped; afterwards every
/// WAL file with a sequence number up to the previous one is deleted and the
/// directory is fsynced again.
fn rotate_wal(&mut self) -> Result<()> {
    let wal_dir = self.dir.join("wal");
    let retired_seq = self.wal_seq;
    let fresh_seq = retired_seq + 1;
    let fresh_path = wal_file_path(&wal_dir, fresh_seq);
    let fresh_writer = WalWriter::create(&fresh_path, self.next_lsn)?;
    fsync_dir(&wal_dir)?;
    self.wal = fresh_writer;
    self.wal_seq = fresh_seq;
    let stale = list_wal_files(&wal_dir)?
        .into_iter()
        .filter(|(seq, _)| *seq <= retired_seq);
    for (_, path) in stale {
        remove_file_if_present(&path)?;
    }
    fsync_dir(&wal_dir)?;
    Ok(())
}
}
/// Applies one WAL entry to the in-memory collection maps.
///
/// * `CreateCollection` decodes the descriptor, fetches the collection codec
///   and registers a fresh, empty state under the given id and name.
/// * `DropCollection` removes the state and its name mapping, if present.
/// * `Upsert` / `Delete` are applied to the target collection and silently
///   ignored when that collection does not exist.
/// * `Checkpoint` is a no-op.
///
/// # Errors
/// Fails when a descriptor cannot be decoded or the key ring cannot supply a
/// codec.
fn apply_wal_entry(
    collections: &mut HashMap<CollectionId, CollectionState>,
    name_index: &mut HashMap<String, CollectionId>,
    entry: &WalEntry,
    keyring: &dyn KeyRing,
) -> Result<()> {
    match &entry.op {
        WalOp::Checkpoint { .. } => {}
        WalOp::CreateCollection {
            collection_id,
            name,
            descriptor,
        } => {
            let id = *collection_id;
            let descriptor = Descriptor::decode(descriptor)?;
            let codec = keyring.collection_codec(id)?;
            name_index.insert(name.clone(), id);
            let state = CollectionState::new(id, name.clone(), descriptor, codec);
            collections.insert(id, state);
        }
        WalOp::DropCollection { collection_id } => {
            let Some(dropped) = collections.remove(collection_id) else {
                return Ok(());
            };
            name_index.remove(&dropped.name);
        }
        WalOp::Upsert {
            collection_id,
            external_id,
            vector,
            payload,
        } => {
            let Some(state) = collections.get_mut(collection_id) else {
                return Ok(());
            };
            state.apply_upsert(external_id, vector.clone(), payload.clone());
        }
        WalOp::Delete {
            collection_id,
            external_id,
        } => {
            let Some(state) = collections.get_mut(collection_id) else {
                return Ok(());
            };
            state.apply_delete(external_id);
        }
    }
    Ok(())
}
/// Removes on-disk collection data that `mfst` does not reference.
///
/// Walks `dir/collections`; entries whose name does not parse as a `u64` are
/// ignored. For a collection id absent from the manifest the key ring is asked
/// to shred it and its directory is removed. For a listed collection, files in
/// its `segments` directory are deleted when they are temporary files or
/// belong to a segment id the manifest does not list; files whose name yields
/// no segment id are left alone.
///
/// # Errors
/// Any directory listing, key-ring or file removal error.
fn gc_orphan_segments(dir: &Path, mfst: &Manifest, keyring: &dyn KeyRing) -> Result<()> {
    let collections_dir = dir.join("collections");
    if !collections_dir.exists() {
        return Ok(());
    }
    let live_collections: HashSet<u64> =
        mfst.collections.iter().map(|c| c.id.value()).collect();
    let referenced: HashSet<(u64, u64)> = mfst
        .collections
        .iter()
        .flat_map(|c| c.segments.iter().map(move |s| (c.id.value(), s.id)))
        .collect();

    let listing = fs::read_dir(&collections_dir).map_err(|e| CoreError::io(&collections_dir, e))?;
    for dirent in listing {
        let dirent = dirent.map_err(|e| CoreError::io(&collections_dir, e))?;
        let cdir = dirent.path();
        let parsed = dirent
            .file_name()
            .to_str()
            .and_then(|n| n.parse::<u64>().ok());
        let cid = match parsed {
            Some(cid) => cid,
            None => continue,
        };
        if !live_collections.contains(&cid) {
            // Unknown collection: destroy its key material, then its files.
            keyring.shred_collection(CollectionId(cid))?;
            if cdir.is_dir() {
                fs::remove_dir_all(&cdir).map_err(|e| CoreError::io(&cdir, e))?;
            }
            continue;
        }
        let seg_dir = cdir.join("segments");
        if !seg_dir.is_dir() {
            continue;
        }
        let seg_listing = fs::read_dir(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
        for seg in seg_listing {
            let seg = seg.map_err(|e| CoreError::io(&seg_dir, e))?;
            let path = seg.path();
            let name = match seg.file_name().to_str().map(str::to_owned) {
                Some(name) => name,
                None => continue,
            };
            let is_orphan = if segment::is_temp_file(&name) {
                true
            } else {
                match segment::seg_id_of_file(&name) {
                    Some(seg_id) => !referenced.contains(&(cid, seg_id)),
                    None => false,
                }
            };
            if is_orphan {
                remove_file_if_present(&path)?;
            }
        }
    }
    Ok(())
}
/// For every collection listed in `mfst`, deletes index snapshot files other
/// than the one the manifest references (all of them when it references none).
///
/// Files whose name is not of the `idx-<number>` form are left alone, as are
/// collections without an `index` directory.
///
/// # Errors
/// Any directory listing or file removal error.
fn gc_orphan_index_snapshots(dir: &Path, mfst: &Manifest) -> Result<()> {
    for c in &mfst.collections {
        let index_dir = collection_dir(dir, c.id).join("index");
        if !index_dir.is_dir() {
            continue;
        }
        let keep = c.index_snapshot.as_ref().map(|r| r.id);
        let listing = fs::read_dir(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
        for dirent in listing {
            let dirent = dirent.map_err(|e| CoreError::io(&index_dir, e))?;
            let name = match dirent.file_name().to_str().map(str::to_owned) {
                Some(name) => name,
                None => continue,
            };
            let snapshot_id = index_snapshot_id_of_file(&name);
            if snapshot_id.is_none() || snapshot_id == keep {
                continue;
            }
            remove_file_if_present(&dirent.path())?;
        }
    }
    Ok(())
}
/// Deletes `path`, treating "file not found" as success.
///
/// # Errors
/// Any other I/O error, wrapped with the path via `CoreError::io`.
fn remove_file_if_present(path: &Path) -> Result<()> {
    let Err(err) = fs::remove_file(path) else {
        return Ok(());
    };
    if err.kind() == std::io::ErrorKind::NotFound {
        Ok(())
    } else {
        Err(CoreError::io(path, err))
    }
}
/// Directory of collection `cid`: `dir/collections/<id zero-padded to 10 digits>`.
fn collection_dir(dir: &Path, cid: CollectionId) -> PathBuf {
    let leaf = format!("{:010}", cid.value());
    let mut path = dir.join("collections");
    path.push(leaf);
    path
}
/// Directory holding the segment files of collection `cid`
/// (`<collection dir>/segments`).
fn segments_dir(dir: &Path, cid: CollectionId) -> PathBuf {
    let mut path = collection_dir(dir, cid);
    path.push("segments");
    path
}
/// File name of the index snapshot with the given id: `idx-` followed by the
/// id zero-padded to ten digits.
fn index_snapshot_file_name(id: u64) -> String {
    format!("idx-{:010}", id)
}
/// Extracts the snapshot id from a file name of the form `idx-<number>`;
/// returns `None` for any other name.
fn index_snapshot_id_of_file(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("idx-")?;
    digits.parse::<u64>().ok()
}
/// Path of the WAL file with sequence number `seq` inside `wal_dir`:
/// `wal-<seq zero-padded to 10 digits>.log`.
fn wal_file_path(wal_dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("wal-{:010}.log", seq);
    wal_dir.join(file_name)
}
/// Lists the WAL files in `wal_dir` as `(sequence number, path)` pairs sorted
/// by ascending sequence number. Entries whose name is not a WAL file name are
/// ignored.
///
/// # Errors
/// Any error raised while listing the directory.
fn list_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
    let listing = fs::read_dir(wal_dir).map_err(|e| CoreError::io(wal_dir, e))?;
    let mut found = Vec::new();
    for dirent in listing {
        let dirent = dirent.map_err(|e| CoreError::io(wal_dir, e))?;
        let seq = match dirent.file_name().to_str().and_then(parse_wal_file_name) {
            Some(seq) => seq,
            None => continue,
        };
        found.push((seq, dirent.path()));
    }
    found.sort_by_key(|&(seq, _)| seq);
    Ok(found)
}
/// Extracts the sequence number from a file name of the form
/// `wal-<number>.log`; returns `None` for any other name.
fn parse_wal_file_name(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("wal-")?.strip_suffix(".log")?;
    digits.parse::<u64>().ok()
}
/// Encodes `v` as the concatenation of each value's four little-endian bytes.
fn f32_to_le_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
/// Decodes consecutive four-byte little-endian groups of `bytes` into `f32`s.
/// Trailing bytes that do not form a full group are ignored.
fn le_bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    let mut floats = Vec::with_capacity(bytes.len() / 4);
    for quad in bytes.chunks_exact(4) {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(quad);
        floats.push(f32::from_le_bytes(raw));
    }
    floats
}
#[cfg(test)]
mod tests {
use super::*;
use crate::descriptor::{DistanceMetric, Dtype};
/// Descriptor used by most tests: dimension 4, `Dtype::F32`, `DistanceMetric::L2`.
fn desc() -> Descriptor {
    let dim = 4;
    Descriptor::new(dim, Dtype::F32, DistanceMetric::L2)
}
/// Opens a `Store` rooted at `dir`, panicking if `Store::open` fails.
fn open(dir: &Path) -> Store {
    let opened = Store::open(dir);
    opened.unwrap()
}
/// Path of the `.dir` file of segment `seg_id` in collection `cid`:
/// `<segments_dir>/seg-<seg_id zero-padded to 10 digits>.dir`.
fn seg_dir_file(dir: &Path, cid: CollectionId, seg_id: u64) -> PathBuf {
    let file_name = format!("seg-{:010}.dir", seg_id);
    segments_dir(dir, cid).join(file_name)
}
/// Upsert, read back and delete a single record without any checkpoint.
#[test]
fn upsert_get_delete_in_memory() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    // A freshly upserted record is readable straight away.
    store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
    let record = store.get(coll, "a").unwrap().unwrap();
    assert_eq!(record.vector, vec![1.0, 2.0, 3.0, 4.0]);
    assert_eq!(record.payload, b"{}");

    // The first delete reports a hit, the id then reads as absent, and a
    // second delete of the same id reports a miss.
    assert!(store.delete(coll, "a").unwrap());
    assert!(store.get(coll, "a").unwrap().is_none());
    assert!(!store.delete(coll, "a").unwrap());
}
/// A vector whose length differs from the collection's dimension is refused.
#[test]
fn dim_mismatch_is_rejected() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    // `desc()` declares dimension 4; this vector has only two components.
    let outcome = store.upsert(coll, "a", &[1.0, 2.0], b"{}");
    assert!(matches!(outcome, Err(CoreError::InvalidArgument(_))));
}
/// A batch of eight records is fully visible in the writing session and after a reopen.
#[test]
fn upsert_batch_commits_all_on_sync() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();

        // Own the vectors and ids here so the batch entries can borrow from them.
        let owned: Vec<([f32; 4], String)> = (0..8u32)
            .map(|i| ([i as f32; 4], format!("k{i}")))
            .collect();
        let payload = b"{}";
        let mut batch: Vec<(&str, &[f32], &[u8])> = Vec::with_capacity(owned.len());
        for (vector, id) in &owned {
            batch.push((id.as_str(), vector.as_slice(), payload.as_slice()));
        }

        let written = store.upsert_batch(coll, &batch).unwrap();
        assert_eq!(written, 8);
        for (_, id) in &owned {
            assert!(store.get(coll, id).unwrap().is_some(), "missing {id}");
        }
    }

    // No checkpoint was taken above; a fresh store must still see all eight records.
    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    assert_eq!(store.len(coll).unwrap(), 8);
    for i in 0..8u32 {
        let record = store.get(coll, &format!("k{i}")).unwrap().unwrap();
        assert_eq!(record.vector, vec![i as f32; 4]);
    }
}
/// A batch containing one wrongly sized vector is rejected as a whole.
#[test]
fn upsert_batch_dim_mismatch_writes_nothing() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    // "a" is valid for the 4-dimensional collection, "b" is not.
    let batch: &[(&str, &[f32], &[u8])] = &[
        ("a", &[1.0, 2.0, 3.0, 4.0], b"{}"),
        ("b", &[1.0, 2.0], b"{}"),
    ];
    let outcome = store.upsert_batch(coll, batch);
    assert!(matches!(outcome, Err(CoreError::InvalidArgument(_))));

    // The valid leading entry must not have been applied either.
    assert!(store.get(coll, "a").unwrap().is_none());
}
/// Creating a second collection with an existing name fails with `AlreadyExists`.
#[test]
fn duplicate_collection_is_rejected() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    store.create_collection("c", desc()).unwrap();

    let second_attempt = store.create_collection("c", desc());
    assert!(matches!(second_attempt, Err(CoreError::AlreadyExists(_))));
}
/// Records written without any checkpoint are still present after a reopen
/// (per the test name, recovered by replaying the WAL).
#[test]
fn recovers_without_checkpoint_via_wal_replay() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        for i in 0..10u32 {
            let id = format!("k{i}");
            let vector = [i as f32; 4];
            store.upsert(coll, &id, &vector, b"{}").unwrap();
        }
        // The store goes out of scope here without `checkpoint` ever being called.
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    assert_eq!(store.len(coll).unwrap(), 10);
    let record = store.get(coll, "k7").unwrap().unwrap();
    assert_eq!(record.vector, vec![7.0; 4]);
}
/// State written before a checkpoint and state written after it are both
/// recovered on reopen.
#[test]
fn recovers_across_checkpoint_and_wal_tail() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();

        // k0..k4 are covered by the checkpoint.
        for i in 0..5u32 {
            let id = format!("k{i}");
            store.upsert(coll, &id, &[i as f32; 4], b"{}").unwrap();
        }
        store.checkpoint().unwrap();

        // k5..k7 and the delete of k0 happen after the checkpoint.
        for i in 5..8u32 {
            let id = format!("k{i}");
            store.upsert(coll, &id, &[i as f32; 4], b"{}").unwrap();
        }
        store.delete(coll, "k0").unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    // Eight upserts minus one delete.
    assert_eq!(store.len(coll).unwrap(), 7);
    assert!(store.get(coll, "k0").unwrap().is_none());
    assert_eq!(store.get(coll, "k6").unwrap().unwrap().vector, vec![6.0; 4]);
}
/// A store opened through `open_with_keyring` keeps both checkpointed and
/// post-checkpoint records across a reopen with the same kind of key ring.
#[test]
fn open_with_keyring_round_trips_through_checkpoint() {
    let root = tempfile::tempdir().unwrap();
    // Both sessions use a key ring built by `SingleCodecKeyRing::plaintext()`.
    let open_plaintext = || {
        Store::open_with_keyring(root.path(), Box::new(SingleCodecKeyRing::plaintext())).unwrap()
    };
    {
        let mut store = open_plaintext();
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        store.checkpoint().unwrap();
        // "b" is written after the checkpoint.
        store.upsert(coll, "b", &[5.0; 4], b"{}").unwrap();
    }

    let store = open_plaintext();
    let coll = store.collection_id("c").unwrap();
    assert_eq!(store.len(coll).unwrap(), 2);
    let first = store.get(coll, "a").unwrap().unwrap();
    assert_eq!(first.vector, vec![1.0, 2.0, 3.0, 4.0]);
    let second = store.get(coll, "b").unwrap().unwrap();
    assert_eq!(second.vector, vec![5.0; 4]);
}
/// A delete issued after one checkpoint and followed by another stays
/// deleted after a reopen.
#[test]
fn delete_survives_checkpoint() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
        store.upsert(coll, "b", &[2.0; 4], b"{}").unwrap();
        store.checkpoint().unwrap();

        // The delete targets a record that the previous checkpoint already covered.
        store.delete(coll, "a").unwrap();
        store.checkpoint().unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    assert!(store.get(coll, "a").unwrap().is_none());
    assert!(store.get(coll, "b").unwrap().is_some());
    assert_eq!(store.len(coll).unwrap(), 1);
}
/// Opening the same directory twice in a row yields the same scan result.
#[test]
fn reopen_is_idempotent() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
        store.checkpoint().unwrap();
        // "b" is written after the checkpoint.
        store.upsert(coll, "b", &[2.0; 4], b"{}").unwrap();
    }

    // Each call opens a fresh store, scans collection "c" and drops the store again.
    let scan_after_open = |dir: &Path| {
        let store = open(dir);
        let coll = store.collection_id("c").unwrap();
        store.scan(coll).unwrap()
    };
    let first = scan_after_open(root.path());
    let second = scan_after_open(root.path());
    assert_eq!(first, second);
}
/// Overwriting a checkpointed record and checkpointing again leaves exactly
/// one record carrying the newer value.
#[test]
fn update_then_checkpoint_keeps_latest_value() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"v1").unwrap();
        store.checkpoint().unwrap();
        // Same id, new contents, followed by a second checkpoint.
        store.upsert(coll, "a", &[9.0; 4], b"v2").unwrap();
        store.checkpoint().unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    let record = store.get(coll, "a").unwrap().unwrap();
    assert_eq!(record.vector, vec![9.0; 4]);
    assert_eq!(record.payload, b"v2");
    assert_eq!(store.len(coll).unwrap(), 1);
}
/// Three writes to the same id before a single checkpoint leave one record
/// holding the last value.
#[test]
fn update_within_one_window_seals_latest() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        // All three versions are written before the only checkpoint.
        store.upsert(coll, "a", &[1.0; 4], b"v1").unwrap();
        store.upsert(coll, "a", &[2.0; 4], b"v2").unwrap();
        store.upsert(coll, "a", &[3.0; 4], b"v3").unwrap();
        store.checkpoint().unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    assert_eq!(store.len(coll).unwrap(), 1);
    let record = store.get(coll, "a").unwrap().unwrap();
    assert_eq!(record.vector, vec![3.0; 4]);
    assert_eq!(record.payload, b"v3");
}
/// A collection dropped and then checkpointed is no longer known after a reopen.
#[test]
fn dropped_collection_is_gone_after_reopen() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
        store.checkpoint().unwrap();

        // Dropping an existing collection reports `true`.
        assert!(store.drop_collection("c").unwrap());
        store.checkpoint().unwrap();
    }

    let store = open(root.path());
    assert!(store.collection_id("c").is_none());
    assert!(store.collection_names().is_empty());
}
/// A segment file planted in the segments directory between sessions is
/// removed when the store is opened.
#[test]
fn orphan_segment_is_garbage_collected() {
    let root = tempfile::tempdir().unwrap();
    let cid = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
        store.checkpoint().unwrap();
        coll
    };

    // Segment id 9999 was never produced by the session above.
    let stray = segments_dir(root.path(), cid).join("seg-0000009999.vec");
    fs::write(&stray, b"junk").unwrap();
    assert!(stray.exists());

    let _store = open(root.path());
    assert!(!stray.exists(), "orphan segment should be GC'd on open");
}
/// Flipping a byte in a segment's `.dir` file makes `Store::open` fail with
/// `PageCorrupt` instead of opening the store.
#[test]
fn corrupt_segment_is_detected_not_served() {
    let root = tempfile::tempdir().unwrap();
    let cid = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0; 4], b"{}").unwrap();
        store.checkpoint().unwrap();
        coll
    };

    // Invert every bit of the byte at offset 33 of segment 0's `.dir` file.
    let seg_path = seg_dir_file(root.path(), cid, 0);
    let mut contents = fs::read(&seg_path).unwrap();
    contents[33] ^= 0xFF;
    fs::write(&seg_path, &contents).unwrap();

    let reopened = Store::open(root.path());
    assert!(matches!(reopened, Err(CoreError::PageCorrupt { .. })));
}
/// Garbage appended to the end of the WAL file does not cost any of the
/// records written before it.
#[test]
fn torn_wal_tail_drops_only_unacked_record() {
    use std::io::Write as _;

    let root = tempfile::tempdir().unwrap();
    let wal_path = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        for i in 0..3u32 {
            let id = format!("k{i}");
            store.upsert(coll, &id, &[i as f32; 4], b"{}").unwrap();
        }
        // Locate the WAL file for the store's current `wal_seq`.
        let wal_dir = root.path().join("wal");
        wal_file_path(&wal_dir, store.wal_seq)
    };

    // Append three stray bytes after the last complete write.
    {
        let mut wal = fs::OpenOptions::new().append(true).open(&wal_path).unwrap();
        wal.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
        wal.sync_data().unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    // All three upserts are still there.
    assert_eq!(store.len(coll).unwrap(), 3);
}
/// A record stays readable, with identical vector and payload, once a
/// checkpoint has run in the same session.
#[test]
fn reads_served_from_disk_after_checkpoint() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    let payload = br#"{"k":1}"#;
    store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], payload).unwrap();
    store.checkpoint().unwrap();

    // Per the test name, this read is expected to be answered from
    // checkpointed data rather than the in-memory write buffer.
    let record = store.get(coll, "a").unwrap().unwrap();
    assert_eq!(record.vector, vec![1.0, 2.0, 3.0, 4.0]);
    assert_eq!(record.payload, payload);
}
/// Twenty 1000-dimensional vectors survive a checkpoint and reopen unchanged
/// (per the test name, vectors of this size span page boundaries).
#[test]
fn high_dim_vectors_straddle_pages() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let dim = 1000usize;
    let descriptor = Descriptor::new(dim as u32, Dtype::F32, DistanceMetric::L2);
    let coll = store.create_collection("c", descriptor).unwrap();

    // Component j of vector i is i * 1000 + j, so every component is distinct.
    let expected = |i: u32| -> Vec<f32> {
        (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect()
    };
    for i in 0..20u32 {
        store.upsert(coll, &format!("k{i}"), &expected(i), b"{}").unwrap();
    }
    store.checkpoint().unwrap();

    // The directory is opened a second time while the first handle is still in scope.
    let reopened = open(root.path());
    let coll = reopened.collection_id("c").unwrap();
    for i in 0..20u32 {
        let got = reopened.get(coll, &format!("k{i}")).unwrap().unwrap();
        assert_eq!(
            got.vector,
            expected(i),
            "vector k{i} mismatch after straddling read"
        );
    }
}
/// A checkpoint that only contains a delete adds no new sealed segment,
/// leaves a `.del` file next to segment 0, and the delete holds after reopen.
#[test]
fn delete_persists_via_del_bitmap_across_reopen() {
    let root = tempfile::tempdir().unwrap();
    let cid = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        for i in 0..5u32 {
            let id = format!("k{i}");
            store.upsert(coll, &id, &[i as f32; 4], b"{}").unwrap();
        }
        store.checkpoint().unwrap();

        // The second checkpoint covers nothing but this delete.
        store.delete(coll, "k2").unwrap();
        store.checkpoint().unwrap();
        assert_eq!(
            store.collections[&coll].sealed.len(),
            1,
            "no new segment for a delete-only window"
        );
        coll
    };

    let del_file = segments_dir(root.path(), cid).join("seg-0000000000.del");
    assert!(
        del_file.exists(),
        ".del must be persisted for the deleted row"
    );

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    assert!(store.get(coll, "k2").unwrap().is_none());
    assert_eq!(store.len(coll).unwrap(), 4);
    // Every other record is untouched.
    for i in [0u32, 1, 3, 4] {
        assert!(store.get(coll, &format!("k{i}")).unwrap().is_some());
    }
}
/// Re-upserting an id that an earlier checkpoint already covered does not
/// change the record count, and the newer value is the one returned after reopen.
#[test]
fn shadowed_row_is_tombstoned_and_latest_wins() {
    let root = tempfile::tempdir().unwrap();
    {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        for i in 0..5u32 {
            let id = format!("k{i}");
            store.upsert(coll, &id, &[i as f32; 4], b"v1").unwrap();
        }
        store.checkpoint().unwrap();

        // New version of k2, written after the first checkpoint.
        store.upsert(coll, "k2", &[99.0; 4], b"v2").unwrap();
        store.checkpoint().unwrap();
    }

    let store = open(root.path());
    let coll = store.collection_id("c").unwrap();
    // Still five ids: the old k2 must not be counted alongside the new one.
    assert_eq!(store.len(coll).unwrap(), 5);
    let record = store.get(coll, "k2").unwrap().unwrap();
    assert_eq!(record.vector, vec![99.0; 4]);
    assert_eq!(record.payload, b"v2");
}
// Explicit `compact()` over two sealed segments with deletes, plus one record
// written after the last checkpoint: the test expects a single sealed segment,
// the old segment-0 `.dir` file gone, deleted ids absent, and every surviving
// record (including the late one) readable both before and after a reopen.
#[test]
fn compaction_merges_segments_reclaims_and_keeps_active_rows() {
let tmp = tempfile::tempdir().unwrap();
let cid;
{
let mut s = open(tmp.path());
let c = s.create_collection("c", desc()).unwrap();
cid = c;
// First checkpoint window: k0..k5.
for i in 0..6u32 {
s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
.unwrap();
}
// Checkpoint the first window, then write the second window: k6..k11.
s.checkpoint().unwrap(); for i in 6..12u32 {
s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
.unwrap();
}
// Checkpoint the second window, then delete one record from each window.
s.checkpoint().unwrap(); s.delete(c, "k0").unwrap();
s.delete(c, "k6").unwrap();
// Third checkpoint covers only the two deletes; two sealed segments are expected.
s.checkpoint().unwrap(); assert_eq!(s.collections[&c].sealed.len(), 2);
// "fresh" is written after the last checkpoint and before `compact()`.
s.upsert(c, "fresh", &[7.0; 4], b"new").unwrap();
s.compact().unwrap();
assert_eq!(s.collections[&c].sealed.len(), 1, "segments merged to one");
assert!(
!segments_dir(tmp.path(), cid)
.join("seg-0000000000.dir")
.exists(),
"old segment files reclaimed"
);
// 12 upserted - 2 deleted + "fresh" = 11 live records.
assert_eq!(s.len(c).unwrap(), 11); assert!(s.get(c, "k0").unwrap().is_none());
assert!(s.get(c, "k6").unwrap().is_none());
assert_eq!(s.get(c, "k5").unwrap().unwrap().vector, vec![5.0; 4]);
assert_eq!(s.get(c, "fresh").unwrap().unwrap().payload, b"new");
}
// Reopen: the same picture must be reconstructed from what is on disk.
let s = open(tmp.path());
let c = s.collection_id("c").unwrap();
assert_eq!(s.collections[&c].sealed.len(), 1);
assert_eq!(s.len(c).unwrap(), 11);
assert!(s.get(c, "k0").unwrap().is_none());
assert_eq!(s.get(c, "fresh").unwrap().unwrap().vector, vec![7.0; 4]);
assert_eq!(s.get(c, "k11").unwrap().unwrap().vector, vec![11.0; 4]);
}
/// Eight checkpoints with fresh data each must leave fewer than
/// `COMPACT_MIN_SEGMENTS` sealed segments, without losing any record.
#[test]
fn auto_compaction_merges_many_segments() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    // Eight rounds of three new records, each round followed by a checkpoint.
    for round in 0..8u32 {
        for offset in 0..3u32 {
            let n = round * 3 + offset;
            store.upsert(coll, &format!("k{n}"), &[n as f32; 4], b"{}").unwrap();
        }
        store.checkpoint().unwrap();
    }

    let sealed_count = store.collections[&coll].sealed.len();
    assert!(
        sealed_count < COMPACT_MIN_SEGMENTS,
        "auto-compaction should have merged the segments"
    );
    assert_eq!(store.len(coll).unwrap(), 24);
    assert_eq!(store.get(coll, "k0").unwrap().unwrap().vector, vec![0.0; 4]);
    assert_eq!(store.get(coll, "k23").unwrap().unwrap().vector, vec![23.0; 4]);
}
// `matching_ids` must combine records covered by a checkpoint with records
// written after it, honour deletes, reject predicates on fields that were not
// declared filterable, and give the same answer after checkpoint + reopen.
#[test]
fn matching_ids_spans_secondary_index_and_active_buffer() {
use crate::descriptor::FilterableField;
use crate::sec::SecValue;
let tmp = tempfile::tempdir().unwrap();
let mut s = open(tmp.path());
// Only "city" (keyword) and "age" (numeric) are declared filterable.
let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
FilterableField::keyword("city"),
FilterableField::numeric("age"),
]);
let c = s.create_collection("c", descriptor).unwrap();
// "a", "b" and "d" are covered by the checkpoint below.
s.upsert(c, "a", &[0.0; 4], br#"{"city":"paris","age":30}"#)
.unwrap();
s.upsert(c, "b", &[0.0; 4], br#"{"city":"lyon","age":25}"#)
.unwrap();
s.upsert(c, "d", &[0.0; 4], br#"{"city":"paris","age":40}"#)
.unwrap();
s.checkpoint().unwrap();
// "e" is written after the checkpoint.
s.upsert(c, "e", &[0.0; 4], br#"{"city":"paris","age":22}"#)
.unwrap();
let paris = || SecPredicate::Eq {
field: "city".into(),
value: SecValue::Keyword("paris".into()),
};
// Equality: both checkpointed ("a", "d") and post-checkpoint ("e") matches are expected.
assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["a", "d", "e"]);
// Inclusive range 25..=35 on "age": "a" (30) and "b" (25); "d" (40) and "e" (22) fall outside.
assert_eq!(
s.matching_ids(
c,
&SecPredicate::Range {
field: "age".into(),
lo: Some(SecValue::Numeric(25.0)),
hi: Some(SecValue::Numeric(35.0)),
lo_inclusive: true,
hi_inclusive: true,
}
)
.unwrap(),
["a", "b"]
);
// A deleted id disappears from the results without a checkpoint in between.
s.delete(c, "a").unwrap();
assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
// "country" was not declared filterable, so the predicate is rejected.
assert!(matches!(
s.matching_ids(
c,
&SecPredicate::Eq {
field: "country".into(),
value: SecValue::Keyword("fr".into()),
}
),
Err(CoreError::InvalidArgument(_))
));
// Checkpoint, then open the directory again (the first handle is only
// shadowed, not dropped) and expect the same answer.
s.checkpoint().unwrap();
let s = open(tmp.path());
let c = s.collection_id("c").unwrap();
assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
}
/// Sorted names of the files starting with `idx-` in the collection's `index`
/// directory. An unreadable or missing directory yields an empty list;
/// entries that fail to read or whose name is not valid UTF-8 are skipped.
fn index_snapshot_files(dir: &Path, cid: CollectionId) -> Vec<String> {
    let index_dir = collection_dir(dir, cid).join("index");
    let mut names = Vec::new();
    if let Ok(listing) = fs::read_dir(&index_dir) {
        for dirent in listing.flatten() {
            if let Some(name) = dirent.file_name().to_str() {
                if name.starts_with("idx-") {
                    names.push(name.to_owned());
                }
            }
        }
    }
    names.sort();
    names
}
/// An index snapshot handed to a checkpoint can be read back in the same
/// session and again after a reopen.
#[test]
fn index_snapshot_round_trips_through_checkpoint_and_reopen() {
    let root = tempfile::tempdir().unwrap();
    let blob = b"opaque-index-bytes".to_vec();
    let cid = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();

        let snapshots = HashMap::from([(coll, blob.clone())]);
        store.checkpoint_with_index_snapshots(&snapshots).unwrap();

        assert_eq!(store.read_index_snapshot(coll).unwrap(), Some(blob.clone()));
        // Exactly one snapshot file exists on disk.
        assert_eq!(index_snapshot_files(root.path(), coll).len(), 1);
        coll
    };

    let store = open(root.path());
    assert_eq!(store.read_index_snapshot(cid).unwrap(), Some(blob));
}
/// A plain `checkpoint()` after a snapshot-carrying one leaves no snapshot to
/// read and no `idx-` file on disk, also after reopening.
#[test]
fn checkpoint_without_a_snapshot_clears_and_reclaims_it() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
    let snapshots = HashMap::from([(coll, b"blob".to_vec())]);
    store.checkpoint_with_index_snapshots(&snapshots).unwrap();
    assert!(store.read_index_snapshot(coll).unwrap().is_some());

    // New data followed by a checkpoint that carries no snapshot.
    store.upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
    store.checkpoint().unwrap();
    assert_eq!(store.read_index_snapshot(coll).unwrap(), None);
    assert!(index_snapshot_files(root.path(), coll).is_empty());

    // The directory is opened again while the first handle is still in scope.
    let reopened = open(root.path());
    assert_eq!(reopened.read_index_snapshot(coll).unwrap(), None);
}
/// A second snapshot-carrying checkpoint replaces the first snapshot: the new
/// bytes are read back and only one `idx-` file remains.
#[test]
fn a_new_snapshot_supersedes_and_reclaims_the_old_one() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
    let first = HashMap::from([(coll, b"first".to_vec())]);
    store.checkpoint_with_index_snapshots(&first).unwrap();

    store.upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
    let second = HashMap::from([(coll, b"second".to_vec())]);
    store.checkpoint_with_index_snapshots(&second).unwrap();

    assert_eq!(
        store.read_index_snapshot(coll).unwrap(),
        Some(b"second".to_vec())
    );
    assert_eq!(index_snapshot_files(root.path(), coll).len(), 1);
}
/// `compact()` leaves the current index snapshot readable, both in the same
/// session and after the directory is opened again.
#[test]
fn compaction_preserves_the_index_snapshot() {
    let root = tempfile::tempdir().unwrap();
    let mut store = open(root.path());
    let coll = store.create_collection("c", desc()).unwrap();

    store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
    let first = HashMap::from([(coll, b"keep".to_vec())]);
    store.checkpoint_with_index_snapshots(&first).unwrap();

    // Second window: one new record and one delete, again with a snapshot.
    store.upsert(coll, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
    store.delete(coll, "a").unwrap();
    let second = HashMap::from([(coll, b"keep".to_vec())]);
    store.checkpoint_with_index_snapshots(&second).unwrap();

    store.compact().unwrap();
    assert_eq!(
        store.read_index_snapshot(coll).unwrap(),
        Some(b"keep".to_vec())
    );

    // The directory is opened again while the first handle is still in scope.
    let reopened = open(root.path());
    assert_eq!(
        reopened.read_index_snapshot(coll).unwrap(),
        Some(b"keep".to_vec())
    );
}
/// A stray `idx-` file planted in the index directory is removed on open,
/// while the live snapshot stays readable.
#[test]
fn orphan_index_snapshot_is_reclaimed_on_open() {
    let root = tempfile::tempdir().unwrap();
    let cid = {
        let mut store = open(root.path());
        let coll = store.create_collection("c", desc()).unwrap();
        store.upsert(coll, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        let snapshots = HashMap::from([(coll, b"live".to_vec())]);
        store.checkpoint_with_index_snapshots(&snapshots).unwrap();

        // Plant a snapshot-looking file that the checkpoint above did not write.
        let stray = store.index_dir(coll).join("idx-9999999999");
        fs::write(&stray, b"orphan").unwrap();
        coll
    };

    let store = open(root.path());
    let stray = store.index_dir(cid).join("idx-9999999999");
    assert!(!stray.exists());
    assert_eq!(
        store.read_index_snapshot(cid).unwrap(),
        Some(b"live".to_vec())
    );
}
}