Skip to main content

quiver_core/
store.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The storage engine: a durable, crash-safe vector store per collection.
3//!
4//! A [`Store`] ties the [`crate::wal`] and [`crate::manifest`] primitives,
5//! together with immutable `segment`s in the row-addressed on-disk
6//! format (ADR-0004), into a recoverable engine. The durability contract
7//! (ADR-0005): a mutation is acknowledged only after its WAL record is `fsync`'d,
8//! so an acknowledged write survives `kill -9`.
9//!
10//! ## Memory model
11//! Vectors and payloads live on disk in sealed segments and are read through an
12//! `mmap`, decrypted on demand — only the working set is resident. The engine
13//! keeps in RAM a **primary index** (external id → row location) per collection,
14//! plus the **active buffer**: the rows upserted since the last checkpoint, which
15//! are also durable in the WAL. A read resolves the id to either an active row or
16//! a `(segment, row)` and fetches the bytes from the active buffer or the segment.
17//!
18//! ## Write path
19//! `upsert`/`delete`/`create_collection`/`drop_collection` append a WAL record,
20//! `fsync` it (acknowledgement), then update in-memory state. `checkpoint` seals
21//! the active buffer into a new immutable segment per collection, persists the
22//! window's deletes and shadowed rows into the affected segments' `.del`
23//! tombstone bitmaps, atomically swaps in a manifest, rotates the WAL, and
24//! garbage-collects superseded files.
25//!
26//! ## Recovery (on open)
27//! Read `CURRENT` → load the manifest → for each referenced segment, read its
28//! row directory and tombstone bitmap and rebuild the primary index (a row marked
29//! dead in its segment is skipped, so each id is live in exactly one segment) →
30//! replay every WAL record with `lsn > last_checkpointed_lsn` idempotently into
31//! the active buffer → garbage-collect orphan segment files a crash left between a
32//! flush and the manifest swap. A torn trailing WAL record fails its frame check
33//! and is dropped; it was never acknowledged.
34//!
35//! ## Concurrency
36//! Phase 1/2 is a single-writer engine: mutations take `&mut self`, reads take
37//! `&self`. The lock-free MVCC snapshot model (ADR-0006) arrives with the
38//! server integration; until then a server wraps the store in a lock.
39
40use std::collections::{BTreeMap, HashMap, HashSet};
41use std::fs;
42use std::path::{Path, PathBuf};
43use std::sync::Arc;
44
45use roaring::RoaringBitmap;
46
47use crate::descriptor::Descriptor;
48use crate::error::{CoreError, Result};
49use crate::ids::{CollectionId, Lsn};
50use crate::keyring::{KeyRing, SingleCodecKeyRing};
51use crate::manifest::{
52    self, CollectionEntry, IndexSnapshotRef, MANIFEST_FORMAT_VERSION, Manifest, SegmentRef,
53};
54use crate::page::{PageCodec, PageType};
55use crate::paged::{fsync_dir, read_paged, write_paged};
56use crate::sec::{self, SecPredicate};
57use crate::segment::{self, SealRow, SealedSegment};
58use crate::wal::{self, WalEntry, WalOp, WalWriter};
59
/// Sealed-segment count at which a checkpoint auto-compacts a collection.
///
/// Merging segments once this many have accumulated keeps reads and recovery
/// from fanning out across many files.
const COMPACT_MIN_SEGMENTS: usize = 8;

/// A stored record as returned by reads: the decoded vector plus its opaque
/// payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    /// The vector, decoded from its on-disk little-endian bytes.
    pub vector: Vec<f32>,
    /// The opaque payload bytes (validated UTF-8 JSON at the API edge).
    pub payload: Vec<u8>,
}

/// What a restored index snapshot must replay to catch up with the store
/// (ADR-0025): the post-checkpoint WAL tail that `open` has already applied.
/// Both lists grow with the checkpoint cadence, not with the collection size.
#[derive(Debug, Default)]
pub struct RecoveryTail {
    /// Live rows upserted since the last checkpoint (the active buffer).
    pub upserts: Vec<(String, Record)>,
    /// External ids whose pre-checkpoint row died during this window (deleted,
    /// or shadowed by a re-upsert); a restored index must drop them.
    pub deleted: Vec<String>,
}

// Where a live row's bytes currently live: an index into the in-RAM active
// buffer, or a `(segment index, row)` pair inside a sealed segment.
#[derive(Debug, Clone, Copy)]
enum Loc {
    Active(u32),
    Sealed { seg: u32, row: u32 },
}

