reddb_server/runtime/vector_turbo_kind.rs
1//! Foundation for `KIND vector.turbo` collections (PRD #668, issue #693).
2//!
3//! Mirrors `blockchain_kind` for the marker side of the contract:
4//! `red.collection.{name}.kind = "turbo"` is the durable signal that
5//! distinguishes a TurboQuant-backed vector collection from the legacy
6//! `vector` kind. Routing in `impl_ddl`, `create_vector`, and the
7//! vector-search executor reads `is_turbo` to branch.
8//!
9//! The per-collection runtime state (`TurboCollectionState`) owns the
10//! in-memory `TurboQuantIndex` and a `TurboExtent` when a pager is
11//! available, plus the codec dimension / metric inherited from the
12//! collection contract.
13
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use parking_lot::{Condvar, Mutex};
19
20use crate::storage::engine::distance::DistanceMetric;
21use crate::storage::engine::turboquant::extent::TurboExtent;
22use crate::storage::engine::turboquant::index::TurboQuantIndex;
23use crate::storage::engine::Pager;
24use crate::storage::schema::Value;
25use crate::storage::unified::{EntityData, UnifiedStore};
26use crate::storage::EntityId;
27use reddb_file::{
28 read_turboquant_snapshot as read_snapshot, write_turboquant_snapshot as write_snapshot,
29 TurboQuantSnapshotError as SnapshotError,
30};
31
/// Value stored under `red.collection.{name}.kind` for vector.turbo
/// collections. `mark_as_turbo` writes this tag and `is_turbo` compares
/// the stored text against it, so both sides of the marker contract
/// share this single definition. Must be distinct from
/// `blockchain_kind::CHAIN_KIND_TAG` and from any future kind marker.
pub const TURBO_KIND_TAG: &str = "turbo";
36
/// Deterministic codec seed shared by every turbo collection in this
/// process. The seed feeds the rotation matrix + codebook; a fixed seed
/// guarantees that re-encoding the same vector across restarts produces
/// the same code, which is what makes lazy re-population from the
/// persisted vector entities safe.
///
/// Within this file the seed is handed to `TurboQuantIndex::new` and to
/// both the snapshot reader and writer, so a snapshot is always read
/// with the same seed value it was written with.
pub const TURBO_CODEC_SEED: u64 = 0x7155_8807_FED4_2913;
43
/// Builds the config-tree key that holds a collection's kind marker:
/// `red.collection.{collection}.kind`. The collection name is inserted
/// verbatim, without escaping.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
47
48/// Persist the `turbo` kind marker. Called once at collection creation.
49/// Idempotent against `IF NOT EXISTS`: re-stamping the same value is a
50/// no-op at the config-tree layer.
51pub fn mark_as_turbo(store: &UnifiedStore, collection: &str) {
52 store.set_config_tree(
53 &kind_key(collection),
54 &crate::serde_json::Value::String(TURBO_KIND_TAG.to_string()),
55 );
56}
57
58/// True if `mark_as_turbo` was ever called for this collection. Cheap
59/// (one config-tree read) and safe to call on every INSERT/SEARCH —
60/// the legacy `vector` path takes the `false` branch with no extra
61/// cost beyond the lookup.
62pub fn is_turbo(store: &UnifiedStore, collection: &str) -> bool {
63 match store.get_config(&kind_key(collection)) {
64 Some(Value::Text(s)) => s.as_ref() == TURBO_KIND_TAG,
65 _ => false,
66 }
67}
68
/// Per-collection runtime state for a `vector.turbo` collection.
///
/// `index` is the in-memory TurboQuant index that owns the encoded
/// codes + raw vectors; `extent` is the per-collection page-backed
/// payload buffer when the store is in paged mode (None for in-memory
/// runtimes). Both are wrapped in `Mutex` because INSERTs serialize on
/// the per-collection state — the contention point lives here rather
/// than on the global store lock.
pub struct TurboCollectionState {
    /// Vector dimension. Passed to the index constructor and used to
    /// filter out wrong-length vectors while populating.
    pub dim: usize,
    /// Distance metric supplied at construction.
    pub metric: DistanceMetric,
    /// In-memory index; its lock is held for the whole populate pass.
    pub index: Mutex<TurboQuantIndex>,
    /// Page-backed extent. `None` when no pager was supplied or when
    /// creating the extent failed.
    pub extent: Mutex<Option<TurboExtent>>,
    /// Set once the lazy rebuild (snapshot load or scan of persisted
    /// vector entities, plus the WAL-replay drain) has happened.
    /// Subsequent INSERT/SEARCH calls skip the scan.
    populated: std::sync::atomic::AtomicBool,
    /// Issue #673 — per-collection readiness flag. Flips to `true`
    /// only after the background rebuild completes. SEARCH against a
    /// not-yet-ready collection waits (with bounded timeout) or
    /// returns a structured NOT_READY response.
    ready: std::sync::atomic::AtomicBool,
    /// Condvar-paired wait surface for SEARCH callers that want to
    /// block (bounded) until ready instead of immediately failing.
    /// The guarded `bool` mirrors `ready`.
    ready_signal: Arc<(Mutex<bool>, Condvar)>,
    /// `.tv` snapshot file path for this collection (#674). `None` when
    /// the active `StorageLayout` opts out of snapshot files
    /// (`Minimal` / embedded). When `Some`, the checkpoint cycle dumps
    /// here and boot prefers loading from here over scanning the
    /// collection's persisted vector entities.
    snapshot_path: Mutex<Option<PathBuf>>,
    /// Async-barrier handle for the most-recently-spawned snapshot
    /// dump worker (#674). The next checkpoint cycle joins the
    /// previous worker before starting a new one — bounds backpressure
    /// to at most one in-flight dump per collection.
    prev_snapshot_join: Mutex<Option<std::thread::JoinHandle<()>>>,
}
105
106impl TurboCollectionState {
107 pub fn new(dim: usize, metric: DistanceMetric, pager: Option<&Arc<Pager>>) -> Self {
108 let index = TurboQuantIndex::new(dim, TURBO_CODEC_SEED);
109 let extent = pager
110 .and_then(|p| TurboExtent::new(Arc::clone(p)).ok())
111 .map(Some)
112 .unwrap_or(None);
113 Self {
114 dim,
115 metric,
116 index: Mutex::new(index),
117 extent: Mutex::new(extent),
118 populated: std::sync::atomic::AtomicBool::new(false),
119 ready: std::sync::atomic::AtomicBool::new(false),
120 ready_signal: Arc::new((Mutex::new(false), Condvar::new())),
121 snapshot_path: Mutex::new(None),
122 prev_snapshot_join: Mutex::new(None),
123 }
124 }
125
126 /// Set (or clear) the `.tv` snapshot path for this collection.
127 /// Called by `RedDB::turbo_state` once the resolved
128 /// `TieredLayoutPaths` is known.
129 pub fn set_snapshot_path(&self, path: Option<PathBuf>) {
130 *self.snapshot_path.lock() = path;
131 }
132
133 /// Current `.tv` snapshot path, if any.
134 pub fn snapshot_path(&self) -> Option<PathBuf> {
135 self.snapshot_path.lock().clone()
136 }
137
138 /// Returns the current readiness flag. `true` means the
139 /// background rebuild (or lazy populate) has completed and the
140 /// in-memory index reflects every WAL-acked INSERT.
141 pub fn is_ready(&self) -> bool {
142 self.ready.load(std::sync::atomic::Ordering::Acquire)
143 }
144
145 /// Mark the collection as ready. Called at the end of
146 /// `ensure_populated` and from the background rebuild worker.
147 /// Wakes any SEARCH callers parked on `wait_until_ready`.
148 fn mark_ready(&self) {
149 self.ready.store(true, std::sync::atomic::Ordering::Release);
150 let (lock, cv) = &*self.ready_signal;
151 let mut flag = lock.lock();
152 *flag = true;
153 cv.notify_all();
154 }
155
156 /// Block the caller (with a bounded timeout) until the
157 /// collection becomes ready. Returns `true` if ready within the
158 /// timeout, `false` if the timeout fired. A zero-or-negative
159 /// timeout still does one fast-path check.
160 pub fn wait_until_ready(&self, timeout: Duration) -> bool {
161 if self.is_ready() {
162 return true;
163 }
164 if timeout.is_zero() {
165 return self.is_ready();
166 }
167 let (lock, cv) = &*self.ready_signal;
168 let mut flag = lock.lock();
169 if *flag {
170 return true;
171 }
172 let _ = cv.wait_for(&mut flag, timeout);
173 *flag
174 }
175
176 /// Lazily populate the in-memory index from any vector entities
177 /// already persisted in the collection, then drain any WAL-replayed
178 /// `VectorInsert` records captured at store-open time (issue #694).
179 ///
180 /// Boot-time recovery: the WAL `VectorInsert` records are the
181 /// authoritative source for vectors that may not have made it into
182 /// the entity manager's persisted state (e.g. a crash between WAL
183 /// fsync and the next paged flush). Replaying them in WAL order
184 /// under a fixed codec seed reconstructs the in-memory
185 /// `TurboQuantIndex` deterministically — including the
186 /// partial-block tail introduced by ADR 0024.
187 ///
188 /// Non-vector traffic does not block on this rebuild: the runtime
189 /// only takes this path on the first turbo INSERT/SEARCH after
190 /// boot. `#673` wires the per-collection `ready: bool` flag on
191 /// top of this hook to keep vector traffic from observing a
192 /// half-built index while the rebuild is in flight.
193 pub fn ensure_populated(&self, store: &UnifiedStore, collection: &str) {
194 use std::sync::atomic::Ordering;
195 if self.populated.load(Ordering::Acquire) {
196 return;
197 }
198 let mut index = self.index.lock();
199 // Double-checked: another writer may have populated while we
200 // were waiting for the lock.
201 if self.populated.load(Ordering::Acquire) {
202 return;
203 }
204 // Snapshot-first boot (#674). When a valid `.tv` exists at
205 // the layout-derived path, replay its `(id, vector)` pairs in
206 // stored order. The deterministic codec seed reproduces
207 // byte-identical block/lane placement, so the index ends up
208 // identical to what a from-scratch entity scan would build —
209 // without walking the entity manager. Snapshots are purely a
210 // cache: any failure (missing, truncated, crc-bad, dim/seed
211 // drift) falls through to the legacy entity-scan path so
212 // boot still succeeds.
213 let mut snapshot_loaded = false;
214 if let Some(path) = self.snapshot_path.lock().clone() {
215 if path.exists() {
216 match read_snapshot(&path, self.dim as u32, TURBO_CODEC_SEED) {
217 Ok(payload) => {
218 for (raw_id, vector) in payload.vectors {
219 index.insert(EntityId::new(raw_id), vector);
220 }
221 snapshot_loaded = true;
222 }
223 Err(err) => {
224 tracing::warn!(
225 target: "reddb::turbo::snapshot",
226 collection,
227 path = %path.display(),
228 error = %err,
229 "vector.turbo snapshot unusable; falling back to extent/WAL rebuild",
230 );
231 }
232 }
233 }
234 }
235
236 if !snapshot_loaded {
237 if let Some(manager) = store.get_collection(collection) {
238 for entity in manager.query_all(|_| true) {
239 if let EntityData::Vector(data) = &entity.data {
240 if data.dense.len() == self.dim {
241 index.insert(entity.id, data.dense.clone());
242 }
243 }
244 }
245 }
246 }
247 // Drain WAL-replayed VectorInsert records (#694). Apply in WAL
248 // order so the resulting block/lane placement matches the
249 // pre-restart state byte-for-byte. Duplicate ids (same vector
250 // also present in the entity manager) overwrite via
251 // `TurboQuantIndex::insert`'s id-replace branch, which is safe
252 // and idempotent because the codec seed is constant.
253 if let Some(records) = store.take_replayed_turbo_inserts(collection) {
254 for (raw_id, vector) in records {
255 if vector.len() == self.dim {
256 index.insert(EntityId::new(raw_id), vector);
257 }
258 }
259 }
260 self.populated.store(true, Ordering::Release);
261 self.mark_ready();
262 }
263
264 /// Capture the current in-memory index state and dump it to the
265 /// configured `.tv` path on a worker thread (#674). The next
266 /// caller (typically the next WAL checkpoint cycle) blocks on the
267 /// previous worker before starting a new one — bounded
268 /// backpressure of at most one dump in flight per collection. The
269 /// WAL checkpoint itself never waits for the snapshot fsync.
270 ///
271 /// No-op when `snapshot_path` is `None` (`StorageLayout::Minimal`
272 /// or embedded mode) — preserves the single-file portability
273 /// story.
274 pub fn dump_snapshot_async(self: &Arc<Self>, lsn: u64) {
275 let Some(path) = self.snapshot_path.lock().clone() else {
276 return;
277 };
278
279 // Async barrier: wait for any previous dump to finish before
280 // taking a fresh snapshot. Joining here (and not inside the
281 // new worker) keeps the work serialized per collection and
282 // ensures the on-disk file moves monotonically forward.
283 if let Some(prev) = self.prev_snapshot_join.lock().take() {
284 let _ = prev.join();
285 }
286
287 // Capture state under the index lock — the encode/decode
288 // path uses the same lock — then drop the lock and serialize
289 // off-thread so the checkpoint completion latency is not
290 // blocked on snapshot fsync (acceptance criterion #674.5).
291 let dim = self.dim as u32;
292 let captured: Vec<(u64, Vec<f32>)> = {
293 let guard = self.index.lock();
294 guard
295 .iter_persisted()
296 .map(|(id, v)| (id.raw(), v.to_vec()))
297 .collect()
298 };
299
300 let path_for_worker = path.clone();
301 let handle = std::thread::Builder::new()
302 .name("turbo-snapshot-dump".to_string())
303 .spawn(move || {
304 if let Some(parent) = path_for_worker.parent() {
305 let _ = std::fs::create_dir_all(parent);
306 }
307 if let Err(err) =
308 write_snapshot(&path_for_worker, dim, TURBO_CODEC_SEED, lsn, &captured)
309 {
310 tracing::warn!(
311 target: "reddb::turbo::snapshot",
312 path = %path_for_worker.display(),
313 error = %err,
314 "vector.turbo snapshot dump failed; cache will be rebuilt on next checkpoint",
315 );
316 }
317 })
318 .ok();
319
320 *self.prev_snapshot_join.lock() = handle;
321 }
322
323 /// Join the in-flight snapshot worker, if any. Used by `RedDB::Drop`
324 /// to make sure no `.tv` write outlives the runtime, and by tests
325 /// that want to assert the on-disk state synchronously.
326 pub fn wait_snapshot(&self) {
327 if let Some(prev) = self.prev_snapshot_join.lock().take() {
328 let _ = prev.join();
329 }
330 }
331
332 /// Surface `SnapshotError::Io` to callers that need it. Currently
333 /// unused outside of tests but kept on the public path so the
334 /// snapshot module stays an implementation detail of this state.
335 #[allow(dead_code)]
336 pub(crate) fn snapshot_error_is_fatal(err: &SnapshotError) -> bool {
337 matches!(err, SnapshotError::Io(_))
338 }
339
340 /// Background-rebuild hook (#694 / #673). Drives the same code
341 /// path as `ensure_populated`; safe to call from a worker thread.
342 /// On completion the per-collection readiness flag flips and any
343 /// SEARCH callers parked on `wait_until_ready` are woken.
344 pub fn background_rebuild(&self, store: &UnifiedStore, collection: &str) {
345 self.ensure_populated(store, collection);
346 }
347}
348
349/// Public hook (#673) — issued by `RedDB::turbo_state` the first
350/// time a turbo collection's state is materialised. Spawns a worker
351/// thread that drains the WAL replay buffer + persisted vector
352/// entities and flips the readiness flag. Non-vector traffic never
353/// blocks on this; vector SEARCH/INSERT against a not-yet-ready
354/// collection observes the flag via `is_ready` / `wait_until_ready`.
355pub fn spawn_background_rebuild(
356 store: Arc<UnifiedStore>,
357 collection: String,
358 state: Arc<TurboCollectionState>,
359) -> std::thread::JoinHandle<()> {
360 // Worker holds a *weak* handle to the store so it can detect
361 // shutdown when the upgrade fails. The runtime registers the
362 // returned `JoinHandle` and joins it in `RedDB::Drop` before
363 // releasing its own `Arc<UnifiedStore>`, which is what
364 // guarantees a clean restart on the same database path.
365 let store_weak = Arc::downgrade(&store);
366 drop(store);
367 std::thread::Builder::new()
368 .name(format!("turbo-rebuild-{collection}"))
369 .spawn(move || {
370 let Some(store) = store_weak.upgrade() else {
371 return;
372 };
373 state.background_rebuild(&store, &collection);
374 store.set_config_tree(
375 &format!("red.collection.{collection}.vector.turbo.ready"),
376 &crate::serde_json::Value::Bool(true),
377 );
378 })
379 .expect("spawn turbo background rebuild thread")
380}
381
382/// Read the persisted readiness flag from the catalog. Used by
383/// admin/introspection paths that don't hold the runtime
384/// `TurboCollectionState` (e.g. SHOW COLLECTION metadata, cross-
385/// process tooling). The in-memory `TurboCollectionState::is_ready`
386/// is the authoritative live signal; this is the persisted shadow.
387pub fn ready_flag_from_catalog(store: &UnifiedStore, collection: &str) -> bool {
388 let key = format!("red.collection.{collection}.vector.turbo.ready");
389 match store.get_config(&key) {
390 Some(Value::Boolean(b)) => b,
391 _ => false,
392 }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::unified::UnifiedStore;

    /// A collection reports as turbo only after the marker is stamped.
    #[test]
    fn mark_and_detect_turbo_kind() {
        let collection = "v";
        let store = UnifiedStore::new();

        assert!(!is_turbo(&store, collection));

        mark_as_turbo(&store, collection);

        assert!(is_turbo(&store, collection));
    }
}