Skip to main content

reddb_server/runtime/
vector_turbo_kind.rs

1//! Foundation for `KIND vector.turbo` collections (PRD #668, issue #693).
2//!
3//! Mirrors `blockchain_kind` for the marker side of the contract:
4//! `red.collection.{name}.kind = "turbo"` is the durable signal that
5//! distinguishes a TurboQuant-backed vector collection from the legacy
6//! `vector` kind. Routing in `impl_ddl`, `create_vector`, and the
7//! vector-search executor reads `is_turbo` to branch.
8//!
9//! The per-collection runtime state (`TurboCollectionState`) owns the
10//! in-memory `TurboQuantIndex` and a `TurboExtent` when a pager is
11//! available, plus the codec dimension / metric inherited from the
12//! collection contract.
13
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use parking_lot::{Condvar, Mutex};
19
20use crate::storage::engine::distance::DistanceMetric;
21use crate::storage::engine::turboquant::extent::TurboExtent;
22use crate::storage::engine::turboquant::index::TurboQuantIndex;
23use crate::storage::engine::Pager;
24use crate::storage::schema::Value;
25use crate::storage::unified::{EntityData, UnifiedStore};
26use crate::storage::EntityId;
27use reddb_file::{
28    read_turboquant_snapshot as read_snapshot, write_turboquant_snapshot as write_snapshot,
29    TurboQuantSnapshotError as SnapshotError,
30};
31
32/// Value stored under `red.collection.{name}.kind` for vector.turbo
33/// collections. Must be distinct from `blockchain_kind::CHAIN_KIND_TAG`
34/// and from any future kind marker.
35pub const TURBO_KIND_TAG: &str = "turbo";
36
37/// Deterministic codec seed shared by every turbo collection in this
38/// process. The seed feeds the rotation matrix + codebook; a fixed seed
39/// guarantees that re-encoding the same vector across restarts produces
40/// the same code, which is what makes lazy re-population from the
41/// persisted vector entities safe.
42pub const TURBO_CODEC_SEED: u64 = 0x7155_8807_FED4_2913;
43
/// Config-tree key holding the kind marker for `collection`.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
47
48/// Persist the `turbo` kind marker. Called once at collection creation.
49/// Idempotent against `IF NOT EXISTS`: re-stamping the same value is a
50/// no-op at the config-tree layer.
51pub fn mark_as_turbo(store: &UnifiedStore, collection: &str) {
52    store.set_config_tree(
53        &kind_key(collection),
54        &crate::serde_json::Value::String(TURBO_KIND_TAG.to_string()),
55    );
56}
57
58/// True if `mark_as_turbo` was ever called for this collection. Cheap
59/// (one config-tree read) and safe to call on every INSERT/SEARCH —
60/// the legacy `vector` path takes the `false` branch with no extra
61/// cost beyond the lookup.
62pub fn is_turbo(store: &UnifiedStore, collection: &str) -> bool {
63    match store.get_config(&kind_key(collection)) {
64        Some(Value::Text(s)) => s.as_ref() == TURBO_KIND_TAG,
65        _ => false,
66    }
67}
68
69/// Per-collection runtime state for a `vector.turbo` collection.
70///
71/// `index` is the in-memory TurboQuant index that owns the encoded
72/// codes + raw vectors; `extent` is the per-collection page-backed
73/// payload buffer when the store is in paged mode (None for in-memory
74/// runtimes). Both are wrapped in `Mutex` because INSERTs serialize on
75/// the per-collection state — the contention point lives here rather
76/// than on the global store lock.
77pub struct TurboCollectionState {
78    pub dim: usize,
79    pub metric: DistanceMetric,
80    pub index: Mutex<TurboQuantIndex>,
81    pub extent: Mutex<Option<TurboExtent>>,
82    /// Set once the lazy rebuild from persisted vector entities has
83    /// happened. Subsequent INSERT/SEARCH calls skip the scan.
84    populated: std::sync::atomic::AtomicBool,
85    /// Issue #673 — per-collection readiness flag. Flips to `true`
86    /// only after the background rebuild completes. SEARCH against a
87    /// not-yet-ready collection waits (with bounded timeout) or
88    /// returns a structured NOT_READY response.
89    ready: std::sync::atomic::AtomicBool,
90    /// Condvar-paired wait surface for SEARCH callers that want to
91    /// block (bounded) until ready instead of immediately failing.
92    ready_signal: Arc<(Mutex<bool>, Condvar)>,
93    /// `.tv` snapshot file path for this collection (#674). `None` when
94    /// the active `StorageLayout` opts out of snapshot files
95    /// (`Minimal` / embedded). When `Some`, the checkpoint cycle dumps
96    /// here and boot prefers loading from here over rebuilding from
97    /// the extent.
98    snapshot_path: Mutex<Option<PathBuf>>,
99    /// Async-barrier handle for the most-recently-spawned snapshot
100    /// dump worker (#674). The next checkpoint cycle joins the
101    /// previous worker before starting a new one — bounds backpressure
102    /// to at most one in-flight dump per collection.
103    prev_snapshot_join: Mutex<Option<std::thread::JoinHandle<()>>>,
104}
105
106impl TurboCollectionState {
107    pub fn new(dim: usize, metric: DistanceMetric, pager: Option<&Arc<Pager>>) -> Self {
108        let index = TurboQuantIndex::new(dim, TURBO_CODEC_SEED);
109        let extent = pager
110            .and_then(|p| TurboExtent::new(Arc::clone(p)).ok())
111            .map(Some)
112            .unwrap_or(None);
113        Self {
114            dim,
115            metric,
116            index: Mutex::new(index),
117            extent: Mutex::new(extent),
118            populated: std::sync::atomic::AtomicBool::new(false),
119            ready: std::sync::atomic::AtomicBool::new(false),
120            ready_signal: Arc::new((Mutex::new(false), Condvar::new())),
121            snapshot_path: Mutex::new(None),
122            prev_snapshot_join: Mutex::new(None),
123        }
124    }
125
126    /// Set (or clear) the `.tv` snapshot path for this collection.
127    /// Called by `RedDB::turbo_state` once the resolved
128    /// `TieredLayoutPaths` is known.
129    pub fn set_snapshot_path(&self, path: Option<PathBuf>) {
130        *self.snapshot_path.lock() = path;
131    }
132
133    /// Current `.tv` snapshot path, if any.
134    pub fn snapshot_path(&self) -> Option<PathBuf> {
135        self.snapshot_path.lock().clone()
136    }
137
138    /// Returns the current readiness flag. `true` means the
139    /// background rebuild (or lazy populate) has completed and the
140    /// in-memory index reflects every WAL-acked INSERT.
141    pub fn is_ready(&self) -> bool {
142        self.ready.load(std::sync::atomic::Ordering::Acquire)
143    }
144
145    /// Mark the collection as ready. Called at the end of
146    /// `ensure_populated` and from the background rebuild worker.
147    /// Wakes any SEARCH callers parked on `wait_until_ready`.
148    fn mark_ready(&self) {
149        self.ready.store(true, std::sync::atomic::Ordering::Release);
150        let (lock, cv) = &*self.ready_signal;
151        let mut flag = lock.lock();
152        *flag = true;
153        cv.notify_all();
154    }
155
156    /// Block the caller (with a bounded timeout) until the
157    /// collection becomes ready. Returns `true` if ready within the
158    /// timeout, `false` if the timeout fired. A zero-or-negative
159    /// timeout still does one fast-path check.
160    pub fn wait_until_ready(&self, timeout: Duration) -> bool {
161        if self.is_ready() {
162            return true;
163        }
164        if timeout.is_zero() {
165            return self.is_ready();
166        }
167        let (lock, cv) = &*self.ready_signal;
168        let mut flag = lock.lock();
169        if *flag {
170            return true;
171        }
172        let _ = cv.wait_for(&mut flag, timeout);
173        *flag
174    }
175
176    /// Lazily populate the in-memory index from any vector entities
177    /// already persisted in the collection, then drain any WAL-replayed
178    /// `VectorInsert` records captured at store-open time (issue #694).
179    ///
180    /// Boot-time recovery: the WAL `VectorInsert` records are the
181    /// authoritative source for vectors that may not have made it into
182    /// the entity manager's persisted state (e.g. a crash between WAL
183    /// fsync and the next paged flush). Replaying them in WAL order
184    /// under a fixed codec seed reconstructs the in-memory
185    /// `TurboQuantIndex` deterministically — including the
186    /// partial-block tail introduced by ADR 0024.
187    ///
188    /// Non-vector traffic does not block on this rebuild: the runtime
189    /// only takes this path on the first turbo INSERT/SEARCH after
190    /// boot. `#673` wires the per-collection `ready: bool` flag on
191    /// top of this hook to keep vector traffic from observing a
192    /// half-built index while the rebuild is in flight.
193    pub fn ensure_populated(&self, store: &UnifiedStore, collection: &str) {
194        use std::sync::atomic::Ordering;
195        if self.populated.load(Ordering::Acquire) {
196            return;
197        }
198        let mut index = self.index.lock();
199        // Double-checked: another writer may have populated while we
200        // were waiting for the lock.
201        if self.populated.load(Ordering::Acquire) {
202            return;
203        }
204        // Snapshot-first boot (#674). When a valid `.tv` exists at
205        // the layout-derived path, replay its `(id, vector)` pairs in
206        // stored order. The deterministic codec seed reproduces
207        // byte-identical block/lane placement, so the index ends up
208        // identical to what a from-scratch entity scan would build —
209        // without walking the entity manager. Snapshots are purely a
210        // cache: any failure (missing, truncated, crc-bad, dim/seed
211        // drift) falls through to the legacy entity-scan path so
212        // boot still succeeds.
213        let mut snapshot_loaded = false;
214        if let Some(path) = self.snapshot_path.lock().clone() {
215            if path.exists() {
216                match read_snapshot(&path, self.dim as u32, TURBO_CODEC_SEED) {
217                    Ok(payload) => {
218                        for (raw_id, vector) in payload.vectors {
219                            index.insert(EntityId::new(raw_id), vector);
220                        }
221                        snapshot_loaded = true;
222                    }
223                    Err(err) => {
224                        tracing::warn!(
225                            target: "reddb::turbo::snapshot",
226                            collection,
227                            path = %path.display(),
228                            error = %err,
229                            "vector.turbo snapshot unusable; falling back to extent/WAL rebuild",
230                        );
231                    }
232                }
233            }
234        }
235
236        if !snapshot_loaded {
237            if let Some(manager) = store.get_collection(collection) {
238                for entity in manager.query_all(|_| true) {
239                    if let EntityData::Vector(data) = &entity.data {
240                        if data.dense.len() == self.dim {
241                            index.insert(entity.id, data.dense.clone());
242                        }
243                    }
244                }
245            }
246        }
247        // Drain WAL-replayed VectorInsert records (#694). Apply in WAL
248        // order so the resulting block/lane placement matches the
249        // pre-restart state byte-for-byte. Duplicate ids (same vector
250        // also present in the entity manager) overwrite via
251        // `TurboQuantIndex::insert`'s id-replace branch, which is safe
252        // and idempotent because the codec seed is constant.
253        if let Some(records) = store.take_replayed_turbo_inserts(collection) {
254            for (raw_id, vector) in records {
255                if vector.len() == self.dim {
256                    index.insert(EntityId::new(raw_id), vector);
257                }
258            }
259        }
260        self.populated.store(true, Ordering::Release);
261        self.mark_ready();
262    }
263
264    /// Capture the current in-memory index state and dump it to the
265    /// configured `.tv` path on a worker thread (#674). The next
266    /// caller (typically the next WAL checkpoint cycle) blocks on the
267    /// previous worker before starting a new one — bounded
268    /// backpressure of at most one dump in flight per collection. The
269    /// WAL checkpoint itself never waits for the snapshot fsync.
270    ///
271    /// No-op when `snapshot_path` is `None` (`StorageLayout::Minimal`
272    /// or embedded mode) — preserves the single-file portability
273    /// story.
274    pub fn dump_snapshot_async(self: &Arc<Self>, lsn: u64) {
275        let Some(path) = self.snapshot_path.lock().clone() else {
276            return;
277        };
278
279        // Async barrier: wait for any previous dump to finish before
280        // taking a fresh snapshot. Joining here (and not inside the
281        // new worker) keeps the work serialized per collection and
282        // ensures the on-disk file moves monotonically forward.
283        if let Some(prev) = self.prev_snapshot_join.lock().take() {
284            let _ = prev.join();
285        }
286
287        // Capture state under the index lock — the encode/decode
288        // path uses the same lock — then drop the lock and serialize
289        // off-thread so the checkpoint completion latency is not
290        // blocked on snapshot fsync (acceptance criterion #674.5).
291        let dim = self.dim as u32;
292        let captured: Vec<(u64, Vec<f32>)> = {
293            let guard = self.index.lock();
294            guard
295                .iter_persisted()
296                .map(|(id, v)| (id.raw(), v.to_vec()))
297                .collect()
298        };
299
300        let path_for_worker = path.clone();
301        let handle = std::thread::Builder::new()
302            .name("turbo-snapshot-dump".to_string())
303            .spawn(move || {
304                if let Some(parent) = path_for_worker.parent() {
305                    let _ = std::fs::create_dir_all(parent);
306                }
307                if let Err(err) =
308                    write_snapshot(&path_for_worker, dim, TURBO_CODEC_SEED, lsn, &captured)
309                {
310                    tracing::warn!(
311                        target: "reddb::turbo::snapshot",
312                        path = %path_for_worker.display(),
313                        error = %err,
314                        "vector.turbo snapshot dump failed; cache will be rebuilt on next checkpoint",
315                    );
316                }
317            })
318            .ok();
319
320        *self.prev_snapshot_join.lock() = handle;
321    }
322
323    /// Join the in-flight snapshot worker, if any. Used by `RedDB::Drop`
324    /// to make sure no `.tv` write outlives the runtime, and by tests
325    /// that want to assert the on-disk state synchronously.
326    pub fn wait_snapshot(&self) {
327        if let Some(prev) = self.prev_snapshot_join.lock().take() {
328            let _ = prev.join();
329        }
330    }
331
332    /// Surface `SnapshotError::Io` to callers that need it. Currently
333    /// unused outside of tests but kept on the public path so the
334    /// snapshot module stays an implementation detail of this state.
335    #[allow(dead_code)]
336    pub(crate) fn snapshot_error_is_fatal(err: &SnapshotError) -> bool {
337        matches!(err, SnapshotError::Io(_))
338    }
339
340    /// Background-rebuild hook (#694 / #673). Drives the same code
341    /// path as `ensure_populated`; safe to call from a worker thread.
342    /// On completion the per-collection readiness flag flips and any
343    /// SEARCH callers parked on `wait_until_ready` are woken.
344    pub fn background_rebuild(&self, store: &UnifiedStore, collection: &str) {
345        self.ensure_populated(store, collection);
346    }
347}
348
349/// Public hook (#673) — issued by `RedDB::turbo_state` the first
350/// time a turbo collection's state is materialised. Spawns a worker
351/// thread that drains the WAL replay buffer + persisted vector
352/// entities and flips the readiness flag. Non-vector traffic never
353/// blocks on this; vector SEARCH/INSERT against a not-yet-ready
354/// collection observes the flag via `is_ready` / `wait_until_ready`.
355pub fn spawn_background_rebuild(
356    store: Arc<UnifiedStore>,
357    collection: String,
358    state: Arc<TurboCollectionState>,
359) -> std::thread::JoinHandle<()> {
360    // Worker holds a *weak* handle to the store so it can detect
361    // shutdown when the upgrade fails. The runtime registers the
362    // returned `JoinHandle` and joins it in `RedDB::Drop` before
363    // releasing its own `Arc<UnifiedStore>`, which is what
364    // guarantees a clean restart on the same database path.
365    let store_weak = Arc::downgrade(&store);
366    drop(store);
367    std::thread::Builder::new()
368        .name(format!("turbo-rebuild-{collection}"))
369        .spawn(move || {
370            let Some(store) = store_weak.upgrade() else {
371                return;
372            };
373            state.background_rebuild(&store, &collection);
374            store.set_config_tree(
375                &format!("red.collection.{collection}.vector.turbo.ready"),
376                &crate::serde_json::Value::Bool(true),
377            );
378        })
379        .expect("spawn turbo background rebuild thread")
380}
381
382/// Read the persisted readiness flag from the catalog. Used by
383/// admin/introspection paths that don't hold the runtime
384/// `TurboCollectionState` (e.g. SHOW COLLECTION metadata, cross-
385/// process tooling). The in-memory `TurboCollectionState::is_ready`
386/// is the authoritative live signal; this is the persisted shadow.
387pub fn ready_flag_from_catalog(store: &UnifiedStore, collection: &str) -> bool {
388    let key = format!("red.collection.{collection}.vector.turbo.ready");
389    match store.get_config(&key) {
390        Some(Value::Boolean(b)) => b,
391        _ => false,
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::unified::UnifiedStore;

    /// A fresh collection is not turbo until it is marked.
    #[test]
    fn mark_and_detect_turbo_kind() {
        let store = UnifiedStore::new();
        assert!(!is_turbo(&store, "v"));
        mark_as_turbo(&store, "v");
        assert!(is_turbo(&store, "v"));
    }

    /// Marking one collection must not leak onto another.
    #[test]
    fn kind_marker_is_scoped_per_collection() {
        let store = UnifiedStore::new();
        mark_as_turbo(&store, "v");
        assert!(is_turbo(&store, "v"));
        assert!(!is_turbo(&store, "w"));
    }

    /// Re-stamping (e.g. `IF NOT EXISTS`) keeps the collection turbo.
    #[test]
    fn re_marking_is_idempotent() {
        let store = UnifiedStore::new();
        mark_as_turbo(&store, "v");
        mark_as_turbo(&store, "v");
        assert!(is_turbo(&store, "v"));
    }

    /// The config-tree key layout is part of the durable contract.
    #[test]
    fn kind_key_layout() {
        assert_eq!(kind_key("v"), "red.collection.v.kind");
    }

    /// Without a rebuild having run, the persisted shadow reads `false`.
    #[test]
    fn catalog_ready_flag_defaults_to_false() {
        let store = UnifiedStore::new();
        assert!(!ready_flag_from_catalog(&store, "v"));
    }
}