// One row buffered in RAM since the last checkpoint. It stays durable in the
// WAL until a checkpoint seals it into a segment.
#[derive(Debug, Clone)]
struct ActiveRow {
    vector: Vec<u8>,
    payload: Vec<u8>,
}
101
102// In-memory state of one collection.
103struct CollectionState {
104    id: CollectionId,
105    name: String,
106    descriptor: Descriptor,
107    // The codec that seals this collection's segments and index artifacts —
108    // its own data-encryption key under an envelope key-ring, or the shared
109    // codec under a single-codec key-ring. Built once from the key-ring at
110    // create/open and held so reads need no per-call key derivation.
111    codec: Box<dyn PageCodec>,
112    // Bytes per vector (`dim × dtype size`), cached from the descriptor.
113    stride: usize,
114    // Live external id → location. The authority for `get`/`len`/`scan`; ordered
115    // so `scan` yields ids deterministically.
116    primary: BTreeMap<String, Loc>,
117    // Sealed segments in creation order; `Loc::Sealed.seg` indexes this.
118    sealed: Vec<SealedSegment>,
119    // Manifest segment refs, parallel to `sealed`.
120    segments_meta: Vec<SegmentRef>,
121    // Rows upserted since the last checkpoint; index = `Loc::Active` row.
122    active: Vec<ActiveRow>,
123    // Live external id → its latest active row, for sealing at the next checkpoint.
124    active_index: BTreeMap<String, u32>,
125    // Sealed-segment rows that died this window (deleted or shadowed), keyed by
126    // segment index; merged into each segment's `.del` at the next checkpoint.
127    dead_this_window: BTreeMap<u32, RoaringBitmap>,
128    // The durable index snapshot reference (ADR-0025): loaded from the manifest on
129    // open, refreshed at each checkpoint; `None` if the index is rebuilt on open.
130    index_snapshot: Option<IndexSnapshotRef>,
131}
132
133impl CollectionState {
134    fn new(
135        id: CollectionId,
136        name: String,
137        descriptor: Descriptor,
138        codec: Box<dyn PageCodec>,
139    ) -> Self {
140        let stride = descriptor.stride();
141        Self {
142            id,
143            name,
144            descriptor,
145            codec,
146            stride,
147            primary: BTreeMap::new(),
148            sealed: Vec::new(),
149            segments_meta: Vec::new(),
150            active: Vec::new(),
151            active_index: BTreeMap::new(),
152            dead_this_window: BTreeMap::new(),
153            index_snapshot: None,
154        }
155    }
156
157    fn has_pending(&self) -> bool {
158        !self.active_index.is_empty() || !self.dead_this_window.is_empty()
159    }
160
161    // Apply an upsert to in-memory state (shared by the write path and WAL
162    // replay). If the id currently lives in a sealed segment, that row is now
163    // shadowed and recorded for tombstoning at the next checkpoint.
164    fn apply_upsert(&mut self, external_id: &str, vector: Vec<u8>, payload: Vec<u8>) {
165        if let Some(Loc::Sealed { seg, row }) = self.primary.get(external_id).copied() {
166            self.dead_this_window.entry(seg).or_default().insert(row);
167        }
168        let row = self.active.len() as u32;
169        self.active.push(ActiveRow { vector, payload });
170        self.active_index.insert(external_id.to_owned(), row);
171        self.primary
172            .insert(external_id.to_owned(), Loc::Active(row));
173    }
174
175    // Apply a delete to in-memory state (shared by the write path and WAL
176    // replay). Returns whether the id existed. A deleted sealed row is recorded
177    // for tombstoning; a deleted active row is simply dropped from the buffer.
178    fn apply_delete(&mut self, external_id: &str) -> bool {
179        match self.primary.remove(external_id) {
180            Some(Loc::Sealed { seg, row }) => {
181                self.dead_this_window.entry(seg).or_default().insert(row);
182                self.active_index.remove(external_id);
183                true
184            }
185            Some(Loc::Active(_)) => {
186                self.active_index.remove(external_id);
187                true
188            }
189            None => false,
190        }
191    }
192}
193
194// A segment written during a checkpoint, opened and ready to install after the
195// manifest swap commits. The repointing ids come from `sealed.row_ids()`.
196struct PendingSegment {
197    seg_ref: SegmentRef,
198    sealed: SealedSegment,
199}
200
201/// A synchronous hook invoked with each committed [`WalEntry`], in commit order.
202/// Leader-follower replication (ADR-0030) installs one to publish each op to its
203/// replication stream. A plain `Fn` keeps the engine runtime-agnostic — no async
204/// dependency leaks into `quiver-core`.
205pub type CommitObserver = Arc<dyn Fn(&WalEntry) + Send + Sync>;
206
207/// The durable storage engine for one data directory.
208pub struct Store {
209    dir: PathBuf,
210    keyring: Box<dyn KeyRing>,
211    collections: HashMap<CollectionId, CollectionState>,
212    name_index: HashMap<String, CollectionId>,
213    next_lsn: Lsn,
214    next_collection_id: u64,
215    next_segment_id: u64,
216    manifest_version: u64,
217    last_checkpointed_lsn: Lsn,
218    wal: WalWriter,
219    wal_seq: u64,
220    commit_observer: Option<CommitObserver>,
221}
222
223impl Store {
224    /// Open (creating if absent) the store at `dir` with encryption-at-rest
225    /// disabled (the plaintext codec). Runs full crash recovery.
226    pub fn open(dir: &Path) -> Result<Self> {
227        Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::plaintext()))
228    }
229
230    /// Open the store sealing every byte — catalog and all collections — with a
231    /// single [`PageCodec`]. Used by `quiver-crypto` to enable encryption-at-rest
232    /// under one root key (no per-collection envelope). Runs full crash recovery.
233    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
234        Self::open_with_keyring(dir, Box::new(SingleCodecKeyRing::new(codec)))
235    }
236
    /// Open the store with a [`KeyRing`] supplying a catalog codec (manifest and
    /// WAL) and a per-collection codec (segments and index artifacts). This is
    /// the seam `quiver-crypto`'s envelope key-ring uses to seal each collection
    /// under its own data-encryption key, enabling crypto-shredding. Runs full
    /// crash recovery.
    ///
    /// Recovery proceeds in order: (1) load the manifest, (2) rebuild each
    /// collection's primary index from its sealed segments, (3) replay WAL
    /// records past the checkpoint floor, (4) GC orphan segment files and index
    /// snapshots, (5) start a fresh WAL segment file and remove superseded ones.
    ///
    /// # Errors
    /// Propagates any I/O, manifest/descriptor decode, segment-open, key-ring,
    /// or WAL-read error. A WAL entry that fails to apply during replay also
    /// makes `open` fail rather than being skipped.
    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
        fs::create_dir_all(dir).map_err(|e| CoreError::io(dir, e))?;
        let wal_dir = dir.join("wal");
        fs::create_dir_all(&wal_dir).map_err(|e| CoreError::io(&wal_dir, e))?;
        // fsync both directories so the layout created above is itself durable.
        fsync_dir(dir)?;
        fsync_dir(&wal_dir)?;

        // 1. Load the manifest (or start empty).
        //    `None` (no manifest yet) falls back to the default, empty catalog.
        let mfst = manifest::read_current(dir, keyring.catalog_codec())?.unwrap_or_default();

        // 2. Rebuild the primary index from the sealed segments the manifest
        //    references. A row tombstoned in its segment's `.del` is skipped, so
        //    each external id is added from the single segment in which it is live.
        let mut collections: HashMap<CollectionId, CollectionState> = HashMap::new();
        let mut name_index: HashMap<String, CollectionId> = HashMap::new();
        for entry in &mfst.collections {
            let descriptor = Descriptor::decode(&entry.descriptor)?;
            let codec = keyring.collection_codec(entry.id)?;
            let mut state = CollectionState::new(entry.id, entry.name.clone(), descriptor, codec);
            state.segments_meta = entry.segments.clone();
            state.index_snapshot = entry.index_snapshot.clone();
            let seg_dir = segments_dir(dir, entry.id);
            for seg in &entry.segments {
                let sealed = segment::open_segment(&seg_dir, seg.id, state.codec.as_ref())?;
                // This segment's position in `state.sealed` (it is pushed below),
                // which is what `Loc::Sealed.seg` refers to.
                let seg_idx = state.sealed.len() as u32;
                for (row, ext_id) in sealed.row_ids().iter().enumerate() {
                    let row = row as u32;
                    if !sealed.is_dead(row) {
                        // `insert` overwrites, so if an id were ever live in two
                        // segments the later segment would win.
                        state
                            .primary
                            .insert(ext_id.clone(), Loc::Sealed { seg: seg_idx, row });
                    }
                }
                state.sealed.push(sealed);
            }
            name_index.insert(state.name.clone(), state.id);
            collections.insert(state.id, state);
        }

        // 3. Replay the WAL tail (records past the checkpoint), idempotently.
        //    NOTE(review): files are replayed in the order `list_wal_files`
        //    returns them; this assumes ascending `seq` order — confirm.
        let floor = mfst.last_checkpointed_lsn;
        let mut max_lsn = floor;
        let wal_files = list_wal_files(&wal_dir)?;
        let mut max_seq = 0u64;
        // Files holding at least one record past the floor; every other file is
        // removed in step 5.
        let mut keep_seqs: HashSet<u64> = HashSet::new();
        for (seq, path) in &wal_files {
            max_seq = (*seq).max(max_seq);
            let replay = wal::read_all(path, keyring.catalog_codec())?;
            let mut had_live = false;
            for entry in replay.entries {
                if entry.lsn.value() <= floor.value() {
                    continue; // already captured in a segment
                }
                had_live = true;
                if entry.lsn > max_lsn {
                    max_lsn = entry.lsn;
                }
                apply_wal_entry(&mut collections, &mut name_index, &entry, keyring.as_ref())?;
            }
            if had_live {
                keep_seqs.insert(*seq);
            }
        }
        // New writes continue after the highest LSN seen, or after the floor when
        // the WAL tail is empty.
        let next_lsn = max_lsn.next();

        // 4. GC orphan segment files not referenced by the manifest (a crash
        //    between a segment flush and the manifest swap), then the analogous
        //    orphan/superseded index snapshots (ADR-0025).
        gc_orphan_segments(dir, &mfst, keyring.as_ref())?;
        gc_orphan_index_snapshots(dir, &mfst)?;

        // 5. Start a fresh WAL segment for new appends, then drop superseded WAL
        //    files (empty or fully below the checkpoint). The new file is created
        //    and the directory fsync'd before any old file is removed.
        let wal_seq = max_seq + 1;
        let wal = WalWriter::create(&wal_file_path(&wal_dir, wal_seq), next_lsn)?;
        fsync_dir(&wal_dir)?;
        for (seq, path) in &wal_files {
            if !keep_seqs.contains(seq) {
                remove_file_if_present(path)?;
            }
        }
        fsync_dir(&wal_dir)?;

        Ok(Self {
            dir: dir.to_path_buf(),
            keyring,
            collections,
            name_index,
            next_lsn,
            next_collection_id: mfst.next_collection_id,
            next_segment_id: mfst.next_segment_id,
            manifest_version: mfst.version,
            last_checkpointed_lsn: floor,
            wal,
            wal_seq,
            commit_observer: None,
        })
    }
