Skip to main content

quiver_core/
store.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The storage engine: a durable, crash-safe vector store per collection.
3//!
4//! A [`Store`] ties the [`crate::wal`] and [`crate::manifest`] primitives,
5//! together with immutable `segment`s in the row-addressed on-disk
6//! format (ADR-0004), into a recoverable engine. The durability contract
7//! (ADR-0005): a mutation is acknowledged only after its WAL record is `fsync`'d,
8//! so an acknowledged write survives `kill -9`.
9//!
10//! ## Memory model
11//! Vectors and payloads live on disk in sealed segments and are read through an
12//! `mmap`, decrypted on demand — only the working set is resident. The engine
13//! keeps in RAM a **primary index** (external id → row location) per collection,
14//! plus the **active buffer**: the rows upserted since the last checkpoint, which
15//! are also durable in the WAL. A read resolves the id to either an active row or
16//! a `(segment, row)` and fetches the bytes from the active buffer or the segment.
17//!
18//! ## Write path
19//! `upsert`/`delete`/`create_collection`/`drop_collection` append a WAL record,
20//! `fsync` it (acknowledgement), then update in-memory state. `checkpoint` seals
21//! the active buffer into a new immutable segment per collection, persists the
22//! window's deletes and shadowed rows into the affected segments' `.del`
23//! tombstone bitmaps, atomically swaps in a manifest, rotates the WAL, and
24//! garbage-collects superseded files.
25//!
26//! ## Recovery (on open)
27//! Read `CURRENT` → load the manifest → for each referenced segment, read its
28//! row directory and tombstone bitmap and rebuild the primary index (a row marked
29//! dead in its segment is skipped, so each id is live in exactly one segment) →
30//! replay every WAL record with `lsn > last_checkpointed_lsn` idempotently into
31//! the active buffer → garbage-collect orphan segment files a crash left between a
32//! flush and the manifest swap. A torn trailing WAL record fails its frame check
33//! and is dropped; it was never acknowledged.
34//!
35//! ## Concurrency
36//! Phase 1/2 is a single-writer engine: mutations take `&mut self`, reads take
37//! `&self`. The lock-free MVCC snapshot model (ADR-0006) arrives with the
38//! server integration; until then a server wraps the store in a lock.
39
40use std::collections::{BTreeMap, HashMap, HashSet};
41use std::fs;
42use std::path::{Path, PathBuf};
43use std::sync::Arc;
44
45use roaring::RoaringBitmap;
46
47use crate::descriptor::Descriptor;
48use crate::error::{CoreError, Result};
49use crate::ids::{CollectionId, Lsn};
50use crate::keyring::{KeyRing, SingleCodecKeyRing};
51use crate::manifest::{
52    self, CollectionEntry, IndexSnapshotRef, MANIFEST_FORMAT_VERSION, Manifest, SegmentRef,
53};
54use crate::page::{PageCodec, PageType};
55use crate::paged::{fsync_dir, read_paged, write_paged};
56use crate::sec::{self, SecPredicate};
57use crate::segment::{self, SealRow, SealedSegment};
58use crate::wal::{self, WalEntry, WalOp, WalWriter};
59
/// Sealed-segment count at which a checkpoint auto-compacts a collection.
///
/// Recovery opens every segment listed in a collection's manifest entry and walks
/// its row directory, and a read may land in any of them, so merging segments
/// once this many accumulate keeps reads and recovery from fanning out across
/// many files.
const COMPACT_MIN_SEGMENTS: usize = 8;
63
/// A point as handed back by the read path: its decoded vector together with
/// its opaque payload bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
    /// Vector components, decoded from the little-endian bytes kept on disk.
    pub vector: Vec<f32>,
    /// Payload bytes, stored as-is (the API edge validates them as UTF-8 JSON).
    pub payload: Vec<u8>,
}
72
73/// The post-checkpoint mutations a restored index snapshot must replay to reach
74/// the current state — the WAL tail `open` already applied to the store
75/// (ADR-0025). Both lists are bounded by the checkpoint cadence, not the
76/// collection size.
77#[derive(Debug, Default)]
78pub struct RecoveryTail {
79    /// Live rows upserted since the last checkpoint (the active buffer).
80    pub upserts: Vec<(String, Record)>,
81    /// External ids whose pre-checkpoint row died this window (deleted, or
82    /// shadowed by a re-upsert) and so must be removed from a restored index.
83    pub deleted: Vec<String>,
84}
85
// Where the bytes of a live row can be found: slot `n` of the in-RAM active
// buffer, or row `row` of the sealed segment stored at index `seg`.
#[derive(Clone, Copy, Debug)]
enum Loc {
    Active(u32),
    Sealed { seg: u32, row: u32 },
}
93
// One row held in RAM since the last checkpoint: its raw little-endian vector
// bytes and its payload. The WAL keeps a durable copy until a checkpoint seals
// the row into a segment.
#[derive(Clone, Debug)]
struct ActiveRow {
    vector: Vec<u8>,
    payload: Vec<u8>,
}
101
102// In-memory state of one collection.
103struct CollectionState {
104    id: CollectionId,
105    name: String,
106    descriptor: Descriptor,
107    // The codec that seals this collection's segments and index artifacts —
108    // its own data-encryption key under an envelope key-ring, or the shared
109    // codec under a single-codec key-ring. Built once from the key-ring at
110    // create/open and held so reads need no per-call key derivation.
111    codec: Box<dyn PageCodec>,
112    // Bytes per vector (`dim × dtype size`), cached from the descriptor.
113    stride: usize,
114    // Live external id → location. The authority for `get`/`len`/`scan`; ordered
115    // so `scan` yields ids deterministically.
116    primary: BTreeMap<String, Loc>,
117    // Sealed segments in creation order; `Loc::Sealed.seg` indexes this.
118    sealed: Vec<SealedSegment>,
119    // Manifest segment refs, parallel to `sealed`.
120    segments_meta: Vec<SegmentRef>,
121    // Rows upserted since the last checkpoint; index = `Loc::Active` row.
122    active: Vec<ActiveRow>,
123    // Live external id → its latest active row, for sealing at the next checkpoint.
124    active_index: BTreeMap<String, u32>,
125    // Sealed-segment rows that died this window (deleted or shadowed), keyed by
126    // segment index; merged into each segment's `.del` at the next checkpoint.
127    dead_this_window: BTreeMap<u32, RoaringBitmap>,
128    // The durable index snapshot reference (ADR-0025): loaded from the manifest on
129    // open, refreshed at each checkpoint; `None` if the index is rebuilt on open.
130    index_snapshot: Option<IndexSnapshotRef>,
131}
132
133impl CollectionState {
134    fn new(
135        id: CollectionId,
136        name: String,
137        descriptor: Descriptor,
138        codec: Box<dyn PageCodec>,
139    ) -> Self {
140        let stride = descriptor.stride();
141        Self {
142            id,
143            name,
144            descriptor,
145            codec,
146            stride,
147            primary: BTreeMap::new(),
148            sealed: Vec::new(),
149            segments_meta: Vec::new(),
150            active: Vec::new(),
151            active_index: BTreeMap::new(),
152            dead_this_window: BTreeMap::new(),
153            index_snapshot: None,
154        }
155    }
156
157    fn has_pending(&self) -> bool {
158        !self.active_index.is_empty() || !self.dead_this_window.is_empty()
159    }
160
161    // Apply an upsert to in-memory state (shared by the write path and WAL
162    // replay). If the id currently lives in a sealed segment, that row is now
163    // shadowed and recorded for tombstoning at the next checkpoint.
164    fn apply_upsert(&mut self, external_id: &str, vector: Vec<u8>, payload: Vec<u8>) {
165        if let Some(Loc::Sealed { seg, row }) = self.primary.get(external_id).copied() {
166            self.dead_this_window.entry(seg).or_default().insert(row);
167        }
168        let row = self.active.len() as u32;
169        self.active.push(ActiveRow { vector, payload });
170        self.active_index.insert(external_id.to_owned(), row);
171        self.primary
172            .insert(external_id.to_owned(), Loc::Active(row));
173    }
174
175    // Apply a delete to in-memory state (shared by the write path and WAL
176    // replay). Returns whether the id existed. A deleted sealed row is recorded
177    // for tombstoning; a deleted active row is simply dropped from the buffer.
178    fn apply_delete(&mut self, external_id: &str) -> bool {
179        match self.primary.remove(external_id) {
180            Some(Loc::Sealed { seg, row }) => {
181                self.dead_this_window.entry(seg).or_default().insert(row);
182                self.active_index.remove(external_id);
183                true
184            }
185            Some(Loc::Active(_)) => {
186                self.active_index.remove(external_id);
187                true
188            }
189            None => false,
190        }
191    }
192}
193
194// A segment written during a checkpoint, opened and ready to install after the
195// manifest swap commits. The repointing ids come from `sealed.row_ids()`.
196struct PendingSegment {
197    seg_ref: SegmentRef,
198    sealed: SealedSegment,
199}
200
201/// A synchronous hook invoked with each committed [`WalEntry`], in commit order.
202/// Leader-follower replication (ADR-0030) installs one to publish each op to its
203/// replication stream. A plain `Fn` keeps the engine runtime-agnostic — no async
204/// dependency leaks into `quiver-core`.
205pub type CommitObserver = Arc<dyn Fn(&WalEntry) + Send + Sync>;
206
207/// The durable storage engine for one data directory.
208pub struct Store {
209    dir: PathBuf,
210    keyring: Box<dyn KeyRing>,
211    collections: HashMap<CollectionId, CollectionState>,
212    name_index: HashMap<String, CollectionId>,
213    next_lsn: Lsn,
214    next_collection_id: u64,
215    next_segment_id: u64,
216    manifest_version: u64,
217    last_checkpointed_lsn: Lsn,
218    wal: WalWriter,
219    wal_seq: u64,
220    commit_observer: Option<CommitObserver>,
221}
222
223impl Store {
224    /// Open (creating if absent) the store at `dir` with encryption-at-rest
225    /// disabled (the plaintext codec). Runs full crash recovery.
226    pub fn open(dir: &Path) -> Result<Self> {
227        Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::plaintext()))
228    }
229
230    /// Open the store sealing every byte — catalog and all collections — with a
231    /// single [`PageCodec`]. Used by `quiver-crypto` to enable encryption-at-rest
232    /// under one root key (no per-collection envelope). Runs full crash recovery.
233    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
234        Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::new(codec)))
235    }
236
237    /// Open the store with a [`KeyRing`] supplying a catalog codec (manifest and
238    /// WAL) and a per-collection codec (segments and index artifacts). This is
239    /// the seam `quiver-crypto`'s envelope key-ring uses to seal each collection
240    /// under its own data-encryption key, enabling crypto-shredding. Runs full
241    /// crash recovery.
242    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
243        fs::create_dir_all(dir).map_err(|e| CoreError::io(dir, e))?;
244        let wal_dir = dir.join("wal");
245        fs::create_dir_all(&wal_dir).map_err(|e| CoreError::io(&wal_dir, e))?;
246        fsync_dir(dir)?;
247        fsync_dir(&wal_dir)?;
248
249        // 1. Load the manifest (or start empty).
250        let mfst = manifest::read_current(dir, keyring.catalog_codec())?.unwrap_or_default();
251
252        // 2. Rebuild the primary index from the sealed segments the manifest
253        //    references. A row tombstoned in its segment's `.del` is skipped, so
254        //    each external id is added from the single segment in which it is live.
255        let mut collections: HashMap<CollectionId, CollectionState> = HashMap::new();
256        let mut name_index: HashMap<String, CollectionId> = HashMap::new();
257        for entry in &mfst.collections {
258            let descriptor = Descriptor::decode(&entry.descriptor)?;
259            let codec = keyring.collection_codec(entry.id)?;
260            let mut state = CollectionState::new(entry.id, entry.name.clone(), descriptor, codec);
261            state.segments_meta = entry.segments.clone();
262            state.index_snapshot = entry.index_snapshot.clone();
263            let seg_dir = segments_dir(dir, entry.id);
264            for seg in &entry.segments {
265                let sealed = segment::open_segment(&seg_dir, seg.id, state.codec.as_ref())?;
266                let seg_idx = state.sealed.len() as u32;
267                for (row, ext_id) in sealed.row_ids().iter().enumerate() {
268                    let row = row as u32;
269                    if !sealed.is_dead(row) {
270                        state
271                            .primary
272                            .insert(ext_id.clone(), Loc::Sealed { seg: seg_idx, row });
273                    }
274                }
275                state.sealed.push(sealed);
276            }
277            name_index.insert(state.name.clone(), state.id);
278            collections.insert(state.id, state);
279        }
280
281        // 3. Replay the WAL tail (records past the checkpoint), idempotently.
282        let floor = mfst.last_checkpointed_lsn;
283        let mut max_lsn = floor;
284        let wal_files = list_wal_files(&wal_dir)?;
285        let mut max_seq = 0u64;
286        let mut keep_seqs: HashSet<u64> = HashSet::new();
287        for (seq, path) in &wal_files {
288            max_seq = (*seq).max(max_seq);
289            let replay = wal::read_all(path, keyring.catalog_codec())?;
290            let mut had_live = false;
291            for entry in replay.entries {
292                if entry.lsn.value() <= floor.value() {
293                    continue; // already captured in a segment
294                }
295                had_live = true;
296                if entry.lsn > max_lsn {
297                    max_lsn = entry.lsn;
298                }
299                apply_wal_entry(&mut collections, &mut name_index, &entry, keyring.as_ref())?;
300            }
301            if had_live {
302                keep_seqs.insert(*seq);
303            }
304        }
305        let next_lsn = max_lsn.next();
306
307        // 4. GC orphan segment files not referenced by the manifest (a crash
308        //    between a segment flush and the manifest swap), then the analogous
309        //    orphan/superseded index snapshots (ADR-0025).
310        gc_orphan_segments(dir, &mfst, keyring.as_ref())?;
311        gc_orphan_index_snapshots(dir, &mfst)?;
312
313        // 5. Start a fresh WAL segment for new appends, then drop superseded WAL
314        //    files (empty or fully below the checkpoint).
315        let wal_seq = max_seq + 1;
316        let wal = WalWriter::create(&wal_file_path(&wal_dir, wal_seq), next_lsn)?;
317        fsync_dir(&wal_dir)?;
318        for (seq, path) in &wal_files {
319            if !keep_seqs.contains(seq) {
320                remove_file_if_present(path)?;
321            }
322        }
323        fsync_dir(&wal_dir)?;
324
325        Ok(Self {
326            dir: dir.to_path_buf(),
327            keyring,
328            collections,
329            name_index,
330            next_lsn,
331            next_collection_id: mfst.next_collection_id,
332            next_segment_id: mfst.next_segment_id,
333            manifest_version: mfst.version,
334            last_checkpointed_lsn: floor,
335            wal,
336            wal_seq,
337            commit_observer: None,
338        })
339    }
340
341    /// Install a hook invoked with each committed [`WalEntry`], in commit order
342    /// (ADR-0030). Used by the server to drive a leader's replication stream;
343    /// replaces any previous observer.
344    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
345        self.commit_observer = Some(observer);
346    }
347
348    // Notify the commit observer (if any) of a durably-committed entry.
349    fn publish(&self, entry: &WalEntry) {
350        if let Some(observer) = &self.commit_observer {
351            observer(entry);
352        }
353    }
354
355    /// The operations that recreate the store's current logical state, for a
356    /// replication follower to bootstrap from (ADR-0030): a `CreateCollection`
357    /// per collection, each followed by an `Upsert` per live point. Collections
358    /// are emitted before their points so a follower can apply the stream in
359    /// order.
360    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
361        let mut ops = Vec::new();
362        for (&id, state) in &self.collections {
363            ops.push(WalOp::CreateCollection {
364                collection_id: id,
365                name: state.name.clone(),
366                descriptor: postcard::to_allocvec(&state.descriptor)?,
367            });
368            for (external_id, record) in self.scan(id)? {
369                ops.push(WalOp::Upsert {
370                    collection_id: id,
371                    external_id,
372                    vector: f32_to_le_bytes(&record.vector),
373                    payload: record.payload,
374                });
375            }
376        }
377        Ok(ops)
378    }
379
380    /// Apply a replicated operation received from a leader (ADR-0030). The op is
381    /// persisted to *this* node's WAL under a locally-assigned LSN — preserving
382    /// the leader's collection id so later ops resolve — then applied to in-memory
383    /// state through the same path crash recovery uses. `Checkpoint` ops are a
384    /// per-node concern and are ignored; followers checkpoint themselves.
385    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
386        if let WalOp::Checkpoint { .. } = op {
387            return Ok(());
388        }
389        if let WalOp::CreateCollection { collection_id, .. } = &op {
390            // Provision key material before the collection's codec is needed, and
391            // keep the local id allocator ahead of the leader's ids.
392            self.keyring.provision_collection(*collection_id)?;
393            self.next_collection_id = self.next_collection_id.max(collection_id.0 + 1);
394        }
395        let lsn = self.next_lsn;
396        let entry = WalEntry { lsn, op };
397        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
398        self.next_lsn = lsn.next();
399        apply_wal_entry(
400            &mut self.collections,
401            &mut self.name_index,
402            &entry,
403            self.keyring.as_ref(),
404        )?;
405        self.publish(&entry);
406        Ok(())
407    }
408
409    /// Create a collection. Fails if the name is already taken.
410    pub fn create_collection(
411        &mut self,
412        name: &str,
413        descriptor: Descriptor,
414    ) -> Result<CollectionId> {
415        if self.name_index.contains_key(name) {
416            return Err(CoreError::AlreadyExists(format!("collection {name}")));
417        }
418        if descriptor.dim == 0 {
419            return Err(CoreError::InvalidArgument(
420                "dim must be non-zero".to_owned(),
421            ));
422        }
423        let id = CollectionId(self.next_collection_id);
424        // Provision the collection's key material before its first durable record
425        // references it, so WAL replay on recovery can always open what it needs.
426        self.keyring.provision_collection(id)?;
427        let descriptor_bytes = postcard::to_allocvec(&descriptor)?;
428        let lsn = self.next_lsn;
429        let entry = WalEntry {
430            lsn,
431            op: WalOp::CreateCollection {
432                collection_id: id,
433                name: name.to_owned(),
434                descriptor: descriptor_bytes,
435            },
436        };
437        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
438        self.next_lsn = lsn.next();
439        self.publish(&entry);
440        self.next_collection_id += 1;
441        let codec = self.keyring.collection_codec(id)?;
442        self.collections.insert(
443            id,
444            CollectionState::new(id, name.to_owned(), descriptor, codec),
445        );
446        self.name_index.insert(name.to_owned(), id);
447        Ok(id)
448    }
449
450    /// Drop a collection and all of its data. Its segment files are reclaimed at
451    /// the next checkpoint or the next open. Returns whether it existed.
452    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
453        let Some(&id) = self.name_index.get(name) else {
454            return Ok(false);
455        };
456        let lsn = self.next_lsn;
457        let entry = WalEntry {
458            lsn,
459            op: WalOp::DropCollection { collection_id: id },
460        };
461        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
462        self.next_lsn = lsn.next();
463        self.publish(&entry);
464        self.collections.remove(&id);
465        self.name_index.remove(name);
466        Ok(true)
467    }
468
469    /// Crypto-shred a collection: drop it, checkpoint so the manifest no longer
470    /// references it and its files are reclaimed, then destroy its key material.
471    /// After this its sealed segments and index are unrecoverable even to the
472    /// master-key holder (ADR-0010); with a single-codec key-ring there is no
473    /// per-collection key, so this is `drop` plus a checkpoint. Returns whether
474    /// the collection existed.
475    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
476        let Some(id) = self.collection_id(name) else {
477            return Ok(false);
478        };
479        self.drop_collection(name)?;
480        // Seal any un-checkpointed rows into DEK-protected segments and rotate
481        // the WAL, so no live catalog-keyed copy of the collection survives; the
482        // checkpoint's GC then reclaims its files and shreds its key.
483        self.checkpoint()?;
484        // Destroy the key explicitly too, covering a collection that never
485        // reached a segment directory for GC to find. Idempotent.
486        self.keyring.shred_collection(id)?;
487        Ok(true)
488    }
489
490    /// Insert or replace a point. The vector length must equal the collection's
491    /// dimensionality; the payload is stored opaquely. Returns the assigned LSN
492    /// once the write is durable.
493    pub fn upsert(
494        &mut self,
495        collection: CollectionId,
496        external_id: &str,
497        vector: &[f32],
498        payload: &[u8],
499    ) -> Result<Lsn> {
500        let dim = self
501            .collections
502            .get(&collection)
503            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
504            .descriptor
505            .dim as usize;
506        if vector.len() != dim {
507            return Err(CoreError::InvalidArgument(format!(
508                "vector has {} dims, collection expects {dim}",
509                vector.len()
510            )));
511        }
512        let vector_bytes = f32_to_le_bytes(vector);
513        let lsn = self.next_lsn;
514        let entry = WalEntry {
515            lsn,
516            op: WalOp::Upsert {
517                collection_id: collection,
518                external_id: external_id.to_owned(),
519                vector: vector_bytes.clone(),
520                payload: payload.to_vec(),
521            },
522        };
523        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
524        self.next_lsn = lsn.next();
525        self.publish(&entry);
526        let state = self
527            .collections
528            .get_mut(&collection)
529            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
530        state.apply_upsert(external_id, vector_bytes, payload.to_vec());
531        Ok(lsn)
532    }
533
534    /// Upsert a batch of points with a **single** `fdatasync` instead of one
535    /// per point.  All records are acknowledged atomically — if the server
536    /// crashes before the sync completes, none of the batch is durable (the
537    /// caller, seeing no response, should retry the whole batch).  This is the
538    /// standard batch-commit pattern used by every production database.
539    ///
540    /// `records` is `(external_id, vector, payload_bytes)` slices; the vectors
541    /// must match the collection's dimensionality or the call returns an error
542    /// before writing anything.
543    pub fn upsert_batch(
544        &mut self,
545        collection: CollectionId,
546        records: &[(&str, &[f32], &[u8])],
547    ) -> Result<u64> {
548        if records.is_empty() {
549            return Ok(0);
550        }
551        let dim = self
552            .collections
553            .get(&collection)
554            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
555            .descriptor
556            .dim as usize;
557        for (_, vector, _) in records {
558            if vector.len() != dim {
559                return Err(CoreError::InvalidArgument(format!(
560                    "vector has {} dims, collection expects {dim}",
561                    vector.len()
562                )));
563            }
564        }
565
566        // Build one WalEntry per record, advancing the LSN for each.
567        let mut entries: Vec<WalEntry> = Vec::with_capacity(records.len());
568        for (ext_id, vector, payload) in records {
569            let lsn = self.next_lsn;
570            self.next_lsn = lsn.next();
571            entries.push(WalEntry {
572                lsn,
573                op: WalOp::Upsert {
574                    collection_id: collection,
575                    external_id: ext_id.to_string(),
576                    vector: f32_to_le_bytes(vector),
577                    payload: payload.to_vec(),
578                },
579            });
580        }
581
582        // Append all records without syncing, then ONE fdatasync.
583        for entry in &entries {
584            self.wal.append(self.keyring.catalog_codec(), entry)?;
585        }
586        self.wal.sync()?;
587
588        // Publish and apply in commit order.
589        for entry in &entries {
590            self.publish(entry);
591            if let WalOp::Upsert {
592                external_id,
593                vector,
594                payload,
595                ..
596            } = &entry.op
597            {
598                let state = self
599                    .collections
600                    .get_mut(&collection)
601                    .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
602                state.apply_upsert(external_id, vector.clone(), payload.clone());
603            }
604        }
605        Ok(records.len() as u64)
606    }
607
608    /// Delete a point by external id. Returns whether it existed.
609    pub fn delete(&mut self, collection: CollectionId, external_id: &str) -> Result<bool> {
610        let existed = self
611            .collections
612            .get(&collection)
613            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
614            .primary
615            .contains_key(external_id);
616        if !existed {
617            return Ok(false);
618        }
619        let lsn = self.next_lsn;
620        let entry = WalEntry {
621            lsn,
622            op: WalOp::Delete {
623                collection_id: collection,
624                external_id: external_id.to_owned(),
625            },
626        };
627        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
628        self.next_lsn = lsn.next();
629        self.publish(&entry);
630        let state = self
631            .collections
632            .get_mut(&collection)
633            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
634        state.apply_delete(external_id);
635        Ok(true)
636    }
637
638    /// Build the validated [`WalOp`] that [`Store::create_collection`] would log,
639    /// **without** applying it. The per-shard Raft write path (ADR-0067) proposes
640    /// this op through consensus so a quorum commits it before any member applies
641    /// it (via [`Store::apply_replicated`]). The new collection's id is assigned
642    /// here, on the leader, and carried in the op exactly as a direct create would
643    /// — so every member applies the same id; the caller serializes concurrent
644    /// creates so two cannot claim the same `next_collection_id`.
645    pub fn prepare_create_collection(&self, name: &str, descriptor: &Descriptor) -> Result<WalOp> {
646        if self.name_index.contains_key(name) {
647            return Err(CoreError::AlreadyExists(format!("collection {name}")));
648        }
649        if descriptor.dim == 0 {
650            return Err(CoreError::InvalidArgument(
651                "dim must be non-zero".to_owned(),
652            ));
653        }
654        Ok(WalOp::CreateCollection {
655            collection_id: CollectionId(self.next_collection_id),
656            name: name.to_owned(),
657            descriptor: postcard::to_allocvec(descriptor)?,
658        })
659    }
660
661    /// Build the validated [`WalOp::Upsert`] that [`Store::upsert`] would log,
662    /// without applying it (the Raft write path; see
663    /// [`Store::prepare_create_collection`]). The vector is encoded identically to
664    /// the direct path, so a member applying the proposed op reaches the same state
665    /// a direct upsert would.
666    pub fn prepare_upsert(
667        &self,
668        collection: CollectionId,
669        external_id: &str,
670        vector: &[f32],
671        payload: &[u8],
672    ) -> Result<WalOp> {
673        let dim = self
674            .collections
675            .get(&collection)
676            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
677            .descriptor
678            .dim as usize;
679        if vector.len() != dim {
680            return Err(CoreError::InvalidArgument(format!(
681                "vector has {} dims, collection expects {dim}",
682                vector.len()
683            )));
684        }
685        Ok(WalOp::Upsert {
686            collection_id: collection,
687            external_id: external_id.to_owned(),
688            vector: f32_to_le_bytes(vector),
689            payload: payload.to_vec(),
690        })
691    }
692
693    /// Build the [`WalOp::Delete`] that [`Store::delete`] would log, or `None` if
694    /// the point does not exist, without applying it (the Raft write path).
695    pub fn prepare_delete(
696        &self,
697        collection: CollectionId,
698        external_id: &str,
699    ) -> Result<Option<WalOp>> {
700        let existed = self
701            .collections
702            .get(&collection)
703            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
704            .primary
705            .contains_key(external_id);
706        Ok(existed.then(|| WalOp::Delete {
707            collection_id: collection,
708            external_id: external_id.to_owned(),
709        }))
710    }
711
712    /// Fetch a point by external id.
713    pub fn get(&self, collection: CollectionId, external_id: &str) -> Result<Option<Record>> {
714        let state = self
715            .collections
716            .get(&collection)
717            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
718        match state.primary.get(external_id).copied() {
719            Some(loc) => Ok(Some(self.record_at(state, loc)?)),
720            None => Ok(None),
721        }
722    }
723
724    /// Iterate every live `(external_id, record)` in a collection, in id order.
725    /// Used to build the in-memory index and for brute-force scans.
726    pub fn scan(&self, collection: CollectionId) -> Result<Vec<(String, Record)>> {
727        let state = self
728            .collections
729            .get(&collection)
730            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
731        let mut out = Vec::with_capacity(state.primary.len());
732        for (id, &loc) in &state.primary {
733            out.push((id.clone(), self.record_at(state, loc)?));
734        }
735        Ok(out)
736    }
737
738    // Materialize the record at `loc`, reading from the active buffer or the
739    // sealed segment (decrypting and CRC-checking the touched pages).
740    fn record_at(&self, state: &CollectionState, loc: Loc) -> Result<Record> {
741        match loc {
742            Loc::Active(r) => {
743                let row = state
744                    .active
745                    .get(r as usize)
746                    .ok_or_else(|| CoreError::MalformedPage(format!("dangling active row {r}")))?;
747                Ok(Record {
748                    vector: le_bytes_to_f32(&row.vector),
749                    payload: row.payload.clone(),
750                })
751            }
752            Loc::Sealed { seg, row } => {
753                let segment = state.sealed.get(seg as usize).ok_or_else(|| {
754                    CoreError::MalformedPage(format!("dangling segment index {seg}"))
755                })?;
756                let vector_bytes = segment.read_vector(state.codec.as_ref(), row, state.stride)?;
757                let payload = segment.read_payload(state.codec.as_ref(), row)?;
758                Ok(Record {
759                    vector: le_bytes_to_f32(&vector_bytes),
760                    payload,
761                })
762            }
763        }
764    }
765
766    /// The id of a collection by name, if it exists.
767    #[must_use]
768    pub fn collection_id(&self, name: &str) -> Option<CollectionId> {
769        self.name_index.get(name).copied()
770    }
771
772    /// The descriptor of a collection, if it exists.
773    #[must_use]
774    pub fn descriptor(&self, collection: CollectionId) -> Option<&Descriptor> {
775        self.collections.get(&collection).map(|s| &s.descriptor)
776    }
777
778    /// A clone of a collection's page codec, for a component that seals its own
779    /// files with that collection's key — e.g. a disk-resident index artifact
780    /// (ADR-0019). The same owned handle can both write and `mmap`-open the
781    /// artifact, so it shares the collection's data-encryption key.
782    ///
783    /// # Errors
784    /// [`CoreError::NotFound`] if the collection is unknown.
785    pub fn collection_codec_clone(&self, collection: CollectionId) -> Result<Box<dyn PageCodec>> {
786        self.collections
787            .get(&collection)
788            .map(|s| s.codec.clone_box())
789            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))
790    }
791
792    /// The store's root data directory.
793    #[must_use]
794    pub fn dir(&self) -> &Path {
795        &self.dir
796    }
797
798    /// The current manifest version — the catalog generation a snapshot of this
799    /// store captures (ADR-0050).
800    #[must_use]
801    pub fn manifest_version(&self) -> u64 {
802        self.manifest_version
803    }
804
805    /// The directory that holds a collection's index artifacts
806    /// (`<data_dir>/collections/<id>/index`). Not created by this call.
807    #[must_use]
808    pub fn index_dir(&self, collection: CollectionId) -> PathBuf {
809        collection_dir(&self.dir, collection).join("index")
810    }
811
812    /// Read and decrypt the current durable index snapshot for a collection, if
813    /// one is referenced by the manifest (ADR-0025). Returns the opaque blob the
814    /// index layer wrote at the last checkpoint, or `None` if the index must be
815    /// rebuilt (no snapshot, or a store written before v2).
816    ///
817    /// # Errors
818    /// [`CoreError::NotFound`] if the collection is unknown, or an I/O / decrypt /
819    /// page-integrity error reading the snapshot file.
820    pub fn read_index_snapshot(&self, collection: CollectionId) -> Result<Option<Vec<u8>>> {
821        let state = self
822            .collections
823            .get(&collection)
824            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
825        let Some(snap) = &state.index_snapshot else {
826            return Ok(None);
827        };
828        let path = self
829            .index_dir(collection)
830            .join(index_snapshot_file_name(snap.id));
831        let body = read_paged(&path, state.codec.as_ref(), PageType::IndexBlock)?;
832        Ok(Some(body))
833    }
834
835    /// The post-checkpoint mutations a restored index snapshot must replay to
836    /// catch up to the current state (ADR-0025): the active-buffer upserts and the
837    /// external ids whose checkpointed row died this window. Both are bounded by
838    /// the checkpoint cadence, not the collection size.
839    ///
840    /// # Errors
841    /// [`CoreError::NotFound`] if the collection is unknown.
842    pub fn recovery_tail(&self, collection: CollectionId) -> Result<RecoveryTail> {
843        let state = self
844            .collections
845            .get(&collection)
846            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
847        let mut upserts = Vec::with_capacity(state.active_index.len());
848        for (ext_id, &row) in &state.active_index {
849            let ar = &state.active[row as usize];
850            upserts.push((
851                ext_id.clone(),
852                Record {
853                    vector: le_bytes_to_f32(&ar.vector),
854                    payload: ar.payload.clone(),
855                },
856            ));
857        }
858        let mut deleted = Vec::new();
859        for (&seg_idx, bitmap) in &state.dead_this_window {
860            if let Some(seg) = state.sealed.get(seg_idx as usize) {
861                let row_ids = seg.row_ids();
862                for row in bitmap.iter() {
863                    if let Some(ext) = row_ids.get(row as usize) {
864                        deleted.push(ext.clone());
865                    }
866                }
867            }
868        }
869        Ok(RecoveryTail { upserts, deleted })
870    }
871
872    /// The number of live rows in a collection.
873    pub fn len(&self, collection: CollectionId) -> Result<usize> {
874        Ok(self
875            .collections
876            .get(&collection)
877            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
878            .primary
879            .len())
880    }
881
882    /// Whether a collection has no live rows.
883    pub fn is_empty(&self, collection: CollectionId) -> Result<bool> {
884        Ok(self.len(collection)? == 0)
885    }
886
887    /// Names of all collections, sorted.
888    #[must_use]
889    pub fn collection_names(&self) -> Vec<String> {
890        let mut names: Vec<String> = self.name_index.keys().cloned().collect();
891        names.sort();
892        names
893    }
894
895    /// The live external ids whose payload satisfies an indexable `predicate`,
896    /// resolved through the sealed segments' secondary indexes (`.sec`) plus a
897    /// scan of the active buffer. The result is sorted and de-duplicated. This is
898    /// the pre-filter primitive the query planner builds hybrid search on.
899    ///
900    /// # Errors
901    /// [`CoreError::NotFound`] if the collection is unknown, or
902    /// [`CoreError::InvalidArgument`] if the predicate's field is not declared
903    /// filterable in the collection schema.
904    pub fn matching_ids(
905        &self,
906        collection: CollectionId,
907        predicate: &SecPredicate,
908    ) -> Result<Vec<String>> {
909        let state = self
910            .collections
911            .get(&collection)
912            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
913        let field_type = state
914            .descriptor
915            .filterable
916            .iter()
917            .find(|f| f.path == predicate.field())
918            .map(|f| f.field_type)
919            .ok_or_else(|| {
920                CoreError::InvalidArgument(format!("field {} is not filterable", predicate.field()))
921            })?;
922
923        let mut out: Vec<String> = Vec::new();
924        // Sealed segments: query each `.sec`, keeping rows still live here (a row
925        // dead or shadowed no longer has the primary index pointing at it).
926        for (seg_idx, segment) in state.sealed.iter().enumerate() {
927            let seg_idx = seg_idx as u32;
928            let Some(rows) = segment.sec_query(predicate)? else {
929                continue;
930            };
931            for row in rows {
932                if segment.is_dead(row) {
933                    continue;
934                }
935                let Some(ext_id) = segment.row_ids().get(row as usize) else {
936                    continue;
937                };
938                if matches!(
939                    state.primary.get(ext_id),
940                    Some(Loc::Sealed { seg: s, row: r }) if *s == seg_idx && *r == row
941                ) {
942                    out.push(ext_id.clone());
943                }
944            }
945        }
946        // Active (un-checkpointed) rows: evaluate the predicate directly.
947        for (ext_id, &row) in &state.active_index {
948            if let Some(active) = state.active.get(row as usize)
949                && sec::payload_matches(predicate, field_type, &active.payload)
950            {
951                out.push(ext_id.clone());
952            }
953        }
954        out.sort();
955        out.dedup();
956        Ok(out)
957    }
958
959    /// Seal everything changed since the last checkpoint into new immutable
960    /// segments, install a new manifest atomically, rotate the WAL, and reclaim
961    /// superseded files. A no-op if nothing has changed since the last
962    /// checkpoint. Crash-safe at every step (see the module docs).
963    ///
964    /// Equivalent to [`Store::checkpoint_with_index_snapshots`] with no index
965    /// snapshots (any existing snapshot references are cleared).
966    pub fn checkpoint(&mut self) -> Result<()> {
967        self.checkpoint_with_index_snapshots(&HashMap::new())
968    }
969
970    /// Like [`Store::checkpoint`], but also durably captures the supplied
971    /// per-collection index snapshots (ADR-0025): each opaque blob is sealed with
972    /// its collection's codec, fsync'd, and referenced by the same atomic manifest
973    /// swap that publishes the segments — so the `(segments, index)` pair is
974    /// consistent at one LSN. The map is the *complete* set for this checkpoint; a
975    /// collection absent from it has any existing snapshot cleared, so a
976    /// referenced snapshot's LSN always equals the new checkpoint's LSN.
977    pub fn checkpoint_with_index_snapshots(
978        &mut self,
979        index_snapshots: &HashMap<CollectionId, Vec<u8>>,
980    ) -> Result<()> {
981        let last_lsn = Lsn(self.next_lsn.value().saturating_sub(1));
982        if last_lsn.value() <= self.last_checkpointed_lsn.value() {
983            return Ok(()); // nothing new since the last checkpoint
984        }
985        let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
986        cids.sort();
987        let segment_lsn_low = self.last_checkpointed_lsn.next();
988        let new_version = self.manifest_version + 1;
989
990        // Phase A: for each collection with pending changes, persist the window's
991        // dead rows into the affected segments' `.del` bitmaps, seal the active
992        // buffer into a new segment (if any), and re-open it ready to install.
993        let mut pending: HashMap<CollectionId, PendingSegment> = HashMap::new();
994        for &cid in &cids {
995            if !self.collections[&cid].has_pending() {
996                continue;
997            }
998            let seg_dir = segments_dir(&self.dir, cid);
999            fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
1000            // This collection's own codec (its data-encryption key under an
1001            // envelope key-ring) seals its segments and tombstone bitmaps.
1002            let codec = self.collections[&cid].codec.clone_box();
1003
1004            // Merge the window's dead rows into each affected segment's tombstone
1005            // bitmap and rewrite it atomically (temp + rename).
1006            {
1007                let state = &self.collections[&cid];
1008                for (&seg_idx, newly_dead) in &state.dead_this_window {
1009                    if let Some(seg) = state.sealed.get(seg_idx as usize) {
1010                        let mut merged = seg.dead_bitmap();
1011                        merged |= newly_dead;
1012                        segment::write_del(&seg_dir, seg.seg_id, codec.as_ref(), &merged)?;
1013                    }
1014                }
1015            }
1016
1017            // Seal the active buffer (in deterministic id order) into a new
1018            // segment, if there is anything to seal. The borrow of
1019            // `self.collections` ends with this block, before the commit phase.
1020            let new_seg = if self.collections[&cid].active_index.is_empty() {
1021                None
1022            } else {
1023                let seg_id = self.next_segment_id;
1024                self.next_segment_id += 1;
1025                let row_count = {
1026                    let state = &self.collections[&cid];
1027                    let seal_rows: Vec<SealRow<'_>> = state
1028                        .active_index
1029                        .iter()
1030                        .map(|(id, &row)| SealRow {
1031                            external_id: id,
1032                            vector: &state.active[row as usize].vector,
1033                            payload: &state.active[row as usize].payload,
1034                        })
1035                        .collect();
1036                    segment::write_segment(
1037                        &seg_dir,
1038                        seg_id,
1039                        codec.as_ref(),
1040                        &seal_rows,
1041                        &state.descriptor.filterable,
1042                    )?;
1043                    seal_rows.len() as u64
1044                };
1045                Some((seg_id, row_count))
1046            };
1047
1048            // Make the new files and their parent directories durable before the
1049            // manifest references them.
1050            fsync_dir(&seg_dir)?;
1051            fsync_dir(&collection_dir(&self.dir, cid))?;
1052            fsync_dir(&self.dir.join("collections"))?;
1053            fsync_dir(&self.dir)?;
1054
1055            if let Some((seg_id, row_count)) = new_seg {
1056                let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
1057                pending.insert(
1058                    cid,
1059                    PendingSegment {
1060                        seg_ref: SegmentRef {
1061                            id: seg_id,
1062                            row_count,
1063                            lsn_low: segment_lsn_low,
1064                            lsn_high: last_lsn,
1065                        },
1066                        sealed,
1067                    },
1068                );
1069            }
1070        }
1071
1072        // Phase A2: persist the supplied index snapshots (ADR-0025), each sealed
1073        // with its collection's codec and fsync'd, so the manifest swap can
1074        // reference a durable file. The snapshot file id is the new manifest
1075        // version, unique per checkpoint.
1076        let mut new_index_refs: HashMap<CollectionId, IndexSnapshotRef> = HashMap::new();
1077        for &cid in &cids {
1078            let Some(blob) = index_snapshots.get(&cid) else {
1079                continue;
1080            };
1081            let index_dir = self.index_dir(cid);
1082            fs::create_dir_all(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
1083            let codec = self.collections[&cid].codec.clone_box();
1084            let path = index_dir.join(index_snapshot_file_name(new_version));
1085            write_paged(
1086                &path,
1087                codec.as_ref(),
1088                PageType::IndexBlock,
1089                new_version,
1090                blob,
1091            )?;
1092            fsync_dir(&index_dir)?;
1093            fsync_dir(&collection_dir(&self.dir, cid))?;
1094            new_index_refs.insert(
1095                cid,
1096                IndexSnapshotRef {
1097                    id: new_version,
1098                    lsn: last_lsn,
1099                },
1100            );
1101        }
1102
1103        // Phase B: build and atomically install the new manifest.
1104        let mut entries = Vec::with_capacity(cids.len());
1105        for &cid in &cids {
1106            let state = &self.collections[&cid];
1107            let mut segs = state.segments_meta.clone();
1108            if let Some(p) = pending.get(&cid) {
1109                segs.push(p.seg_ref.clone());
1110            }
1111            entries.push(CollectionEntry {
1112                id: state.id,
1113                name: state.name.clone(),
1114                descriptor: postcard::to_allocvec(&state.descriptor)?,
1115                segments: segs,
1116                index_snapshot: new_index_refs.get(&cid).cloned(),
1117            });
1118        }
1119        let new_manifest = Manifest {
1120            format_version: MANIFEST_FORMAT_VERSION,
1121            version: new_version,
1122            last_checkpointed_lsn: last_lsn,
1123            next_collection_id: self.next_collection_id,
1124            next_segment_id: self.next_segment_id,
1125            collections: entries,
1126        };
1127        manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;
1128
1129        // Phase C: commit in-memory state, rotate the WAL, GC superseded files.
1130        self.manifest_version = new_version;
1131        self.last_checkpointed_lsn = last_lsn;
1132        for &cid in &cids {
1133            let Some(state) = self.collections.get_mut(&cid) else {
1134                continue;
1135            };
1136            // Fold this window's dead rows into the in-memory segment bitmaps
1137            // (the `.del` files were already persisted in Phase A).
1138            let dead_window = std::mem::take(&mut state.dead_this_window);
1139            for (seg_idx, bitmap) in dead_window {
1140                if let Some(seg) = state.sealed.get_mut(seg_idx as usize) {
1141                    seg.mark_dead(&bitmap);
1142                }
1143            }
1144            // Install the new segment, if any, repointing its now-sealed ids.
1145            if let Some(p) = pending.remove(&cid) {
1146                let seg_idx = state.sealed.len() as u32;
1147                for (row, ext_id) in p.sealed.row_ids().iter().enumerate() {
1148                    state.primary.insert(
1149                        ext_id.clone(),
1150                        Loc::Sealed {
1151                            seg: seg_idx,
1152                            row: row as u32,
1153                        },
1154                    );
1155                }
1156                state.sealed.push(p.sealed);
1157                state.segments_meta.push(p.seg_ref);
1158            }
1159            state.active.clear();
1160            state.active_index.clear();
1161            state.index_snapshot = new_index_refs.get(&cid).cloned();
1162        }
1163        self.rotate_wal()?;
1164        gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
1165        gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
1166        self.auto_compact()?;
1167        Ok(())
1168    }
1169
1170    /// Compact every collection with reclaimable space: merge its sealed segments,
1171    /// dropping dead (deleted or shadowed) rows, into a single fresh segment. Each
1172    /// collection commits via its own atomic manifest swap and is crash-safe like
1173    /// a checkpoint — the old segments stay valid until the swap, so a crash
1174    /// before it leaves the pre-compaction state intact.
1175    pub fn compact(&mut self) -> Result<()> {
1176        for cid in self.sorted_cids() {
1177            if self.reclaimable(cid) {
1178                self.compact_collection(cid)?;
1179            }
1180        }
1181        Ok(())
1182    }
1183
1184    // Compact only collections that have crossed the automatic threshold; run at
1185    // the end of a checkpoint.
1186    fn auto_compact(&mut self) -> Result<()> {
1187        for cid in self.sorted_cids() {
1188            if self.needs_compaction(cid) {
1189                self.compact_collection(cid)?;
1190            }
1191        }
1192        Ok(())
1193    }
1194
1195    fn sorted_cids(&self) -> Vec<CollectionId> {
1196        let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
1197        cids.sort();
1198        cids
1199    }
1200
1201    // Whether a collection has any space to reclaim: more than one segment to
1202    // merge, or any dead rows in a segment.
1203    fn reclaimable(&self, cid: CollectionId) -> bool {
1204        self.collections.get(&cid).is_some_and(|s| {
1205            s.sealed.len() > 1
1206                || s.sealed
1207                    .iter()
1208                    .any(|seg| seg.live_count() < u64::from(seg.row_count()))
1209        })
1210    }
1211
1212    // Whether a collection has crossed the automatic compaction threshold: many
1213    // segments to merge, or at least half of its sealed rows dead.
1214    fn needs_compaction(&self, cid: CollectionId) -> bool {
1215        let Some(s) = self.collections.get(&cid) else {
1216            return false;
1217        };
1218        if s.sealed.is_empty() {
1219            return false;
1220        }
1221        let total: u64 = s.sealed.iter().map(|seg| u64::from(seg.row_count())).sum();
1222        let live: u64 = s.sealed.iter().map(SealedSegment::live_count).sum();
1223        s.sealed.len() >= COMPACT_MIN_SEGMENTS || (total > 0 && (total - live) * 2 >= total)
1224    }
1225
    // Merge one collection's sealed segments into a single fresh segment holding
    // only its live rows, install it atomically, and reclaim the old files.
    //
    // Steps: read every row the primary index points at in a sealed segment,
    // write them to a new segment, fsync the directory chain, swap in a manifest
    // listing only that segment for `cid` (other collections unchanged, same
    // `last_checkpointed_lsn`), then repoint `primary` and garbage-collect.
    //
    // Errors: `NotFound` for an unknown collection, `MalformedPage` for a primary
    // entry naming a missing segment, and any read / write / fsync / manifest
    // error. A failure before `write_manifest` leaves the old manifest in force.
    fn compact_collection(&mut self, cid: CollectionId) -> Result<()> {
        // This collection's own codec (its DEK under an envelope key-ring) seals
        // both the rows read from the old segments and the merged one written.
        let codec = self
            .collections
            .get(&cid)
            .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?
            .codec
            .clone_box();
        // Gather the live sealed rows (active rows are untouched). `primary` is
        // ordered, so the rewritten segment is deterministic.
        // NOTE(review): every live sealed row is decrypted into this Vec at once,
        // so peak memory grows with the collection's sealed size.
        let live: Vec<(String, Vec<u8>, Vec<u8>)> = {
            let state = self
                .collections
                .get(&cid)
                .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?;
            let mut out = Vec::with_capacity(state.primary.len());
            for (ext_id, &loc) in &state.primary {
                if let Loc::Sealed { seg, row } = loc {
                    let segment = state.sealed.get(seg as usize).ok_or_else(|| {
                        CoreError::MalformedPage(format!("dangling segment index {seg}"))
                    })?;
                    let vector = segment.read_vector(codec.as_ref(), row, state.stride)?;
                    let payload = segment.read_payload(codec.as_ref(), row)?;
                    out.push((ext_id.clone(), vector, payload));
                }
            }
            out
        };

        // The merged segment spans the full lsn range of its inputs.
        let (lsn_low, lsn_high) = {
            let state = &self.collections[&cid];
            let low = state
                .segments_meta
                .iter()
                .map(|s| s.lsn_low.value())
                .min()
                .map(Lsn)
                .unwrap_or(Lsn::ZERO);
            let high = state
                .segments_meta
                .iter()
                .map(|s| s.lsn_high.value())
                .max()
                .map(Lsn)
                .unwrap_or(self.last_checkpointed_lsn);
            (low, high)
        };

        // The segment id is consumed here even if a later step fails.
        let seg_id = self.next_segment_id;
        self.next_segment_id += 1;
        let seg_dir = segments_dir(&self.dir, cid);
        fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
        let seal_rows: Vec<SealRow<'_>> = live
            .iter()
            .map(|(id, v, p)| SealRow {
                external_id: id,
                vector: v,
                payload: p,
            })
            .collect();
        segment::write_segment(
            &seg_dir,
            seg_id,
            codec.as_ref(),
            &seal_rows,
            &self.collections[&cid].descriptor.filterable,
        )?;
        // Make the new segment files and every parent directory entry durable
        // before the manifest references them.
        fsync_dir(&seg_dir)?;
        fsync_dir(&collection_dir(&self.dir, cid))?;
        fsync_dir(&self.dir.join("collections"))?;
        fsync_dir(&self.dir)?;
        let new_ref = SegmentRef {
            id: seg_id,
            row_count: seal_rows.len() as u64,
            lsn_low,
            lsn_high,
        };
        let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;

        // New manifest: this collection now has exactly one segment; others are
        // unchanged. The atomic swap is the commit point.
        let new_version = self.manifest_version + 1;
        let mut entries = Vec::with_capacity(self.collections.len());
        for &other in &self.sorted_cids() {
            let state = &self.collections[&other];
            let segs = if other == cid {
                vec![new_ref.clone()]
            } else {
                state.segments_meta.clone()
            };
            entries.push(CollectionEntry {
                id: state.id,
                name: state.name.clone(),
                descriptor: postcard::to_allocvec(&state.descriptor)?,
                segments: segs,
                // Every collection keeps its current snapshot reference,
                // including `cid`.
                index_snapshot: state.index_snapshot.clone(),
            });
        }
        let new_manifest = Manifest {
            format_version: MANIFEST_FORMAT_VERSION,
            version: new_version,
            last_checkpointed_lsn: self.last_checkpointed_lsn,
            next_collection_id: self.next_collection_id,
            next_segment_id: self.next_segment_id,
            collections: entries,
        };
        manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;

        // Commit: replace the segments (dropping the old mmaps before the files
        // are reclaimed), repoint the now-merged ids, and drop pending tombstones
        // (their rows no longer exist).
        self.manifest_version = new_version;
        let row_ids: Vec<String> = sealed.row_ids().to_vec();
        if let Some(state) = self.collections.get_mut(&cid) {
            state.sealed = vec![sealed];
            state.segments_meta = vec![new_ref];
            // NOTE(review): if this runs between checkpoints (explicit
            // `compact()` with a non-empty window), the ids deleted this window
            // are absent from the merged segment but may still be present in the
            // index snapshot kept in the manifest above. After a reopen,
            // `recovery_tail` derives `deleted` from `dead_this_window`, which
            // may then no longer list them. Confirm against `apply_delete`
            // whether the snapshot reference should be cleared in that case.
            state.dead_this_window.clear();
            for (row, ext_id) in row_ids.into_iter().enumerate() {
                state.primary.insert(
                    ext_id,
                    Loc::Sealed {
                        seg: 0,
                        row: row as u32,
                    },
                );
            }
        }
        gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
        gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
        Ok(())
    }
1361
1362    // Start a new WAL segment and delete every older one (all of their records
1363    // are now <= last_checkpointed_lsn and captured in segments).
1364    fn rotate_wal(&mut self) -> Result<()> {
1365        let wal_dir = self.dir.join("wal");
1366        let old_seq = self.wal_seq;
1367        let new_seq = old_seq + 1;
1368        let new_wal = WalWriter::create(&wal_file_path(&wal_dir, new_seq), self.next_lsn)?;
1369        fsync_dir(&wal_dir)?;
1370        self.wal = new_wal;
1371        self.wal_seq = new_seq;
1372        for (seq, path) in list_wal_files(&wal_dir)? {
1373            if seq <= old_seq {
1374                remove_file_if_present(&path)?;
1375            }
1376        }
1377        fsync_dir(&wal_dir)?;
1378        Ok(())
1379    }
1380}
1381
1382// Apply a recovered WAL record to the in-memory state during open. Upserts land
1383// in the active buffer (and are re-sealed at the next checkpoint); deletes remove
1384// from the primary index and are recorded for tombstoning.
1385fn apply_wal_entry(
1386    collections: &mut HashMap<CollectionId, CollectionState>,
1387    name_index: &mut HashMap<String, CollectionId>,
1388    entry: &WalEntry,
1389    keyring: &dyn KeyRing,
1390) -> Result<()> {
1391    match &entry.op {
1392        WalOp::CreateCollection {
1393            collection_id,
1394            name,
1395            descriptor,
1396        } => {
1397            let descriptor = Descriptor::decode(descriptor)?;
1398            // The key material was provisioned before this record was made
1399            // durable, so the collection's codec is available on replay.
1400            let codec = keyring.collection_codec(*collection_id)?;
1401            name_index.insert(name.clone(), *collection_id);
1402            collections.insert(
1403                *collection_id,
1404                CollectionState::new(*collection_id, name.clone(), descriptor, codec),
1405            );
1406        }
1407        WalOp::DropCollection { collection_id } => {
1408            if let Some(state) = collections.remove(collection_id) {
1409                name_index.remove(&state.name);
1410            }
1411        }
1412        WalOp::Upsert {
1413            collection_id,
1414            external_id,
1415            vector,
1416            payload,
1417        } => {
1418            if let Some(state) = collections.get_mut(collection_id) {
1419                state.apply_upsert(external_id, vector.clone(), payload.clone());
1420            }
1421        }
1422        WalOp::Delete {
1423            collection_id,
1424            external_id,
1425        } => {
1426            if let Some(state) = collections.get_mut(collection_id) {
1427                state.apply_delete(external_id);
1428            }
1429        }
1430        // The manifest is the authoritative checkpoint record; explicit
1431        // Checkpoint WAL records are not emitted and are a no-op here.
1432        WalOp::Checkpoint { .. } => {}
1433    }
1434    Ok(())
1435}
1436
// Delete superseded segment files (and whole dropped-collection directories)
// that the manifest no longer references.
//
// Walks `<dir>/collections/<cid>/` and, for every entry whose name parses as a
// `u64` collection id:
// - if the id is not a collection in `mfst`, the collection's key is shredded
//   through `keyring` and then its whole directory is removed;
// - otherwise each file in its `segments/` subdirectory is examined: crash
//   leftover temp files are removed, and segment files whose id is not listed
//   for that collection in `mfst` are removed. Files that are neither temp
//   files nor recognizable segment files are left alone.
//
// Already-missing files are tolerated (`remove_file_if_present`); any other
// I/O or key-ring error aborts the sweep and is returned.
//
// NOTE(review): a collection counts as live only if it appears in `mfst`. If a
// caller passes a manifest that lacks collections recreated from the WAL tail
// (created after the last checkpoint, whose key material is provisioned before
// the create record is durable), their directory and key would be reclaimed
// here — confirm callers pass a manifest covering every live collection, or
// that such collections have no on-disk directory yet.
fn gc_orphan_segments(dir: &Path, mfst: &Manifest, keyring: &dyn KeyRing) -> Result<()> {
    let collections_dir = dir.join("collections");
    // No `collections/` directory: nothing to sweep.
    if !collections_dir.exists() {
        return Ok(());
    }
    // Index what the manifest keeps alive: the live collection ids, and every
    // (collection id, segment id) pair it references.
    let mut referenced: HashSet<(u64, u64)> = HashSet::new();
    let mut live_collections: HashSet<u64> = HashSet::new();
    for c in &mfst.collections {
        live_collections.insert(c.id.value());
        for s in &c.segments {
            referenced.insert((c.id.value(), s.id));
        }
    }
    for entry in fs::read_dir(&collections_dir).map_err(|e| CoreError::io(&collections_dir, e))? {
        let entry = entry.map_err(|e| CoreError::io(&collections_dir, e))?;
        let cdir = entry.path();
        // Only entries whose name parses as a `u64` are treated as collection
        // directories; anything else under `collections/` is left untouched.
        // NOTE(review): `parse::<u64>` also accepts non-padded names (and a
        // leading `+`), although `collection_dir` always writes ten-digit
        // zero-padded names.
        let Some(cid) = entry
            .file_name()
            .to_str()
            .and_then(|n| n.parse::<u64>().ok())
        else {
            continue;
        };
        if !live_collections.contains(&cid) {
            // A dropped collection: crypto-shred its key first (so a crash before
            // the files are reclaimed still leaves them unrecoverable), then
            // reclaim its whole directory. The shred is attempted even when the
            // entry is not a directory; only directories are then removed.
            keyring.shred_collection(CollectionId(cid))?;
            if cdir.is_dir() {
                fs::remove_dir_all(&cdir).map_err(|e| CoreError::io(&cdir, e))?;
            }
            continue;
        }
        // Live collection: sweep its `segments/` directory, if present.
        let seg_dir = cdir.join("segments");
        if !seg_dir.is_dir() {
            continue;
        }
        for seg in fs::read_dir(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))? {
            let seg = seg.map_err(|e| CoreError::io(&seg_dir, e))?;
            let path = seg.path();
            // Non-UTF-8 file names are skipped.
            let Some(name) = seg.file_name().to_str().map(str::to_owned) else {
                continue;
            };
            // A crash-leftover temp (an interrupted `.del` rewrite) is always junk.
            if segment::is_temp_file(&name) {
                remove_file_if_present(&path)?;
                continue;
            }
            // Files that are not recognizable segment files are left alone.
            let Some(seg_id) = segment::seg_id_of_file(&name) else {
                continue;
            };
            // A segment file the manifest does not reference for this
            // collection (superseded, or flushed by a checkpoint that crashed
            // before its manifest swap) is reclaimed.
            if !referenced.contains(&(cid, seg_id)) {
                remove_file_if_present(&path)?;
            }
        }
    }
    Ok(())
}
1497
1498// Delete stale or orphaned index snapshot files (`idx-*`) that a live
1499// collection's manifest entry no longer references — a superseded snapshot, or
1500// one written by a checkpoint that crashed before its manifest swap (ADR-0025).
1501// Non-snapshot index artifacts (e.g. the disk graph) are left untouched; dropped
1502// collections are reclaimed wholesale by `gc_orphan_segments`.
1503fn gc_orphan_index_snapshots(dir: &Path, mfst: &Manifest) -> Result<()> {
1504    for c in &mfst.collections {
1505        let index_dir = collection_dir(dir, c.id).join("index");
1506        if !index_dir.is_dir() {
1507            continue;
1508        }
1509        let keep = c.index_snapshot.as_ref().map(|r| r.id);
1510        for entry in fs::read_dir(&index_dir).map_err(|e| CoreError::io(&index_dir, e))? {
1511            let entry = entry.map_err(|e| CoreError::io(&index_dir, e))?;
1512            let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
1513                continue;
1514            };
1515            let Some(id) = index_snapshot_id_of_file(&name) else {
1516                continue; // not a snapshot file
1517            };
1518            if Some(id) != keep {
1519                remove_file_if_present(&entry.path())?;
1520            }
1521        }
1522    }
1523    Ok(())
1524}
1525
1526fn remove_file_if_present(path: &Path) -> Result<()> {
1527    match fs::remove_file(path) {
1528        Ok(()) => Ok(()),
1529        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1530        Err(e) => Err(CoreError::io(path, e)),
1531    }
1532}
1533
1534fn collection_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1535    dir.join("collections").join(format!("{:010}", cid.value()))
1536}
1537
1538fn segments_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1539    collection_dir(dir, cid).join("segments")
1540}
1541
// File name of a collection's index snapshot with id `id` (ADR-0025):
// `idx-` followed by the id zero-padded to at least ten digits, so lexical
// order matches numeric order for every id below 10^10 (larger ids simply use
// more digits).
fn index_snapshot_file_name(id: u64) -> String {
    const PREFIX: &str = "idx-";
    format!("{PREFIX}{id:010}")
}
1547
// Parse a snapshot id from an `idx-NNNNNNNNNN` file name, or `None` for any other
// file (so non-snapshot index artifacts are ignored by snapshot GC).
//
// The part after `idx-` must consist solely of ASCII digits. `u64::from_str`
// on its own also accepts a leading `+` (e.g. `idx-+7`), which
// `index_snapshot_file_name` never produces. Treating such a file as snapshot 7
// would let snapshot GC delete a file it did not create.
fn index_snapshot_id_of_file(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("idx-")?;
    if !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Empty and out-of-range (> u64::MAX) digit strings still fail here.
    digits.parse().ok()
}
1554
// Path of WAL file number `seq` inside `wal_dir`: `wal-<seq>.log`, with the
// sequence number zero-padded to at least ten digits.
fn wal_file_path(wal_dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("wal-{:010}.log", seq);
    wal_dir.join(file_name)
}
1558
1559fn list_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1560    let mut out = Vec::new();
1561    for entry in fs::read_dir(wal_dir).map_err(|e| CoreError::io(wal_dir, e))? {
1562        let entry = entry.map_err(|e| CoreError::io(wal_dir, e))?;
1563        if let Some(seq) = entry.file_name().to_str().and_then(parse_wal_file_name) {
1564            out.push((seq, entry.path()));
1565        }
1566    }
1567    out.sort_by_key(|(seq, _)| *seq);
1568    Ok(out)
1569}
1570
// Parse the sequence number from a `wal-NNNNNNNNNN.log` file name (the form
// written by `wal_file_path`), or `None` for any other file.
//
// Only bare ASCII digits are accepted between the prefix and the suffix.
// `u64::from_str` on its own also accepts a leading `+`, so a stray
// `wal-+1.log` would otherwise be listed as WAL sequence 1, next to the real
// `wal-0000000001.log`.
fn parse_wal_file_name(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("wal-")?.strip_suffix(".log")?;
    if !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Empty and out-of-range (> u64::MAX) digit strings still fail here.
    digits.parse().ok()
}
1576
// Serialize `v` as packed little-endian IEEE-754 bytes, four per element
// (the encoding decoded by `le_bytes_to_f32`).
fn f32_to_le_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|x| x.to_le_bytes()));
    bytes
}
1584
// Decode packed little-endian IEEE-754 `f32`s, four bytes per value. A trailing
// partial chunk (when `bytes.len()` is not a multiple of 4) is silently ignored.
fn le_bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    let mut floats = Vec::with_capacity(bytes.len() / 4);
    for word in bytes.chunks_exact(4) {
        // `chunks_exact(4)` only yields 4-byte chunks, so this always matches.
        if let [b0, b1, b2, b3] = *word {
            floats.push(f32::from_le_bytes([b0, b1, b2, b3]));
        }
    }
    floats
}
1591
1592#[cfg(test)]
1593mod tests {
1594    use super::*;
1595    use crate::descriptor::{DistanceMetric, Dtype};
1596
1597    fn desc() -> Descriptor {
1598        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
1599    }
1600
1601    fn open(dir: &Path) -> Store {
1602        Store::open(dir).unwrap()
1603    }
1604
1605    // Path to a segment's row-directory file, for corruption/orphan tests.
1606    fn seg_dir_file(dir: &Path, cid: CollectionId, seg_id: u64) -> PathBuf {
1607        segments_dir(dir, cid).join(format!("seg-{seg_id:010}.dir"))
1608    }
1609
1610    #[test]
1611    fn upsert_get_delete_in_memory() {
1612        let tmp = tempfile::tempdir().unwrap();
1613        let mut s = open(tmp.path());
1614        let c = s.create_collection("c", desc()).unwrap();
1615        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
1616        let got = s.get(c, "a").unwrap().unwrap();
1617        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
1618        assert_eq!(got.payload, b"{}");
1619        assert!(s.delete(c, "a").unwrap());
1620        assert!(s.get(c, "a").unwrap().is_none());
1621        assert!(!s.delete(c, "a").unwrap());
1622    }
1623
1624    #[test]
1625    fn dim_mismatch_is_rejected() {
1626        let tmp = tempfile::tempdir().unwrap();
1627        let mut s = open(tmp.path());
1628        let c = s.create_collection("c", desc()).unwrap();
1629        assert!(matches!(
1630            s.upsert(c, "a", &[1.0, 2.0], b"{}"),
1631            Err(CoreError::InvalidArgument(_))
1632        ));
1633    }
1634
1635    #[test]
1636    fn upsert_batch_commits_all_on_sync() {
1637        let tmp = tempfile::tempdir().unwrap();
1638        {
1639            let mut s = open(tmp.path());
1640            let c = s.create_collection("c", desc()).unwrap();
1641            let vecs: Vec<([f32; 4], String)> = (0..8u32)
1642                .map(|i| ([i as f32; 4], format!("k{i}")))
1643                .collect();
1644            let payload = b"{}";
1645            let records: Vec<(&str, &[f32], &[u8])> = vecs
1646                .iter()
1647                .map(|(v, id)| (id.as_str(), v.as_slice(), payload.as_slice()))
1648                .collect();
1649            let n = s.upsert_batch(c, &records).unwrap();
1650            assert_eq!(n, 8);
1651            // All points readable from in-memory state immediately.
1652            for (_, id) in &vecs {
1653                assert!(s.get(c, id).unwrap().is_some(), "missing {id}");
1654            }
1655        }
1656        // Re-open: WAL replay must restore all 8 points.
1657        let s = open(tmp.path());
1658        let c = s.collection_id("c").unwrap();
1659        assert_eq!(s.len(c).unwrap(), 8);
1660        for i in 0..8u32 {
1661            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
1662            assert_eq!(got.vector, vec![i as f32; 4]);
1663        }
1664    }
1665
1666    #[test]
1667    fn upsert_batch_dim_mismatch_writes_nothing() {
1668        let tmp = tempfile::tempdir().unwrap();
1669        let mut s = open(tmp.path());
1670        let c = s.create_collection("c", desc()).unwrap();
1671        // First record correct, second has wrong dim — the whole batch must fail.
1672        let bad: &[(&str, &[f32], &[u8])] = &[
1673            ("a", &[1.0, 2.0, 3.0, 4.0], b"{}"),
1674            ("b", &[1.0, 2.0], b"{}"), // wrong dim
1675        ];
1676        assert!(matches!(
1677            s.upsert_batch(c, bad),
1678            Err(CoreError::InvalidArgument(_))
1679        ));
1680        // Neither point was written.
1681        assert!(s.get(c, "a").unwrap().is_none());
1682    }
1683
1684    #[test]
1685    fn duplicate_collection_is_rejected() {
1686        let tmp = tempfile::tempdir().unwrap();
1687        let mut s = open(tmp.path());
1688        s.create_collection("c", desc()).unwrap();
1689        assert!(matches!(
1690            s.create_collection("c", desc()),
1691            Err(CoreError::AlreadyExists(_))
1692        ));
1693    }
1694
1695    #[test]
1696    fn recovers_without_checkpoint_via_wal_replay() {
1697        let tmp = tempfile::tempdir().unwrap();
1698        {
1699            let mut s = open(tmp.path());
1700            let c = s.create_collection("c", desc()).unwrap();
1701            for i in 0..10u32 {
1702                let v = [i as f32; 4];
1703                s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
1704            }
1705        }
1706        let s = open(tmp.path());
1707        let c = s.collection_id("c").unwrap();
1708        assert_eq!(s.len(c).unwrap(), 10);
1709        let got = s.get(c, "k7").unwrap().unwrap();
1710        assert_eq!(got.vector, vec![7.0; 4]);
1711    }
1712
1713    #[test]
1714    fn recovers_across_checkpoint_and_wal_tail() {
1715        let tmp = tempfile::tempdir().unwrap();
1716        {
1717            let mut s = open(tmp.path());
1718            let c = s.create_collection("c", desc()).unwrap();
1719            for i in 0..5u32 {
1720                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
1721                    .unwrap();
1722            }
1723            s.checkpoint().unwrap();
1724            // Post-checkpoint writes live only in the WAL until recovery.
1725            for i in 5..8u32 {
1726                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
1727                    .unwrap();
1728            }
1729            s.delete(c, "k0").unwrap();
1730        }
1731        let s = open(tmp.path());
1732        let c = s.collection_id("c").unwrap();
1733        assert_eq!(s.len(c).unwrap(), 7); // k1..k7
1734        assert!(s.get(c, "k0").unwrap().is_none());
1735        assert_eq!(s.get(c, "k6").unwrap().unwrap().vector, vec![6.0; 4]);
1736    }
1737
1738    #[test]
1739    fn open_with_keyring_round_trips_through_checkpoint() {
1740        let tmp = tempfile::tempdir().unwrap();
1741        {
1742            let mut s =
1743                Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
1744                    .unwrap();
1745            let c = s.create_collection("c", desc()).unwrap();
1746            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
1747            s.checkpoint().unwrap();
1748            s.upsert(c, "b", &[5.0; 4], b"{}").unwrap();
1749        }
1750        // Reopen through the same key-ring: data recovers from the sealed segment
1751        // and the WAL tail, each opened with the collection's own codec.
1752        let s = Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
1753            .unwrap();
1754        let c = s.collection_id("c").unwrap();
1755        assert_eq!(s.len(c).unwrap(), 2);
1756        assert_eq!(
1757            s.get(c, "a").unwrap().unwrap().vector,
1758            vec![1.0, 2.0, 3.0, 4.0]
1759        );
1760        assert_eq!(s.get(c, "b").unwrap().unwrap().vector, vec![5.0; 4]);
1761    }
1762
1763    #[test]
1764    fn delete_survives_checkpoint() {
1765        let tmp = tempfile::tempdir().unwrap();
1766        {
1767            let mut s = open(tmp.path());
1768            let c = s.create_collection("c", desc()).unwrap();
1769            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
1770            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
1771            s.checkpoint().unwrap();
1772            s.delete(c, "a").unwrap();
1773            s.checkpoint().unwrap(); // tombstone sealed into a new segment
1774        }
1775        let s = open(tmp.path());
1776        let c = s.collection_id("c").unwrap();
1777        assert!(s.get(c, "a").unwrap().is_none());
1778        assert!(s.get(c, "b").unwrap().is_some());
1779        assert_eq!(s.len(c).unwrap(), 1);
1780    }
1781
1782    #[test]
1783    fn reopen_is_idempotent() {
1784        let tmp = tempfile::tempdir().unwrap();
1785        {
1786            let mut s = open(tmp.path());
1787            let c = s.create_collection("c", desc()).unwrap();
1788            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
1789            s.checkpoint().unwrap();
1790            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
1791        }
1792        let snapshot = |dir: &Path| {
1793            let s = open(dir);
1794            let c = s.collection_id("c").unwrap();
1795            s.scan(c).unwrap()
1796        };
1797        assert_eq!(snapshot(tmp.path()), snapshot(tmp.path()));
1798    }
1799
1800    #[test]
1801    fn update_then_checkpoint_keeps_latest_value() {
1802        let tmp = tempfile::tempdir().unwrap();
1803        {
1804            let mut s = open(tmp.path());
1805            let c = s.create_collection("c", desc()).unwrap();
1806            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
1807            s.checkpoint().unwrap();
1808            s.upsert(c, "a", &[9.0; 4], b"v2").unwrap(); // shadow the sealed row
1809            s.checkpoint().unwrap();
1810        }
1811        let s = open(tmp.path());
1812        let c = s.collection_id("c").unwrap();
1813        let got = s.get(c, "a").unwrap().unwrap();
1814        assert_eq!(got.vector, vec![9.0; 4]);
1815        assert_eq!(got.payload, b"v2");
1816        assert_eq!(s.len(c).unwrap(), 1);
1817    }
1818
1819    #[test]
1820    fn update_within_one_window_seals_latest() {
1821        // Re-upsert the same id several times before any checkpoint: only the
1822        // latest active row must be sealed and recoverable.
1823        let tmp = tempfile::tempdir().unwrap();
1824        {
1825            let mut s = open(tmp.path());
1826            let c = s.create_collection("c", desc()).unwrap();
1827            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
1828            s.upsert(c, "a", &[2.0; 4], b"v2").unwrap();
1829            s.upsert(c, "a", &[3.0; 4], b"v3").unwrap();
1830            s.checkpoint().unwrap();
1831        }
1832        let s = open(tmp.path());
1833        let c = s.collection_id("c").unwrap();
1834        assert_eq!(s.len(c).unwrap(), 1);
1835        let got = s.get(c, "a").unwrap().unwrap();
1836        assert_eq!(got.vector, vec![3.0; 4]);
1837        assert_eq!(got.payload, b"v3");
1838    }
1839
1840    #[test]
1841    fn dropped_collection_is_gone_after_reopen() {
1842        let tmp = tempfile::tempdir().unwrap();
1843        {
1844            let mut s = open(tmp.path());
1845            let c = s.create_collection("c", desc()).unwrap();
1846            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
1847            s.checkpoint().unwrap();
1848            assert!(s.drop_collection("c").unwrap());
1849            s.checkpoint().unwrap();
1850        }
1851        let s = open(tmp.path());
1852        assert!(s.collection_id("c").is_none());
1853        assert!(s.collection_names().is_empty());
1854    }
1855
1856    #[test]
1857    fn orphan_segment_is_garbage_collected() {
1858        let tmp = tempfile::tempdir().unwrap();
1859        let cid;
1860        {
1861            let mut s = open(tmp.path());
1862            let c = s.create_collection("c", desc()).unwrap();
1863            cid = c;
1864            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
1865            s.checkpoint().unwrap();
1866        }
1867        // Drop a stray segment file the manifest does not reference.
1868        let stray = segments_dir(tmp.path(), cid).join("seg-0000009999.vec");
1869        fs::write(&stray, b"junk").unwrap();
1870        assert!(stray.exists());
1871        let _s = open(tmp.path());
1872        assert!(!stray.exists(), "orphan segment should be GC'd on open");
1873    }
1874
1875    #[test]
1876    fn corrupt_segment_is_detected_not_served() {
1877        let tmp = tempfile::tempdir().unwrap();
1878        let cid;
1879        {
1880            let mut s = open(tmp.path());
1881            let c = s.create_collection("c", desc()).unwrap();
1882            cid = c;
1883            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
1884            s.checkpoint().unwrap();
1885        }
1886        // Corrupt the sealed segment's row directory (read and verified at open).
1887        // Flip a byte in page 0's live body (the 8-byte length prefix), which the
1888        // CRC covers — a small directory's postcard body does not reach far into
1889        // the 16 KiB page, so a deep offset would land in uncovered padding.
1890        let path = seg_dir_file(tmp.path(), cid, 0);
1891        let mut bytes = fs::read(&path).unwrap();
1892        bytes[33] ^= 0xFF;
1893        fs::write(&path, &bytes).unwrap();
1894        assert!(matches!(
1895            Store::open(tmp.path()),
1896            Err(CoreError::PageCorrupt { .. })
1897        ));
1898    }
1899
1900    #[test]
1901    fn torn_wal_tail_drops_only_unacked_record() {
1902        let tmp = tempfile::tempdir().unwrap();
1903        let wal_path;
1904        {
1905            let mut s = open(tmp.path());
1906            let c = s.create_collection("c", desc()).unwrap();
1907            for i in 0..3u32 {
1908                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
1909                    .unwrap();
1910            }
1911            wal_path = wal_file_path(&tmp.path().join("wal"), s.wal_seq);
1912        }
1913        // Append a torn (partial) frame to the tail of the active WAL.
1914        {
1915            use std::io::Write as _;
1916            let mut f = fs::OpenOptions::new().append(true).open(&wal_path).unwrap();
1917            f.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
1918            f.sync_data().unwrap();
1919        }
1920        let s = open(tmp.path());
1921        let c = s.collection_id("c").unwrap();
1922        assert_eq!(s.len(c).unwrap(), 3); // the 3 acked upserts recovered intact
1923    }
1924
1925    #[test]
1926    fn reads_served_from_disk_after_checkpoint() {
1927        // After a checkpoint the active buffer is cleared, so a get must come
1928        // from the sealed segment's mmap'd columns — exercising the disk path.
1929        let tmp = tempfile::tempdir().unwrap();
1930        let mut s = open(tmp.path());
1931        let c = s.create_collection("c", desc()).unwrap();
1932        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], br#"{"k":1}"#)
1933            .unwrap();
1934        s.checkpoint().unwrap();
1935        let got = s.get(c, "a").unwrap().unwrap();
1936        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
1937        assert_eq!(got.payload, br#"{"k":1}"#);
1938    }
1939
1940    #[test]
1941    fn high_dim_vectors_straddle_pages() {
1942        // A dimensionality whose stride does not divide the page body, forcing
1943        // vectors to straddle 16 KiB block boundaries in the .vec column.
1944        let tmp = tempfile::tempdir().unwrap();
1945        let mut s = open(tmp.path());
1946        let dim = 1000usize; // stride = 4000 B; ~4 vectors per 16352-B page body
1947        let c = s
1948            .create_collection(
1949                "c",
1950                Descriptor::new(dim as u32, Dtype::F32, DistanceMetric::L2),
1951            )
1952            .unwrap();
1953        for i in 0..20u32 {
1954            let v: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
1955            s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
1956        }
1957        s.checkpoint().unwrap();
1958        let s = open(tmp.path());
1959        let c = s.collection_id("c").unwrap();
1960        for i in 0..20u32 {
1961            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
1962            let want: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
1963            assert_eq!(
1964                got.vector, want,
1965                "vector k{i} mismatch after straddling read"
1966            );
1967        }
1968    }
1969
1970    #[test]
1971    fn delete_persists_via_del_bitmap_across_reopen() {
1972        // Five rows in one segment; deleting one is 20% dead with a single
1973        // segment, so auto-compaction does not fire — the delete must survive
1974        // purely via the persisted `.del` tombstone bitmap.
1975        let tmp = tempfile::tempdir().unwrap();
1976        let cid;
1977        {
1978            let mut s = open(tmp.path());
1979            let c = s.create_collection("c", desc()).unwrap();
1980            cid = c;
1981            for i in 0..5u32 {
1982                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
1983                    .unwrap();
1984            }
1985            s.checkpoint().unwrap();
1986            s.delete(c, "k2").unwrap();
1987            s.checkpoint().unwrap();
1988            assert_eq!(
1989                s.collections[&c].sealed.len(),
1990                1,
1991                "no new segment for a delete-only window"
1992            );
1993        }
1994        // The tombstone bitmap was written for segment 0.
1995        assert!(
1996            segments_dir(tmp.path(), cid)
1997                .join("seg-0000000000.del")
1998                .exists(),
1999            ".del must be persisted for the deleted row"
2000        );
2001        let s = open(tmp.path());
2002        let c = s.collection_id("c").unwrap();
2003        assert!(s.get(c, "k2").unwrap().is_none());
2004        assert_eq!(s.len(c).unwrap(), 4);
2005        for i in [0u32, 1, 3, 4] {
2006            assert!(s.get(c, &format!("k{i}")).unwrap().is_some());
2007        }
2008    }
2009
2010    #[test]
2011    fn shadowed_row_is_tombstoned_and_latest_wins() {
2012        let tmp = tempfile::tempdir().unwrap();
2013        {
2014            let mut s = open(tmp.path());
2015            let c = s.create_collection("c", desc()).unwrap();
2016            for i in 0..5u32 {
2017                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"v1")
2018                    .unwrap();
2019            }
2020            s.checkpoint().unwrap(); // seg 0
2021            s.upsert(c, "k2", &[99.0; 4], b"v2").unwrap();
2022            s.checkpoint().unwrap(); // seg 1 holds the new k2; seg 0 row tombstoned
2023        }
2024        let s = open(tmp.path());
2025        let c = s.collection_id("c").unwrap();
2026        assert_eq!(s.len(c).unwrap(), 5); // k2 counted once
2027        let got = s.get(c, "k2").unwrap().unwrap();
2028        assert_eq!(got.vector, vec![99.0; 4]);
2029        assert_eq!(got.payload, b"v2");
2030    }
2031
2032    #[test]
2033    fn compaction_merges_segments_reclaims_and_keeps_active_rows() {
2034        let tmp = tempfile::tempdir().unwrap();
2035        let cid;
2036        {
2037            let mut s = open(tmp.path());
2038            let c = s.create_collection("c", desc()).unwrap();
2039            cid = c;
2040            for i in 0..6u32 {
2041                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
2042                    .unwrap();
2043            }
2044            s.checkpoint().unwrap(); // seg 0: k0..k5
2045            for i in 6..12u32 {
2046                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
2047                    .unwrap();
2048            }
2049            s.checkpoint().unwrap(); // seg 1: k6..k11
2050            s.delete(c, "k0").unwrap();
2051            s.delete(c, "k6").unwrap();
2052            s.checkpoint().unwrap(); // tombstones only; still two segments
2053            assert_eq!(s.collections[&c].sealed.len(), 2);
2054
2055            // An un-checkpointed row must survive the compaction untouched.
2056            s.upsert(c, "fresh", &[7.0; 4], b"new").unwrap();
2057            s.compact().unwrap();
2058            assert_eq!(s.collections[&c].sealed.len(), 1, "segments merged to one");
2059            assert!(
2060                !segments_dir(tmp.path(), cid)
2061                    .join("seg-0000000000.dir")
2062                    .exists(),
2063                "old segment files reclaimed"
2064            );
2065            assert_eq!(s.len(c).unwrap(), 11); // 10 live sealed + 1 active
2066            assert!(s.get(c, "k0").unwrap().is_none());
2067            assert!(s.get(c, "k6").unwrap().is_none());
2068            assert_eq!(s.get(c, "k5").unwrap().unwrap().vector, vec![5.0; 4]);
2069            assert_eq!(s.get(c, "fresh").unwrap().unwrap().payload, b"new");
2070        }
2071        // Everything survives a reopen, including the active row via WAL replay.
2072        let s = open(tmp.path());
2073        let c = s.collection_id("c").unwrap();
2074        assert_eq!(s.collections[&c].sealed.len(), 1);
2075        assert_eq!(s.len(c).unwrap(), 11);
2076        assert!(s.get(c, "k0").unwrap().is_none());
2077        assert_eq!(s.get(c, "fresh").unwrap().unwrap().vector, vec![7.0; 4]);
2078        assert_eq!(s.get(c, "k11").unwrap().unwrap().vector, vec![11.0; 4]);
2079    }
2080
2081    #[test]
2082    fn auto_compaction_merges_many_segments() {
2083        let tmp = tempfile::tempdir().unwrap();
2084        let mut s = open(tmp.path());
2085        let c = s.create_collection("c", desc()).unwrap();
2086        // Eight checkpoints create eight segments; the eighth checkpoint's
2087        // auto-compaction merges them.
2088        for ck in 0..8u32 {
2089            for i in 0..3u32 {
2090                let n = ck * 3 + i;
2091                s.upsert(c, &format!("k{n}"), &[n as f32; 4], b"{}")
2092                    .unwrap();
2093            }
2094            s.checkpoint().unwrap();
2095        }
2096        assert!(
2097            s.collections[&c].sealed.len() < COMPACT_MIN_SEGMENTS,
2098            "auto-compaction should have merged the segments"
2099        );
2100        assert_eq!(s.len(c).unwrap(), 24);
2101        assert_eq!(s.get(c, "k0").unwrap().unwrap().vector, vec![0.0; 4]);
2102        assert_eq!(s.get(c, "k23").unwrap().unwrap().vector, vec![23.0; 4]);
2103    }
2104
2105    #[test]
2106    fn matching_ids_spans_secondary_index_and_active_buffer() {
2107        use crate::descriptor::FilterableField;
2108        use crate::sec::SecValue;
2109
2110        let tmp = tempfile::tempdir().unwrap();
2111        let mut s = open(tmp.path());
2112        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
2113            FilterableField::keyword("city"),
2114            FilterableField::numeric("age"),
2115        ]);
2116        let c = s.create_collection("c", descriptor).unwrap();
2117        s.upsert(c, "a", &[0.0; 4], br#"{"city":"paris","age":30}"#)
2118            .unwrap();
2119        s.upsert(c, "b", &[0.0; 4], br#"{"city":"lyon","age":25}"#)
2120            .unwrap();
2121        s.upsert(c, "d", &[0.0; 4], br#"{"city":"paris","age":40}"#)
2122            .unwrap();
2123        s.checkpoint().unwrap();
2124        // An active (un-checkpointed) row, matched by scanning the buffer.
2125        s.upsert(c, "e", &[0.0; 4], br#"{"city":"paris","age":22}"#)
2126            .unwrap();
2127
2128        let paris = || SecPredicate::Eq {
2129            field: "city".into(),
2130            value: SecValue::Keyword("paris".into()),
2131        };
2132        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["a", "d", "e"]);
2133
2134        // Numeric range [25, 35]: 30 (a, sealed) and 25 (b, sealed); not 40 or 22.
2135        assert_eq!(
2136            s.matching_ids(
2137                c,
2138                &SecPredicate::Range {
2139                    field: "age".into(),
2140                    lo: Some(SecValue::Numeric(25.0)),
2141                    hi: Some(SecValue::Numeric(35.0)),
2142                    lo_inclusive: true,
2143                    hi_inclusive: true,
2144                }
2145            )
2146            .unwrap(),
2147            ["a", "b"]
2148        );
2149
2150        // Deleting a sealed row drops it via the primary-consistency check.
2151        s.delete(c, "a").unwrap();
2152        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
2153
2154        // A non-filterable field is rejected (the planner must post-filter it).
2155        assert!(matches!(
2156            s.matching_ids(
2157                c,
2158                &SecPredicate::Eq {
2159                    field: "country".into(),
2160                    value: SecValue::Keyword("fr".into()),
2161                }
2162            ),
2163            Err(CoreError::InvalidArgument(_))
2164        ));
2165
2166        // Checkpoint seals the active row and the deletion; results survive reopen.
2167        s.checkpoint().unwrap();
2168        let s = open(tmp.path());
2169        let c = s.collection_id("c").unwrap();
2170        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
2171    }
2172
2173    // ----- durable index snapshots (ADR-0025) -----
2174
2175    // The `idx-*` snapshot files currently on disk for a collection, sorted.
2176    fn index_snapshot_files(dir: &Path, cid: CollectionId) -> Vec<String> {
2177        let idx = collection_dir(dir, cid).join("index");
2178        let mut names: Vec<String> = fs::read_dir(&idx)
2179            .map(|rd| {
2180                rd.filter_map(std::result::Result::ok)
2181                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
2182                    .filter(|n| n.starts_with("idx-"))
2183                    .collect()
2184            })
2185            .unwrap_or_default();
2186        names.sort();
2187        names
2188    }
2189
2190    #[test]
2191    fn index_snapshot_round_trips_through_checkpoint_and_reopen() {
2192        let tmp = tempfile::tempdir().unwrap();
2193        let blob = b"opaque-index-bytes".to_vec();
2194        let cid = {
2195            let mut s = open(tmp.path());
2196            let c = s.create_collection("c", desc()).unwrap();
2197            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
2198            s.checkpoint_with_index_snapshots(&HashMap::from([(c, blob.clone())]))
2199                .unwrap();
2200            // Available immediately, exactly one snapshot file on disk.
2201            assert_eq!(s.read_index_snapshot(c).unwrap(), Some(blob.clone()));
2202            assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
2203            c
2204        };
2205        // Survives reopen, loaded via the manifest reference.
2206        let s = open(tmp.path());
2207        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(blob));
2208    }
2209
2210    #[test]
2211    fn checkpoint_without_a_snapshot_clears_and_reclaims_it() {
2212        let tmp = tempfile::tempdir().unwrap();
2213        let mut s = open(tmp.path());
2214        let c = s.create_collection("c", desc()).unwrap();
2215        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
2216        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"blob".to_vec())]))
2217            .unwrap();
2218        assert!(s.read_index_snapshot(c).unwrap().is_some());
2219
2220        // A later plain checkpoint (with new data) carries no snapshot → cleared.
2221        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
2222        s.checkpoint().unwrap();
2223        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
2224        assert!(index_snapshot_files(tmp.path(), c).is_empty());
2225
2226        let s = open(tmp.path());
2227        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
2228    }
2229
2230    #[test]
2231    fn a_new_snapshot_supersedes_and_reclaims_the_old_one() {
2232        let tmp = tempfile::tempdir().unwrap();
2233        let mut s = open(tmp.path());
2234        let c = s.create_collection("c", desc()).unwrap();
2235        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
2236        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"first".to_vec())]))
2237            .unwrap();
2238        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
2239        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"second".to_vec())]))
2240            .unwrap();
2241
2242        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"second".to_vec()));
2243        assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
2244    }
2245
2246    #[test]
2247    fn compaction_preserves_the_index_snapshot() {
2248        let tmp = tempfile::tempdir().unwrap();
2249        let mut s = open(tmp.path());
2250        let c = s.create_collection("c", desc()).unwrap();
2251        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
2252        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
2253            .unwrap();
2254        // More changes, then re-snapshot at the new floor and compact.
2255        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
2256        s.delete(c, "a").unwrap();
2257        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
2258            .unwrap();
2259        s.compact().unwrap();
2260        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));
2261
2262        let s = open(tmp.path());
2263        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));
2264    }
2265
2266    #[test]
2267    fn orphan_index_snapshot_is_reclaimed_on_open() {
2268        let tmp = tempfile::tempdir().unwrap();
2269        let cid = {
2270            let mut s = open(tmp.path());
2271            let c = s.create_collection("c", desc()).unwrap();
2272            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
2273            s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"live".to_vec())]))
2274                .unwrap();
2275            // Simulate a checkpoint that wrote a snapshot file but crashed before
2276            // the manifest swap: an unreferenced idx-* in the index dir.
2277            let stray = s.index_dir(c).join("idx-9999999999");
2278            fs::write(&stray, b"orphan").unwrap();
2279            c
2280        };
2281        let s = open(tmp.path());
2282        // Recovery reclaims the orphan but keeps the referenced snapshot.
2283        assert!(!s.index_dir(cid).join("idx-9999999999").exists());
2284        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(b"live".to_vec()));
2285    }
2286}