340
341    /// Install a hook invoked with each committed [`WalEntry`], in commit order
342    /// (ADR-0030). Used by the server to drive a leader's replication stream;
343    /// replaces any previous observer.
344    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
345        self.commit_observer = Some(observer);
346    }
347
348    // Notify the commit observer (if any) of a durably-committed entry.
349    fn publish(&self, entry: &WalEntry) {
350        if let Some(observer) = &self.commit_observer {
351            observer(entry);
352        }
353    }
354
355    /// The operations that recreate the store's current logical state, for a
356    /// replication follower to bootstrap from (ADR-0030): a `CreateCollection`
357    /// per collection, each followed by an `Upsert` per live point. Collections
358    /// are emitted before their points so a follower can apply the stream in
359    /// order.
360    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
361        let mut ops = Vec::new();
362        for (&id, state) in &self.collections {
363            ops.push(WalOp::CreateCollection {
364                collection_id: id,
365                name: state.name.clone(),
366                descriptor: postcard::to_allocvec(&state.descriptor)?,
367            });
368            for (external_id, record) in self.scan(id)? {
369                ops.push(WalOp::Upsert {
370                    collection_id: id,
371                    external_id,
372                    vector: f32_to_le_bytes(&record.vector),
373                    payload: record.payload,
374                });
375            }
376        }
377        Ok(ops)
378    }
379
380    /// Apply a replicated operation received from a leader (ADR-0030). The op is
381    /// persisted to *this* node's WAL under a locally-assigned LSN — preserving
382    /// the leader's collection id so later ops resolve — then applied to in-memory
383    /// state through the same path crash recovery uses. `Checkpoint` ops are a
384    /// per-node concern and are ignored; followers checkpoint themselves.
385    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
386        if let WalOp::Checkpoint { .. } = op {
387            return Ok(());
388        }
389        if let WalOp::CreateCollection { collection_id, .. } = &op {
390            // Provision key material before the collection's codec is needed, and
391            // keep the local id allocator ahead of the leader's ids.
392            self.keyring.provision_collection(*collection_id)?;
393            self.next_collection_id = self.next_collection_id.max(collection_id.0 + 1);
394        }
395        let lsn = self.next_lsn;
396        let entry = WalEntry { lsn, op };
397        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
398        self.next_lsn = lsn.next();
399        apply_wal_entry(
400            &mut self.collections,
401            &mut self.name_index,
402            &entry,
403            self.keyring.as_ref(),
404        )?;
405        self.publish(&entry);
406        Ok(())
407    }
408
409    /// Create a collection. Fails if the name is already taken.
410    pub fn create_collection(
411        &mut self,
412        name: &str,
413        descriptor: Descriptor,
414    ) -> Result<CollectionId> {
415        if self.name_index.contains_key(name) {
416            return Err(CoreError::AlreadyExists(format!("collection {name}")));
417        }
418        if descriptor.dim == 0 {
419            return Err(CoreError::InvalidArgument(
420                "dim must be non-zero".to_owned(),
421            ));
422        }
423        let id = CollectionId(self.next_collection_id);
424        // Provision the collection's key material before its first durable record
425        // references it, so WAL replay on recovery can always open what it needs.
426        self.keyring.provision_collection(id)?;
427        let descriptor_bytes = postcard::to_allocvec(&descriptor)?;
428        let lsn = self.next_lsn;
429        let entry = WalEntry {
430            lsn,
431            op: WalOp::CreateCollection {
432                collection_id: id,
433                name: name.to_owned(),
434                descriptor: descriptor_bytes,
435            },
436        };
437        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
438        self.next_lsn = lsn.next();
439        self.publish(&entry);
440        self.next_collection_id += 1;
441        let codec = self.keyring.collection_codec(id)?;
442        self.collections.insert(
443            id,
444            CollectionState::new(id, name.to_owned(), descriptor, codec),
445        );
446        self.name_index.insert(name.to_owned(), id);
447        Ok(id)
448    }
449
450    /// Drop a collection and all of its data. Its segment files are reclaimed at
451    /// the next checkpoint or the next open. Returns whether it existed.
452    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
453        let Some(&id) = self.name_index.get(name) else {
454            return Ok(false);
455        };
456        let lsn = self.next_lsn;
457        let entry = WalEntry {
458            lsn,
459            op: WalOp::DropCollection { collection_id: id },
460        };
461        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
462        self.next_lsn = lsn.next();
463        self.publish(&entry);
464        self.collections.remove(&id);
465        self.name_index.remove(name);
466        Ok(true)
467    }
468
469    /// Crypto-shred a collection: drop it, checkpoint so the manifest no longer
470    /// references it and its files are reclaimed, then destroy its key material.
471    /// After this its sealed segments and index are unrecoverable even to the
472    /// master-key holder (ADR-0010); with a single-codec key-ring there is no
473    /// per-collection key, so this is `drop` plus a checkpoint. Returns whether
474    /// the collection existed.
475    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
476        let Some(id) = self.collection_id(name) else {
477            return Ok(false);
478        };
479        self.drop_collection(name)?;
480        // Seal any un-checkpointed rows into DEK-protected segments and rotate
481        // the WAL, so no live catalog-keyed copy of the collection survives; the
482        // checkpoint's GC then reclaims its files and shreds its key.
483        self.checkpoint()?;
484        // Destroy the key explicitly too, covering a collection that never
485        // reached a segment directory for GC to find. Idempotent.
486        self.keyring.shred_collection(id)?;
487        Ok(true)
488    }
489
490    /// Insert or replace a point. The vector length must equal the collection's
491    /// dimensionality; the payload is stored opaquely. Returns the assigned LSN
492    /// once the write is durable.
493    pub fn upsert(
494        &mut self,
495        collection: CollectionId,
496        external_id: &str,
497        vector: &[f32],
498        payload: &[u8],
499    ) -> Result<Lsn> {
500        let dim = self
501            .collections
502            .get(&collection)
503            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
504            .descriptor
505            .dim as usize;
506        if vector.len() != dim {
507            return Err(CoreError::InvalidArgument(format!(
508                "vector has {} dims, collection expects {dim}",
509                vector.len()
510            )));
511        }
512        let vector_bytes = f32_to_le_bytes(vector);
513        let lsn = self.next_lsn;
514        let entry = WalEntry {
515            lsn,
516            op: WalOp::Upsert {
517                collection_id: collection,
518                external_id: external_id.to_owned(),
519                vector: vector_bytes.clone(),
520                payload: payload.to_vec(),
521            },
522        };
523        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
524        self.next_lsn = lsn.next();
525        self.publish(&entry);
526        let state = self
527            .collections
528            .get_mut(&collection)
529            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
530        state.apply_upsert(external_id, vector_bytes, payload.to_vec());
531        Ok(lsn)
532    }
533
534    /// Upsert a batch of points with a **single** `fdatasync` instead of one
535    /// per point.  All records are acknowledged atomically — if the server
536    /// crashes before the sync completes, none of the batch is durable (the
537    /// caller, seeing no response, should retry the whole batch).  This is the
538    /// standard batch-commit pattern used by every production database.
539    ///
540    /// `records` is `(external_id, vector, payload_bytes)` slices; the vectors
541    /// must match the collection's dimensionality or the call returns an error
542    /// before writing anything.
543    pub fn upsert_batch(
544        &mut self,
545        collection: CollectionId,
546        records: &[(&str, &[f32], &[u8])],
547    ) -> Result<u64> {
548        if records.is_empty() {
549            return Ok(0);
550        }
551        let dim = self
552            .collections
553            .get(&collection)
554            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
555            .descriptor
556            .dim as usize;
557        for (_, vector, _) in records {
558            if vector.len() != dim {
559                return Err(CoreError::InvalidArgument(format!(
560                    "vector has {} dims, collection expects {dim}",
561                    vector.len()
562                )));
563            }
564        }
565
566        // Build one WalEntry per record, advancing the LSN for each.
567        let mut entries: Vec<WalEntry> = Vec::with_capacity(records.len());
568        for (ext_id, vector, payload) in records {
569            let lsn = self.next_lsn;
570            self.next_lsn = lsn.next();
571            entries.push(WalEntry {
572                lsn,
573                op: WalOp::Upsert {
574                    collection_id: collection,
575                    external_id: ext_id.to_string(),
576                    vector: f32_to_le_bytes(vector),
577                    payload: payload.to_vec(),
578                },
579            });
580        }
581
582        // Append all records without syncing, then ONE fdatasync.
583        for entry in &entries {
584            self.wal.append(self.keyring.catalog_codec(), entry)?;
585        }
586        self.wal.sync()?;
587
588        // Publish and apply in commit order.
589        for entry in &entries {
590            self.publish(entry);
591            if let WalOp::Upsert {
592                external_id,
593                vector,
594                payload,
595                ..
596            } = &entry.op
597            {
598                let state = self
599                    .collections
600                    .get_mut(&collection)
601                    .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
602                state.apply_upsert(external_id, vector.clone(), payload.clone());
603            }
604        }
605        Ok(records.len() as u64)
606    }
607
608    /// Delete a point by external id. Returns whether it existed.
609    pub fn delete(&mut self, collection: CollectionId, external_id: &str) -> Result<bool> {
610        let existed = self
611            .collections
612            .get(&collection)
613            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
614            .primary
615            .contains_key(external_id);
616        if !existed {
617            return Ok(false);
618        }
619        let lsn = self.next_lsn;
620        let entry = WalEntry {
621            lsn,
622            op: WalOp::Delete {
623                collection_id: collection,
624                external_id: external_id.to_owned(),
625            },
626        };
627        self.wal.append_sync(self.keyring.catalog_codec(), &entry)?;
628        self.next_lsn = lsn.next();
629        self.publish(&entry);
630        let state = self
631            .collections
632            .get_mut(&collection)
633            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
634        state.apply_delete(external_id);
635        Ok(true)
636    }
637
638    /// Fetch a point by external id.
639    pub fn get(&self, collection: CollectionId, external_id: &str) -> Result<Option<Record>> {
640        let state = self
641            .collections
642            .get(&collection)
643            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
644        match state.primary.get(external_id).copied() {
645            Some(loc) => Ok(Some(self.record_at(state, loc)?)),
646            None => Ok(None),
647        }
648    }
649
650    /// Iterate every live `(external_id, record)` in a collection, in id order.
651    /// Used to build the in-memory index and for brute-force scans.
652    pub fn scan(&self, collection: CollectionId) -> Result<Vec<(String, Record)>> {
653        let state = self
654            .collections
655            .get(&collection)
656            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
657        let mut out = Vec::with_capacity(state.primary.len());
658        for (id, &loc) in &state.primary {
659            out.push((id.clone(), self.record_at(state, loc)?));
660        }
661        Ok(out)
662    }
663
664    // Materialize the record at `loc`, reading from the active buffer or the
665    // sealed segment (decrypting and CRC-checking the touched pages).
666    fn record_at(&self, state: &CollectionState, loc: Loc) -> Result<Record> {
667        match loc {
668            Loc::Active(r) => {
669                let row = state
670                    .active
671                    .get(r as usize)
672                    .ok_or_else(|| CoreError::MalformedPage(format!("dangling active row {r}")))?;
673                Ok(Record {
674                    vector: le_bytes_to_f32(&row.vector),
675                    payload: row.payload.clone(),
676                })
677            }
678            Loc::Sealed { seg, row } => {
679                let segment = state.sealed.get(seg as usize).ok_or_else(|| {
680                    CoreError::MalformedPage(format!("dangling segment index {seg}"))
681                })?;
682                let vector_bytes = segment.read_vector(state.codec.as_ref(), row, state.stride)?;
683                let payload = segment.read_payload(state.codec.as_ref(), row)?;
684                Ok(Record {
685                    vector: le_bytes_to_f32(&vector_bytes),
686                    payload,
687                })
688            }
689        }
690    }
691
692    /// The id of a collection by name, if it exists.
693    #[must_use]
694    pub fn collection_id(&self, name: &str) -> Option<CollectionId> {
695        self.name_index.get(name).copied()
696    }
697
698    /// The descriptor of a collection, if it exists.
699    #[must_use]
700    pub fn descriptor(&self, collection: CollectionId) -> Option<&Descriptor> {
701        self.collections.get(&collection).map(|s| &s.descriptor)
702    }
703
704    /// A clone of a collection's page codec, for a component that seals its own
705    /// files with that collection's key — e.g. a disk-resident index artifact
706    /// (ADR-0019). The same owned handle can both write and `mmap`-open the
707    /// artifact, so it shares the collection's data-encryption key.
708    ///
709    /// # Errors
710    /// [`CoreError::NotFound`] if the collection is unknown.
711    pub fn collection_codec_clone(&self, collection: CollectionId) -> Result<Box<dyn PageCodec>> {
712        self.collections
713            .get(&collection)
714            .map(|s| s.codec.clone_box())
715            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))
716    }
717
718    /// The store's root data directory.
719    #[must_use]
720    pub fn dir(&self) -> &Path {
721        &self.dir
722    }
723
724    /// The current manifest version — the catalog generation a snapshot of this
725    /// store captures (ADR-0050).
726    #[must_use]
727    pub fn manifest_version(&self) -> u64 {
728        self.manifest_version
729    }
730
731    /// The directory that holds a collection's index artifacts
732    /// (`<data_dir>/collections/<id>/index`). Not created by this call.
733    #[must_use]
734    pub fn index_dir(&self, collection: CollectionId) -> PathBuf {
735        collection_dir(&self.dir, collection).join("index")
736    }
737
738    /// Read and decrypt the current durable index snapshot for a collection, if
739    /// one is referenced by the manifest (ADR-0025). Returns the opaque blob the
740    /// index layer wrote at the last checkpoint, or `None` if the index must be
741    /// rebuilt (no snapshot, or a store written before v2).
742    ///
743    /// # Errors
744    /// [`CoreError::NotFound`] if the collection is unknown, or an I/O / decrypt /
745    /// page-integrity error reading the snapshot file.
746    pub fn read_index_snapshot(&self, collection: CollectionId) -> Result<Option<Vec<u8>>> {
747        let state = self
748            .collections
749            .get(&collection)
750            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
751        let Some(snap) = &state.index_snapshot else {
752            return Ok(None);
753        };
754        let path = self
755            .index_dir(collection)
756            .join(index_snapshot_file_name(snap.id));
757        let body = read_paged(&path, state.codec.as_ref(), PageType::IndexBlock)?;
758        Ok(Some(body))
759    }
760
761    /// The post-checkpoint mutations a restored index snapshot must replay to
762    /// catch up to the current state (ADR-0025): the active-buffer upserts and the
763    /// external ids whose checkpointed row died this window. Both are bounded by
764    /// the checkpoint cadence, not the collection size.
765    ///
766    /// # Errors
767    /// [`CoreError::NotFound`] if the collection is unknown.
768    pub fn recovery_tail(&self, collection: CollectionId) -> Result<RecoveryTail> {
769        let state = self
770            .collections
771            .get(&collection)
772            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
773        let mut upserts = Vec::with_capacity(state.active_index.len());
774        for (ext_id, &row) in &state.active_index {
775            let ar = &state.active[row as usize];
776            upserts.push((
777                ext_id.clone(),
778                Record {
779                    vector: le_bytes_to_f32(&ar.vector),
780                    payload: ar.payload.clone(),
781                },
782            ));
783        }
784        let mut deleted = Vec::new();
785        for (&seg_idx, bitmap) in &state.dead_this_window {
786            if let Some(seg) = state.sealed.get(seg_idx as usize) {
787                let row_ids = seg.row_ids();
788                for row in bitmap.iter() {
789                    if let Some(ext) = row_ids.get(row as usize) {
790                        deleted.push(ext.clone());
791                    }
792                }
793            }
794        }
795        Ok(RecoveryTail { upserts, deleted })
796    }
797
798    /// The number of live rows in a collection.
799    pub fn len(&self, collection: CollectionId) -> Result<usize> {
800        Ok(self
801            .collections
802            .get(&collection)
803            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?
804            .primary
805            .len())
806    }
807
808    /// Whether a collection has no live rows.
809    pub fn is_empty(&self, collection: CollectionId) -> Result<bool> {
810        Ok(self.len(collection)? == 0)
811    }
812
813    /// Names of all collections, sorted.
814    #[must_use]
815    pub fn collection_names(&self) -> Vec<String> {
816        let mut names: Vec<String> = self.name_index.keys().cloned().collect();
817        names.sort();
818        names
819    }
820
821    /// The live external ids whose payload satisfies an indexable `predicate`,
822    /// resolved through the sealed segments' secondary indexes (`.sec`) plus a
823    /// scan of the active buffer. The result is sorted and de-duplicated. This is
824    /// the pre-filter primitive the query planner builds hybrid search on.
825    ///
826    /// # Errors
827    /// [`CoreError::NotFound`] if the collection is unknown, or
828    /// [`CoreError::InvalidArgument`] if the predicate's field is not declared
829    /// filterable in the collection schema.
830    pub fn matching_ids(
831        &self,
832        collection: CollectionId,
833        predicate: &SecPredicate,
834    ) -> Result<Vec<String>> {
835        let state = self
836            .collections
837            .get(&collection)
838            .ok_or_else(|| CoreError::NotFound(format!("collection {collection}")))?;
839        let field_type = state
840            .descriptor
841            .filterable
842            .iter()
843            .find(|f| f.path == predicate.field())
844            .map(|f| f.field_type)
845            .ok_or_else(|| {
846                CoreError::InvalidArgument(format!("field {} is not filterable", predicate.field()))
847            })?;
848
849        let mut out: Vec<String> = Vec::new();
850        // Sealed segments: query each `.sec`, keeping rows still live here (a row
851        // dead or shadowed no longer has the primary index pointing at it).
852        for (seg_idx, segment) in state.sealed.iter().enumerate() {
853            let seg_idx = seg_idx as u32;
854            let Some(rows) = segment.sec_query(predicate)? else {
855                continue;
856            };
857            for row in rows {
858                if segment.is_dead(row) {
859                    continue;
860                }
861                let Some(ext_id) = segment.row_ids().get(row as usize) else {
862                    continue;
863                };
864                if matches!(
865                    state.primary.get(ext_id),
866                    Some(Loc::Sealed { seg: s, row: r }) if *s == seg_idx && *r == row
867                ) {
868                    out.push(ext_id.clone());
869                }
870            }
871        }
872        // Active (un-checkpointed) rows: evaluate the predicate directly.
873        for (ext_id, &row) in &state.active_index {
874            if let Some(active) = state.active.get(row as usize)
875                && sec::payload_matches(predicate, field_type, &active.payload)
876            {
877                out.push(ext_id.clone());
878            }
879        }
880        out.sort();
881        out.dedup();
882        Ok(out)
883    }
884
885    /// Seal everything changed since the last checkpoint into new immutable
886    /// segments, install a new manifest atomically, rotate the WAL, and reclaim
887    /// superseded files. A no-op if nothing has changed since the last
888    /// checkpoint. Crash-safe at every step (see the module docs).
889    ///
890    /// Equivalent to [`Store::checkpoint_with_index_snapshots`] with no index
891    /// snapshots (any existing snapshot references are cleared).
892    pub fn checkpoint(&mut self) -> Result<()> {
893        self.checkpoint_with_index_snapshots(&HashMap::new())
894    }
895
896    /// Like [`Store::checkpoint`], but also durably captures the supplied
897    /// per-collection index snapshots (ADR-0025): each opaque blob is sealed with
898    /// its collection's codec, fsync'd, and referenced by the same atomic manifest
899    /// swap that publishes the segments — so the `(segments, index)` pair is
900    /// consistent at one LSN. The map is the *complete* set for this checkpoint; a
901    /// collection absent from it has any existing snapshot cleared, so a
902    /// referenced snapshot's LSN always equals the new checkpoint's LSN.
903    pub fn checkpoint_with_index_snapshots(
904        &mut self,
905        index_snapshots: &HashMap<CollectionId, Vec<u8>>,
906    ) -> Result<()> {
907        let last_lsn = Lsn(self.next_lsn.value().saturating_sub(1));
908        if last_lsn.value() <= self.last_checkpointed_lsn.value() {
909            return Ok(()); // nothing new since the last checkpoint
910        }
911        let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
912        cids.sort();
913        let segment_lsn_low = self.last_checkpointed_lsn.next();
914        let new_version = self.manifest_version + 1;
915
916        // Phase A: for each collection with pending changes, persist the window's
917        // dead rows into the affected segments' `.del` bitmaps, seal the active
918        // buffer into a new segment (if any), and re-open it ready to install.
919        let mut pending: HashMap<CollectionId, PendingSegment> = HashMap::new();
920        for &cid in &cids {
921            if !self.collections[&cid].has_pending() {
922                continue;
923            }
924            let seg_dir = segments_dir(&self.dir, cid);
925            fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
926            // This collection's own codec (its data-encryption key under an
927            // envelope key-ring) seals its segments and tombstone bitmaps.
928            let codec = self.collections[&cid].codec.clone_box();
929
930            // Merge the window's dead rows into each affected segment's tombstone
931            // bitmap and rewrite it atomically (temp + rename).
932            {
933                let state = &self.collections[&cid];
934                for (&seg_idx, newly_dead) in &state.dead_this_window {
935                    if let Some(seg) = state.sealed.get(seg_idx as usize) {
936                        let mut merged = seg.dead_bitmap();
937                        merged |= newly_dead;
938                        segment::write_del(&seg_dir, seg.seg_id, codec.as_ref(), &merged)?;
939                    }
940                }
941            }
942
943            // Seal the active buffer (in deterministic id order) into a new
944            // segment, if there is anything to seal. The borrow of
945            // `self.collections` ends with this block, before the commit phase.
946            let new_seg = if self.collections[&cid].active_index.is_empty() {
947                None
948            } else {
949                let seg_id = self.next_segment_id;
950                self.next_segment_id += 1;
951                let row_count = {
952                    let state = &self.collections[&cid];
953                    let seal_rows: Vec<SealRow<'_>> = state
954                        .active_index
955                        .iter()
956                        .map(|(id, &row)| SealRow {
957                            external_id: id,
958                            vector: &state.active[row as usize].vector,
959                            payload: &state.active[row as usize].payload,
960                        })
961                        .collect();
962                    segment::write_segment(
963                        &seg_dir,
964                        seg_id,
965                        codec.as_ref(),
966                        &seal_rows,
967                        &state.descriptor.filterable,
968                    )?;
969                    seal_rows.len() as u64
970                };
971                Some((seg_id, row_count))
972            };
973
974            // Make the new files and their parent directories durable before the
975            // manifest references them.
976            fsync_dir(&seg_dir)?;
977            fsync_dir(&collection_dir(&self.dir, cid))?;
978            fsync_dir(&self.dir.join("collections"))?;
979            fsync_dir(&self.dir)?;
980
981            if let Some((seg_id, row_count)) = new_seg {
982                let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
983                pending.insert(
984                    cid,
985                    PendingSegment {
986                        seg_ref: SegmentRef {
987                            id: seg_id,
988                            row_count,
989                            lsn_low: segment_lsn_low,
990                            lsn_high: last_lsn,
991                        },
992                        sealed,
993                    },
994                );
995            }
996        }
997
998        // Phase A2: persist the supplied index snapshots (ADR-0025), each sealed
999        // with its collection's codec and fsync'd, so the manifest swap can
1000        // reference a durable file. The snapshot file id is the new manifest
1001        // version, unique per checkpoint.
1002        let mut new_index_refs: HashMap<CollectionId, IndexSnapshotRef> = HashMap::new();
1003        for &cid in &cids {
1004            let Some(blob) = index_snapshots.get(&cid) else {
1005                continue;
1006            };
1007            let index_dir = self.index_dir(cid);
1008            fs::create_dir_all(&index_dir).map_err(|e| CoreError::io(&index_dir, e))?;
1009            let codec = self.collections[&cid].codec.clone_box();
1010            let path = index_dir.join(index_snapshot_file_name(new_version));
1011            write_paged(
1012                &path,
1013                codec.as_ref(),
1014                PageType::IndexBlock,
1015                new_version,
1016                blob,
1017            )?;
1018            fsync_dir(&index_dir)?;
1019            fsync_dir(&collection_dir(&self.dir, cid))?;
1020            new_index_refs.insert(
1021                cid,
1022                IndexSnapshotRef {
1023                    id: new_version,
1024                    lsn: last_lsn,
1025                },
1026            );
1027        }
1028
1029        // Phase B: build and atomically install the new manifest.
1030        let mut entries = Vec::with_capacity(cids.len());
1031        for &cid in &cids {
1032            let state = &self.collections[&cid];
1033            let mut segs = state.segments_meta.clone();
1034            if let Some(p) = pending.get(&cid) {
1035                segs.push(p.seg_ref.clone());
1036            }
1037            entries.push(CollectionEntry {
1038                id: state.id,
1039                name: state.name.clone(),
1040                descriptor: postcard::to_allocvec(&state.descriptor)?,
1041                segments: segs,
1042                index_snapshot: new_index_refs.get(&cid).cloned(),
1043            });
1044        }
1045        let new_manifest = Manifest {
1046            format_version: MANIFEST_FORMAT_VERSION,
1047            version: new_version,
1048            last_checkpointed_lsn: last_lsn,
1049            next_collection_id: self.next_collection_id,
1050            next_segment_id: self.next_segment_id,
1051            collections: entries,
1052        };
1053        manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;
1054
1055        // Phase C: commit in-memory state, rotate the WAL, GC superseded files.
1056        self.manifest_version = new_version;
1057        self.last_checkpointed_lsn = last_lsn;
1058        for &cid in &cids {
1059            let Some(state) = self.collections.get_mut(&cid) else {
1060                continue;
1061            };
1062            // Fold this window's dead rows into the in-memory segment bitmaps
1063            // (the `.del` files were already persisted in Phase A).
1064            let dead_window = std::mem::take(&mut state.dead_this_window);
1065            for (seg_idx, bitmap) in dead_window {
1066                if let Some(seg) = state.sealed.get_mut(seg_idx as usize) {
1067                    seg.mark_dead(&bitmap);
1068                }
1069            }
1070            // Install the new segment, if any, repointing its now-sealed ids.
1071            if let Some(p) = pending.remove(&cid) {
1072                let seg_idx = state.sealed.len() as u32;
1073                for (row, ext_id) in p.sealed.row_ids().iter().enumerate() {
1074                    state.primary.insert(
1075                        ext_id.clone(),
1076                        Loc::Sealed {
1077                            seg: seg_idx,
1078                            row: row as u32,
1079                        },
1080                    );
1081                }
1082                state.sealed.push(p.sealed);
1083                state.segments_meta.push(p.seg_ref);
1084            }
1085            state.active.clear();
1086            state.active_index.clear();
1087            state.index_snapshot = new_index_refs.get(&cid).cloned();
1088        }
1089        self.rotate_wal()?;
1090        gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
1091        gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
1092        self.auto_compact()?;
1093        Ok(())
1094    }
1095
1096    /// Compact every collection with reclaimable space: merge its sealed segments,
1097    /// dropping dead (deleted or shadowed) rows, into a single fresh segment. Each
1098    /// collection commits via its own atomic manifest swap and is crash-safe like
1099    /// a checkpoint — the old segments stay valid until the swap, so a crash
1100    /// before it leaves the pre-compaction state intact.
1101    pub fn compact(&mut self) -> Result<()> {
1102        for cid in self.sorted_cids() {
1103            if self.reclaimable(cid) {
1104                self.compact_collection(cid)?;
1105            }
1106        }
1107        Ok(())
1108    }
1109
1110    // Compact only collections that have crossed the automatic threshold; run at
1111    // the end of a checkpoint.
1112    fn auto_compact(&mut self) -> Result<()> {
1113        for cid in self.sorted_cids() {
1114            if self.needs_compaction(cid) {
1115                self.compact_collection(cid)?;
1116            }
1117        }
1118        Ok(())
1119    }
1120
1121    fn sorted_cids(&self) -> Vec<CollectionId> {
1122        let mut cids: Vec<CollectionId> = self.collections.keys().copied().collect();
1123        cids.sort();
1124        cids
1125    }
1126
1127    // Whether a collection has any space to reclaim: more than one segment to
1128    // merge, or any dead rows in a segment.
1129    fn reclaimable(&self, cid: CollectionId) -> bool {
1130        self.collections.get(&cid).is_some_and(|s| {
1131            s.sealed.len() > 1
1132                || s.sealed
1133                    .iter()
1134                    .any(|seg| seg.live_count() < u64::from(seg.row_count()))
1135        })
1136    }
1137
1138    // Whether a collection has crossed the automatic compaction threshold: many
1139    // segments to merge, or at least half of its sealed rows dead.
1140    fn needs_compaction(&self, cid: CollectionId) -> bool {
1141        let Some(s) = self.collections.get(&cid) else {
1142            return false;
1143        };
1144        if s.sealed.is_empty() {
1145            return false;
1146        }
1147        let total: u64 = s.sealed.iter().map(|seg| u64::from(seg.row_count())).sum();
1148        let live: u64 = s.sealed.iter().map(SealedSegment::live_count).sum();
1149        s.sealed.len() >= COMPACT_MIN_SEGMENTS || (total > 0 && (total - live) * 2 >= total)
1150    }
1151
1152    // Merge one collection's sealed segments into a single fresh segment holding
1153    // only its live rows, install it atomically, and reclaim the old files.
1154    fn compact_collection(&mut self, cid: CollectionId) -> Result<()> {
1155        // This collection's own codec (its DEK under an envelope key-ring) seals
1156        // both the rows read from the old segments and the merged one written.
1157        let codec = self
1158            .collections
1159            .get(&cid)
1160            .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?
1161            .codec
1162            .clone_box();
1163        // Gather the live sealed rows (active rows are untouched). `primary` is
1164        // ordered, so the rewritten segment is deterministic.
1165        let live: Vec<(String, Vec<u8>, Vec<u8>)> = {
1166            let state = self
1167                .collections
1168                .get(&cid)
1169                .ok_or_else(|| CoreError::NotFound(format!("collection {cid}")))?;
1170            let mut out = Vec::with_capacity(state.primary.len());
1171            for (ext_id, &loc) in &state.primary {
1172                if let Loc::Sealed { seg, row } = loc {
1173                    let segment = state.sealed.get(seg as usize).ok_or_else(|| {
1174                        CoreError::MalformedPage(format!("dangling segment index {seg}"))
1175                    })?;
1176                    let vector = segment.read_vector(codec.as_ref(), row, state.stride)?;
1177                    let payload = segment.read_payload(codec.as_ref(), row)?;
1178                    out.push((ext_id.clone(), vector, payload));
1179                }
1180            }
1181            out
1182        };
1183
1184        // The merged segment spans the full lsn range of its inputs.
1185        let (lsn_low, lsn_high) = {
1186            let state = &self.collections[&cid];
1187            let low = state
1188                .segments_meta
1189                .iter()
1190                .map(|s| s.lsn_low.value())
1191                .min()
1192                .map(Lsn)
1193                .unwrap_or(Lsn::ZERO);
1194            let high = state
1195                .segments_meta
1196                .iter()
1197                .map(|s| s.lsn_high.value())
1198                .max()
1199                .map(Lsn)
1200                .unwrap_or(self.last_checkpointed_lsn);
1201            (low, high)
1202        };
1203
1204        let seg_id = self.next_segment_id;
1205        self.next_segment_id += 1;
1206        let seg_dir = segments_dir(&self.dir, cid);
1207        fs::create_dir_all(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))?;
1208        let seal_rows: Vec<SealRow<'_>> = live
1209            .iter()
1210            .map(|(id, v, p)| SealRow {
1211                external_id: id,
1212                vector: v,
1213                payload: p,
1214            })
1215            .collect();
1216        segment::write_segment(
1217            &seg_dir,
1218            seg_id,
1219            codec.as_ref(),
1220            &seal_rows,
1221            &self.collections[&cid].descriptor.filterable,
1222        )?;
1223        fsync_dir(&seg_dir)?;
1224        fsync_dir(&collection_dir(&self.dir, cid))?;
1225        fsync_dir(&self.dir.join("collections"))?;
1226        fsync_dir(&self.dir)?;
1227        let new_ref = SegmentRef {
1228            id: seg_id,
1229            row_count: seal_rows.len() as u64,
1230            lsn_low,
1231            lsn_high,
1232        };
1233        let sealed = segment::open_segment(&seg_dir, seg_id, codec.as_ref())?;
1234
1235        // New manifest: this collection now has exactly one segment; others are
1236        // unchanged. The atomic swap is the commit point.
1237        let new_version = self.manifest_version + 1;
1238        let mut entries = Vec::with_capacity(self.collections.len());
1239        for &other in &self.sorted_cids() {
1240            let state = &self.collections[&other];
1241            let segs = if other == cid {
1242                vec![new_ref.clone()]
1243            } else {
1244                state.segments_meta.clone()
1245            };
1246            entries.push(CollectionEntry {
1247                id: state.id,
1248                name: state.name.clone(),
1249                descriptor: postcard::to_allocvec(&state.descriptor)?,
1250                segments: segs,
1251                index_snapshot: state.index_snapshot.clone(),
1252            });
1253        }
1254        let new_manifest = Manifest {
1255            format_version: MANIFEST_FORMAT_VERSION,
1256            version: new_version,
1257            last_checkpointed_lsn: self.last_checkpointed_lsn,
1258            next_collection_id: self.next_collection_id,
1259            next_segment_id: self.next_segment_id,
1260            collections: entries,
1261        };
1262        manifest::write_manifest(&self.dir, &new_manifest, self.keyring.catalog_codec())?;
1263
1264        // Commit: replace the segments (dropping the old mmaps before the files
1265        // are reclaimed), repoint the now-merged ids, and drop pending tombstones
1266        // (their rows no longer exist).
1267        self.manifest_version = new_version;
1268        let row_ids: Vec<String> = sealed.row_ids().to_vec();
1269        if let Some(state) = self.collections.get_mut(&cid) {
1270            state.sealed = vec![sealed];
1271            state.segments_meta = vec![new_ref];
1272            state.dead_this_window.clear();
1273            for (row, ext_id) in row_ids.into_iter().enumerate() {
1274                state.primary.insert(
1275                    ext_id,
1276                    Loc::Sealed {
1277                        seg: 0,
1278                        row: row as u32,
1279                    },
1280                );
1281            }
1282        }
1283        gc_orphan_segments(&self.dir, &new_manifest, self.keyring.as_ref())?;
1284        gc_orphan_index_snapshots(&self.dir, &new_manifest)?;
1285        Ok(())
1286    }
1287
1288    // Start a new WAL segment and delete every older one (all of their records
1289    // are now <= last_checkpointed_lsn and captured in segments).
1290    fn rotate_wal(&mut self) -> Result<()> {
1291        let wal_dir = self.dir.join("wal");
1292        let old_seq = self.wal_seq;
1293        let new_seq = old_seq + 1;
1294        let new_wal = WalWriter::create(&wal_file_path(&wal_dir, new_seq), self.next_lsn)?;
1295        fsync_dir(&wal_dir)?;
1296        self.wal = new_wal;
1297        self.wal_seq = new_seq;
1298        for (seq, path) in list_wal_files(&wal_dir)? {
1299            if seq <= old_seq {
1300                remove_file_if_present(&path)?;
1301            }
1302        }
1303        fsync_dir(&wal_dir)?;
1304        Ok(())
1305    }
1306}
1307
1308// Apply a recovered WAL record to the in-memory state during open. Upserts land
1309// in the active buffer (and are re-sealed at the next checkpoint); deletes remove
1310// from the primary index and are recorded for tombstoning.
1311fn apply_wal_entry(
1312    collections: &mut HashMap<CollectionId, CollectionState>,
1313    name_index: &mut HashMap<String, CollectionId>,
1314    entry: &WalEntry,
1315    keyring: &dyn KeyRing,
1316) -> Result<()> {
1317    match &entry.op {
1318        WalOp::CreateCollection {
1319            collection_id,
1320            name,
1321            descriptor,
1322        } => {
1323            let descriptor = Descriptor::decode(descriptor)?;
1324            // The key material was provisioned before this record was made
1325            // durable, so the collection's codec is available on replay.
1326            let codec = keyring.collection_codec(*collection_id)?;
1327            name_index.insert(name.clone(), *collection_id);
1328            collections.insert(
1329                *collection_id,
1330                CollectionState::new(*collection_id, name.clone(), descriptor, codec),
1331            );
1332        }
1333        WalOp::DropCollection { collection_id } => {
1334            if let Some(state) = collections.remove(collection_id) {
1335                name_index.remove(&state.name);
1336            }
1337        }
1338        WalOp::Upsert {
1339            collection_id,
1340            external_id,
1341            vector,
1342            payload,
1343        } => {
1344            if let Some(state) = collections.get_mut(collection_id) {
1345                state.apply_upsert(external_id, vector.clone(), payload.clone());
1346            }
1347        }
1348        WalOp::Delete {
1349            collection_id,
1350            external_id,
1351        } => {
1352            if let Some(state) = collections.get_mut(collection_id) {
1353                state.apply_delete(external_id);
1354            }
1355        }
1356        // The manifest is the authoritative checkpoint record; explicit
1357        // Checkpoint WAL records are not emitted and are a no-op here.
1358        WalOp::Checkpoint { .. } => {}
1359    }
1360    Ok(())
1361}
1362
1363// Delete superseded segment files (and whole dropped-collection directories)
1364// that the manifest no longer references.
1365fn gc_orphan_segments(dir: &Path, mfst: &Manifest, keyring: &dyn KeyRing) -> Result<()> {
1366    let collections_dir = dir.join("collections");
1367    if !collections_dir.exists() {
1368        return Ok(());
1369    }
1370    let mut referenced: HashSet<(u64, u64)> = HashSet::new();
1371    let mut live_collections: HashSet<u64> = HashSet::new();
1372    for c in &mfst.collections {
1373        live_collections.insert(c.id.value());
1374        for s in &c.segments {
1375            referenced.insert((c.id.value(), s.id));
1376        }
1377    }
1378    for entry in fs::read_dir(&collections_dir).map_err(|e| CoreError::io(&collections_dir, e))? {
1379        let entry = entry.map_err(|e| CoreError::io(&collections_dir, e))?;
1380        let cdir = entry.path();
1381        let Some(cid) = entry
1382            .file_name()
1383            .to_str()
1384            .and_then(|n| n.parse::<u64>().ok())
1385        else {
1386            continue;
1387        };
1388        if !live_collections.contains(&cid) {
1389            // A dropped collection: crypto-shred its key first (so a crash before
1390            // the files are reclaimed still leaves them unrecoverable), then
1391            // reclaim its whole directory.
1392            keyring.shred_collection(CollectionId(cid))?;
1393            if cdir.is_dir() {
1394                fs::remove_dir_all(&cdir).map_err(|e| CoreError::io(&cdir, e))?;
1395            }
1396            continue;
1397        }
1398        let seg_dir = cdir.join("segments");
1399        if !seg_dir.is_dir() {
1400            continue;
1401        }
1402        for seg in fs::read_dir(&seg_dir).map_err(|e| CoreError::io(&seg_dir, e))? {
1403            let seg = seg.map_err(|e| CoreError::io(&seg_dir, e))?;
1404            let path = seg.path();
1405            let Some(name) = seg.file_name().to_str().map(str::to_owned) else {
1406                continue;
1407            };
1408            // A crash-leftover temp (an interrupted `.del` rewrite) is always junk.
1409            if segment::is_temp_file(&name) {
1410                remove_file_if_present(&path)?;
1411                continue;
1412            }
1413            let Some(seg_id) = segment::seg_id_of_file(&name) else {
1414                continue;
1415            };
1416            if !referenced.contains(&(cid, seg_id)) {
1417                remove_file_if_present(&path)?;
1418            }
1419        }
1420    }
1421    Ok(())
1422}
1423
1424// Delete stale or orphaned index snapshot files (`idx-*`) that a live
1425// collection's manifest entry no longer references — a superseded snapshot, or
1426// one written by a checkpoint that crashed before its manifest swap (ADR-0025).
1427// Non-snapshot index artifacts (e.g. the disk graph) are left untouched; dropped
1428// collections are reclaimed wholesale by `gc_orphan_segments`.
1429fn gc_orphan_index_snapshots(dir: &Path, mfst: &Manifest) -> Result<()> {
1430    for c in &mfst.collections {
1431        let index_dir = collection_dir(dir, c.id).join("index");
1432        if !index_dir.is_dir() {
1433            continue;
1434        }
1435        let keep = c.index_snapshot.as_ref().map(|r| r.id);
1436        for entry in fs::read_dir(&index_dir).map_err(|e| CoreError::io(&index_dir, e))? {
1437            let entry = entry.map_err(|e| CoreError::io(&index_dir, e))?;
1438            let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
1439                continue;
1440            };
1441            let Some(id) = index_snapshot_id_of_file(&name) else {
1442                continue; // not a snapshot file
1443            };
1444            if Some(id) != keep {
1445                remove_file_if_present(&entry.path())?;
1446            }
1447        }
1448    }
1449    Ok(())
1450}
1451
1452fn remove_file_if_present(path: &Path) -> Result<()> {
1453    match fs::remove_file(path) {
1454        Ok(()) => Ok(()),
1455        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1456        Err(e) => Err(CoreError::io(path, e)),
1457    }
1458}
1459
1460fn collection_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1461    dir.join("collections").join(format!("{:010}", cid.value()))
1462}
1463
1464fn segments_dir(dir: &Path, cid: CollectionId) -> PathBuf {
1465    collection_dir(dir, cid).join("segments")
1466}
1467
// File name of a collection's index snapshot with id `id` (ADR-0025), e.g.
// `idx-0000000042`. The id is zero-padded to 10 digits so that, for ids below
// 10^10, lexical order of the names matches numeric order of the ids.
fn index_snapshot_file_name(id: u64) -> String {
    format!("idx-{:010}", id)
}
1473
// Parse a snapshot id from an `idx-NNNNNNNNNN` file name, or return `None` for
// any other file, so that snapshot GC ignores non-snapshot index artifacts.
//
// The part after `idx-` must be a non-empty run of ASCII digits. On its own,
// `str::parse::<u64>` also accepts a leading `+`. That would let a foreign file
// such as `idx-+7` be mistaken for snapshot 7 and deleted by GC. Ids that
// overflow `u64` are rejected as well.
fn index_snapshot_id_of_file(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("idx-")?;
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse::<u64>().ok()
}
1480
// Path of the WAL file with sequence number `seq` inside `wal_dir`, named
// `wal-<seq, zero-padded to 10 digits>.log`.
fn wal_file_path(wal_dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("wal-{:010}.log", seq);
    wal_dir.join(file_name)
}
1484
1485fn list_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1486    let mut out = Vec::new();
1487    for entry in fs::read_dir(wal_dir).map_err(|e| CoreError::io(wal_dir, e))? {
1488        let entry = entry.map_err(|e| CoreError::io(wal_dir, e))?;
1489        if let Some(seq) = entry.file_name().to_str().and_then(parse_wal_file_name) {
1490            out.push((seq, entry.path()));
1491        }
1492    }
1493    out.sort_by_key(|(seq, _)| *seq);
1494    Ok(out)
1495}
1496
// Parse the sequence number out of a `wal-NNNNNNNNNN.log` file name, or return
// `None` for any other name.
//
// The part between `wal-` and `.log` must be a non-empty run of ASCII digits.
// On its own, `str::parse::<u64>` also accepts a leading `+`. A stray
// `wal-+5.log` would then be listed alongside `wal-0000000005.log` under the
// same sequence number and replayed.
fn parse_wal_file_name(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("wal-")?.strip_suffix(".log")?;
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse::<u64>().ok()
}
1502
// Serialize `v` as consecutive little-endian IEEE-754 words, 4 bytes per
// element, in input order. This is the inverse of `le_bytes_to_f32`.
fn f32_to_le_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(4 * v.len());
    bytes.extend(v.iter().flat_map(|x| x.to_le_bytes()));
    bytes
}
1510
// Decode consecutive little-endian IEEE-754 words back into `f32`s. This is the
// inverse of `f32_to_le_bytes`. If `bytes.len()` is not a multiple of 4, the
// trailing partial word is silently ignored.
fn le_bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    let mut floats = Vec::with_capacity(bytes.len() / 4);
    for word in bytes.chunks_exact(4) {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(word);
        floats.push(f32::from_le_bytes(raw));
    }
    floats
}
1517
// Unit and crash-recovery tests for `Store`. A test that reopens a directory
// drops the previous `Store` first, so recovery runs against a quiescent
// directory as it would after a real process restart.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::descriptor::{DistanceMetric, Dtype};

    fn desc() -> Descriptor {
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
    }

    fn open(dir: &Path) -> Store {
        Store::open(dir).unwrap()
    }

    // Path to a segment's row-directory file, for corruption/orphan tests.
    fn seg_dir_file(dir: &Path, cid: CollectionId, seg_id: u64) -> PathBuf {
        segments_dir(dir, cid).join(format!("seg-{seg_id:010}.dir"))
    }

    #[test]
    fn upsert_get_delete_in_memory() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(got.payload, b"{}");
        assert!(s.delete(c, "a").unwrap());
        assert!(s.get(c, "a").unwrap().is_none());
        assert!(!s.delete(c, "a").unwrap());
    }

    #[test]
    fn dim_mismatch_is_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        assert!(matches!(
            s.upsert(c, "a", &[1.0, 2.0], b"{}"),
            Err(CoreError::InvalidArgument(_))
        ));
    }

    #[test]
    fn upsert_batch_commits_all_on_sync() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            let vecs: Vec<([f32; 4], String)> = (0..8u32)
                .map(|i| ([i as f32; 4], format!("k{i}")))
                .collect();
            let payload = b"{}";
            let records: Vec<(&str, &[f32], &[u8])> = vecs
                .iter()
                .map(|(v, id)| (id.as_str(), v.as_slice(), payload.as_slice()))
                .collect();
            let n = s.upsert_batch(c, &records).unwrap();
            assert_eq!(n, 8);
            // All points readable from in-memory state immediately.
            for (_, id) in &vecs {
                assert!(s.get(c, id).unwrap().is_some(), "missing {id}");
            }
        }
        // Re-open: WAL replay must restore all 8 points.
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 8);
        for i in 0..8u32 {
            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
            assert_eq!(got.vector, vec![i as f32; 4]);
        }
    }

    #[test]
    fn upsert_batch_dim_mismatch_writes_nothing() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            // First record correct, second has wrong dim — the whole batch must fail.
            let bad: &[(&str, &[f32], &[u8])] = &[
                ("a", &[1.0, 2.0, 3.0, 4.0], b"{}"),
                ("b", &[1.0, 2.0], b"{}"), // wrong dim
            ];
            assert!(matches!(
                s.upsert_batch(c, bad),
                Err(CoreError::InvalidArgument(_))
            ));
            // Neither point was applied in memory.
            assert!(s.get(c, "a").unwrap().is_none());
            assert!(s.get(c, "b").unwrap().is_none());
            assert_eq!(s.len(c).unwrap(), 0);
        }
        // ...nor logged: WAL replay on reopen must not resurrect the valid record.
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 0);
    }

    #[test]
    fn duplicate_collection_is_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        s.create_collection("c", desc()).unwrap();
        assert!(matches!(
            s.create_collection("c", desc()),
            Err(CoreError::AlreadyExists(_))
        ));
    }

    #[test]
    fn recovers_without_checkpoint_via_wal_replay() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..10u32 {
                let v = [i as f32; 4];
                s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
            }
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 10);
        let got = s.get(c, "k7").unwrap().unwrap();
        assert_eq!(got.vector, vec![7.0; 4]);
    }

    #[test]
    fn recovers_across_checkpoint_and_wal_tail() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            // Post-checkpoint writes live only in the WAL until recovery.
            for i in 5..8u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.delete(c, "k0").unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 7); // k1..k7
        assert!(s.get(c, "k0").unwrap().is_none());
        assert_eq!(s.get(c, "k6").unwrap().unwrap().vector, vec![6.0; 4]);
    }

    #[test]
    fn open_with_keyring_round_trips_through_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s =
                Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
                    .unwrap();
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "b", &[5.0; 4], b"{}").unwrap();
        }
        // Reopen through the same key-ring: data recovers from the sealed segment
        // and the WAL tail, each opened with the collection's own codec.
        let s = Store::open_with_keyring(tmp.path(), Box::new(SingleCodecKeyRing::plaintext()))
            .unwrap();
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 2);
        assert_eq!(
            s.get(c, "a").unwrap().unwrap().vector,
            vec![1.0, 2.0, 3.0, 4.0]
        );
        assert_eq!(s.get(c, "b").unwrap().unwrap().vector, vec![5.0; 4]);
    }

    #[test]
    fn delete_survives_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.delete(c, "a").unwrap();
            // Delete-only window: persisted via the segment's `.del` bitmap.
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert!(s.get(c, "a").unwrap().is_none());
        assert!(s.get(c, "b").unwrap().is_some());
        assert_eq!(s.len(c).unwrap(), 1);
    }

    #[test]
    fn reopen_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "b", &[2.0; 4], b"{}").unwrap();
        }
        let snapshot = |dir: &Path| {
            let s = open(dir);
            let c = s.collection_id("c").unwrap();
            s.scan(c).unwrap()
        };
        assert_eq!(snapshot(tmp.path()), snapshot(tmp.path()));
    }

    #[test]
    fn update_then_checkpoint_keeps_latest_value() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
            s.checkpoint().unwrap();
            s.upsert(c, "a", &[9.0; 4], b"v2").unwrap(); // shadow the sealed row
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![9.0; 4]);
        assert_eq!(got.payload, b"v2");
        assert_eq!(s.len(c).unwrap(), 1);
    }

    #[test]
    fn update_within_one_window_seals_latest() {
        // Re-upsert the same id several times before any checkpoint: only the
        // latest active row must be sealed and recoverable.
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"v1").unwrap();
            s.upsert(c, "a", &[2.0; 4], b"v2").unwrap();
            s.upsert(c, "a", &[3.0; 4], b"v3").unwrap();
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 1);
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![3.0; 4]);
        assert_eq!(got.payload, b"v3");
    }

    #[test]
    fn dropped_collection_is_gone_after_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
            assert!(s.drop_collection("c").unwrap());
            s.checkpoint().unwrap();
        }
        let s = open(tmp.path());
        assert!(s.collection_id("c").is_none());
        assert!(s.collection_names().is_empty());
    }

    #[test]
    fn orphan_segment_is_garbage_collected() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
        }
        // Drop a stray segment file the manifest does not reference.
        let stray = segments_dir(tmp.path(), cid).join("seg-0000009999.vec");
        fs::write(&stray, b"junk").unwrap();
        assert!(stray.exists());
        let _s = open(tmp.path());
        assert!(!stray.exists(), "orphan segment should be GC'd on open");
    }

    #[test]
    fn corrupt_segment_is_detected_not_served() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            s.upsert(c, "a", &[1.0; 4], b"{}").unwrap();
            s.checkpoint().unwrap();
        }
        // Corrupt the sealed segment's row directory (read and verified at open).
        // Flip a byte in page 0's live body (the 8-byte length prefix), which the
        // CRC covers — a small directory's postcard body does not reach far into
        // the 16 KiB page, so a deep offset would land in uncovered padding.
        let path = seg_dir_file(tmp.path(), cid, 0);
        let mut bytes = fs::read(&path).unwrap();
        bytes[33] ^= 0xFF;
        fs::write(&path, &bytes).unwrap();
        assert!(matches!(
            Store::open(tmp.path()),
            Err(CoreError::PageCorrupt { .. })
        ));
    }

    #[test]
    fn torn_wal_tail_drops_only_unacked_record() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..3u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            wal_path = wal_file_path(&tmp.path().join("wal"), s.wal_seq);
        }
        // Append a torn (partial) frame to the tail of the active WAL.
        {
            use std::io::Write as _;
            let mut f = fs::OpenOptions::new().append(true).open(&wal_path).unwrap();
            f.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
            f.sync_data().unwrap();
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 3); // the 3 acked upserts recovered intact
    }

    #[test]
    fn reads_served_from_disk_after_checkpoint() {
        // After a checkpoint the active buffer is cleared, so a get must come
        // from the sealed segment's mmap'd columns — exercising the disk path.
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], br#"{"k":1}"#)
            .unwrap();
        s.checkpoint().unwrap();
        let got = s.get(c, "a").unwrap().unwrap();
        assert_eq!(got.vector, vec![1.0, 2.0, 3.0, 4.0]);
        assert_eq!(got.payload, br#"{"k":1}"#);
    }

    #[test]
    fn high_dim_vectors_straddle_pages() {
        // A dimensionality whose stride does not divide the page body, forcing
        // vectors to straddle 16 KiB block boundaries in the .vec column.
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let dim = 1000usize; // stride = 4000 B; ~4 vectors per 16352-B page body
        let c = s
            .create_collection(
                "c",
                Descriptor::new(dim as u32, Dtype::F32, DistanceMetric::L2),
            )
            .unwrap();
        for i in 0..20u32 {
            let v: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
            s.upsert(c, &format!("k{i}"), &v, b"{}").unwrap();
        }
        s.checkpoint().unwrap();
        // Release the first store before recovery reopens the directory.
        drop(s);
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        for i in 0..20u32 {
            let got = s.get(c, &format!("k{i}")).unwrap().unwrap();
            let want: Vec<f32> = (0..dim).map(|j| (i as f32) * 1000.0 + j as f32).collect();
            assert_eq!(
                got.vector, want,
                "vector k{i} mismatch after straddling read"
            );
        }
    }

    #[test]
    fn delete_persists_via_del_bitmap_across_reopen() {
        // Five rows in one segment; deleting one is 20% dead with a single
        // segment, so auto-compaction does not fire — the delete must survive
        // purely via the persisted `.del` tombstone bitmap.
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
            s.delete(c, "k2").unwrap();
            s.checkpoint().unwrap();
            assert_eq!(
                s.collections[&c].sealed.len(),
                1,
                "no new segment for a delete-only window"
            );
        }
        // The tombstone bitmap was written for segment 0.
        assert!(
            segments_dir(tmp.path(), cid)
                .join("seg-0000000000.del")
                .exists(),
            ".del must be persisted for the deleted row"
        );
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert!(s.get(c, "k2").unwrap().is_none());
        assert_eq!(s.len(c).unwrap(), 4);
        for i in [0u32, 1, 3, 4] {
            assert!(s.get(c, &format!("k{i}")).unwrap().is_some());
        }
    }

    #[test]
    fn shadowed_row_is_tombstoned_and_latest_wins() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            for i in 0..5u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"v1")
                    .unwrap();
            }
            s.checkpoint().unwrap(); // seg 0
            s.upsert(c, "k2", &[99.0; 4], b"v2").unwrap();
            s.checkpoint().unwrap(); // seg 1 holds the new k2; seg 0 row tombstoned
        }
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.len(c).unwrap(), 5); // k2 counted once
        let got = s.get(c, "k2").unwrap().unwrap();
        assert_eq!(got.vector, vec![99.0; 4]);
        assert_eq!(got.payload, b"v2");
    }

    #[test]
    fn compaction_merges_segments_reclaims_and_keeps_active_rows() {
        let tmp = tempfile::tempdir().unwrap();
        let cid;
        {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            cid = c;
            for i in 0..6u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap(); // seg 0: k0..k5
            for i in 6..12u32 {
                s.upsert(c, &format!("k{i}"), &[i as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap(); // seg 1: k6..k11
            s.delete(c, "k0").unwrap();
            s.delete(c, "k6").unwrap();
            s.checkpoint().unwrap(); // tombstones only; still two segments
            assert_eq!(s.collections[&c].sealed.len(), 2);

            // An un-checkpointed row must survive the compaction untouched.
            s.upsert(c, "fresh", &[7.0; 4], b"new").unwrap();
            s.compact().unwrap();
            assert_eq!(s.collections[&c].sealed.len(), 1, "segments merged to one");
            assert!(
                !segments_dir(tmp.path(), cid)
                    .join("seg-0000000000.dir")
                    .exists(),
                "old segment files reclaimed"
            );
            assert_eq!(s.len(c).unwrap(), 11); // 10 live sealed + 1 active
            assert!(s.get(c, "k0").unwrap().is_none());
            assert!(s.get(c, "k6").unwrap().is_none());
            assert_eq!(s.get(c, "k5").unwrap().unwrap().vector, vec![5.0; 4]);
            assert_eq!(s.get(c, "fresh").unwrap().unwrap().payload, b"new");
        }
        // Everything survives a reopen, including the active row via WAL replay.
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.collections[&c].sealed.len(), 1);
        assert_eq!(s.len(c).unwrap(), 11);
        assert!(s.get(c, "k0").unwrap().is_none());
        assert_eq!(s.get(c, "fresh").unwrap().unwrap().vector, vec![7.0; 4]);
        assert_eq!(s.get(c, "k11").unwrap().unwrap().vector, vec![11.0; 4]);
    }

    #[test]
    fn auto_compaction_merges_many_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        // Eight checkpoints create eight segments; the eighth checkpoint's
        // auto-compaction merges them.
        for ck in 0..8u32 {
            for i in 0..3u32 {
                let n = ck * 3 + i;
                s.upsert(c, &format!("k{n}"), &[n as f32; 4], b"{}")
                    .unwrap();
            }
            s.checkpoint().unwrap();
        }
        assert!(
            s.collections[&c].sealed.len() < COMPACT_MIN_SEGMENTS,
            "auto-compaction should have merged the segments"
        );
        assert_eq!(s.len(c).unwrap(), 24);
        assert_eq!(s.get(c, "k0").unwrap().unwrap().vector, vec![0.0; 4]);
        assert_eq!(s.get(c, "k23").unwrap().unwrap().vector, vec![23.0; 4]);
    }

    #[test]
    fn matching_ids_spans_secondary_index_and_active_buffer() {
        use crate::descriptor::FilterableField;
        use crate::sec::SecValue;

        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
            FilterableField::keyword("city"),
            FilterableField::numeric("age"),
        ]);
        let c = s.create_collection("c", descriptor).unwrap();
        s.upsert(c, "a", &[0.0; 4], br#"{"city":"paris","age":30}"#)
            .unwrap();
        s.upsert(c, "b", &[0.0; 4], br#"{"city":"lyon","age":25}"#)
            .unwrap();
        s.upsert(c, "d", &[0.0; 4], br#"{"city":"paris","age":40}"#)
            .unwrap();
        s.checkpoint().unwrap();
        // An active (un-checkpointed) row, matched by scanning the buffer.
        s.upsert(c, "e", &[0.0; 4], br#"{"city":"paris","age":22}"#)
            .unwrap();

        let paris = || SecPredicate::Eq {
            field: "city".into(),
            value: SecValue::Keyword("paris".into()),
        };
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["a", "d", "e"]);

        // Numeric range [25, 35]: 30 (a, sealed) and 25 (b, sealed); not 40 or 22.
        assert_eq!(
            s.matching_ids(
                c,
                &SecPredicate::Range {
                    field: "age".into(),
                    lo: Some(SecValue::Numeric(25.0)),
                    hi: Some(SecValue::Numeric(35.0)),
                    lo_inclusive: true,
                    hi_inclusive: true,
                }
            )
            .unwrap(),
            ["a", "b"]
        );

        // Deleting a sealed row drops it via the primary-consistency check.
        s.delete(c, "a").unwrap();
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);

        // A non-filterable field is rejected (the planner must post-filter it).
        assert!(matches!(
            s.matching_ids(
                c,
                &SecPredicate::Eq {
                    field: "country".into(),
                    value: SecValue::Keyword("fr".into()),
                }
            ),
            Err(CoreError::InvalidArgument(_))
        ));

        // Checkpoint seals the active row and the deletion; results survive reopen.
        s.checkpoint().unwrap();
        // Release the first store before recovery reopens the directory.
        drop(s);
        let s = open(tmp.path());
        let c = s.collection_id("c").unwrap();
        assert_eq!(s.matching_ids(c, &paris()).unwrap(), ["d", "e"]);
    }

    // ----- durable index snapshots (ADR-0025) -----

    // The `idx-*` snapshot files currently on disk for a collection, sorted.
    fn index_snapshot_files(dir: &Path, cid: CollectionId) -> Vec<String> {
        let idx = collection_dir(dir, cid).join("index");
        let mut names: Vec<String> = fs::read_dir(&idx)
            .map(|rd| {
                rd.filter_map(std::result::Result::ok)
                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
                    .filter(|n| n.starts_with("idx-"))
                    .collect()
            })
            .unwrap_or_default();
        names.sort();
        names
    }

    #[test]
    fn index_snapshot_round_trips_through_checkpoint_and_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let blob = b"opaque-index-bytes".to_vec();
        let cid = {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint_with_index_snapshots(&HashMap::from([(c, blob.clone())]))
                .unwrap();
            // Available immediately, exactly one snapshot file on disk.
            assert_eq!(s.read_index_snapshot(c).unwrap(), Some(blob.clone()));
            assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
            c
        };
        // Survives reopen, loaded via the manifest reference.
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(blob));
    }

    #[test]
    fn checkpoint_without_a_snapshot_clears_and_reclaims_it() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"blob".to_vec())]))
            .unwrap();
        assert!(s.read_index_snapshot(c).unwrap().is_some());

        // A later plain checkpoint (with new data) carries no snapshot → cleared.
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.checkpoint().unwrap();
        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
        assert!(index_snapshot_files(tmp.path(), c).is_empty());

        // Release the first store before recovery reopens the directory.
        drop(s);
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(c).unwrap(), None);
    }

    #[test]
    fn a_new_snapshot_supersedes_and_reclaims_the_old_one() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"first".to_vec())]))
            .unwrap();
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"second".to_vec())]))
            .unwrap();

        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"second".to_vec()));
        assert_eq!(index_snapshot_files(tmp.path(), c).len(), 1);
    }

    #[test]
    fn compaction_preserves_the_index_snapshot() {
        let tmp = tempfile::tempdir().unwrap();
        let mut s = open(tmp.path());
        let c = s.create_collection("c", desc()).unwrap();
        s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
            .unwrap();
        // More changes, then re-snapshot at the new floor and compact.
        s.upsert(c, "b", &[5.0, 6.0, 7.0, 8.0], b"{}").unwrap();
        s.delete(c, "a").unwrap();
        s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"keep".to_vec())]))
            .unwrap();
        s.compact().unwrap();
        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));

        // Release the first store before recovery reopens the directory.
        drop(s);
        let s = open(tmp.path());
        assert_eq!(s.read_index_snapshot(c).unwrap(), Some(b"keep".to_vec()));
    }

    #[test]
    fn orphan_index_snapshot_is_reclaimed_on_open() {
        let tmp = tempfile::tempdir().unwrap();
        let cid = {
            let mut s = open(tmp.path());
            let c = s.create_collection("c", desc()).unwrap();
            s.upsert(c, "a", &[1.0, 2.0, 3.0, 4.0], b"{}").unwrap();
            s.checkpoint_with_index_snapshots(&HashMap::from([(c, b"live".to_vec())]))
                .unwrap();
            // Simulate a checkpoint that wrote a snapshot file but crashed before
            // the manifest swap: an unreferenced idx-* in the index dir.
            let stray = s.index_dir(c).join("idx-9999999999");
            fs::write(&stray, b"orphan").unwrap();
            c
        };
        let s = open(tmp.path());
        // Recovery reclaims the orphan but keeps the referenced snapshot.
        assert!(!s.index_dir(cid).join("idx-9999999999").exists());
        assert_eq!(s.read_index_snapshot(cid).unwrap(), Some(b"live".to_vec()));
    }
}