reddb_server/runtime/vector_turbo_kind.rs
1//! Foundation for `KIND vector.turbo` collections (PRD #668, issue #693).
2//!
3//! Mirrors `blockchain_kind` for the marker side of the contract:
4//! `red.collection.{name}.kind = "turbo"` is the durable signal that
5//! distinguishes a TurboQuant-backed vector collection from the legacy
6//! `vector` kind. Routing in `impl_ddl`, `create_vector`, and the
7//! vector-search executor reads `is_turbo` to branch.
8//!
9//! The per-collection runtime state (`TurboCollectionState`) owns the
10//! in-memory `TurboQuantIndex` and a `TurboExtent` when a pager is
11//! available, plus the codec dimension / metric inherited from the
12//! collection contract.
13
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use parking_lot::{Condvar, Mutex};
19
20use crate::storage::engine::distance::DistanceMetric;
21use crate::storage::engine::turboquant::extent::TurboExtent;
22use crate::storage::engine::turboquant::index::TurboQuantIndex;
23use crate::storage::engine::turboquant::snapshot::{read_snapshot, write_snapshot, SnapshotError};
24use crate::storage::engine::Pager;
25use crate::storage::schema::Value;
26use crate::storage::unified::{EntityData, UnifiedStore};
27use crate::storage::EntityId;
28
29/// Value stored under `red.collection.{name}.kind` for vector.turbo
30/// collections. Must be distinct from `blockchain_kind::CHAIN_KIND_TAG`
31/// and from any future kind marker.
32pub const TURBO_KIND_TAG: &str = "turbo";
33
34/// Deterministic codec seed shared by every turbo collection in this
35/// process. The seed feeds the rotation matrix + codebook; a fixed seed
36/// guarantees that re-encoding the same vector across restarts produces
37/// the same code, which is what makes lazy re-population from the
38/// persisted vector entities safe.
39pub const TURBO_CODEC_SEED: u64 = 0x7155_8807_FED4_2913;
40
/// Config-tree key holding the kind marker for `collection`
/// (`red.collection.{collection}.kind`).
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
44
45/// Persist the `turbo` kind marker. Called once at collection creation.
46/// Idempotent against `IF NOT EXISTS`: re-stamping the same value is a
47/// no-op at the config-tree layer.
48pub fn mark_as_turbo(store: &UnifiedStore, collection: &str) {
49 store.set_config_tree(
50 &kind_key(collection),
51 &crate::serde_json::Value::String(TURBO_KIND_TAG.to_string()),
52 );
53}
54
55/// True if `mark_as_turbo` was ever called for this collection. Cheap
56/// (one config-tree read) and safe to call on every INSERT/SEARCH —
57/// the legacy `vector` path takes the `false` branch with no extra
58/// cost beyond the lookup.
59pub fn is_turbo(store: &UnifiedStore, collection: &str) -> bool {
60 match store.get_config(&kind_key(collection)) {
61 Some(Value::Text(s)) => s.as_ref() == TURBO_KIND_TAG,
62 _ => false,
63 }
64}
65
66/// Per-collection runtime state for a `vector.turbo` collection.
67///
68/// `index` is the in-memory TurboQuant index that owns the encoded
69/// codes + raw vectors; `extent` is the per-collection page-backed
70/// payload buffer when the store is in paged mode (None for in-memory
71/// runtimes). Both are wrapped in `Mutex` because INSERTs serialize on
72/// the per-collection state — the contention point lives here rather
73/// than on the global store lock.
74pub struct TurboCollectionState {
75 pub dim: usize,
76 pub metric: DistanceMetric,
77 pub index: Mutex<TurboQuantIndex>,
78 pub extent: Mutex<Option<TurboExtent>>,
79 /// Set once the lazy rebuild from persisted vector entities has
80 /// happened. Subsequent INSERT/SEARCH calls skip the scan.
81 populated: std::sync::atomic::AtomicBool,
82 /// Issue #673 — per-collection readiness flag. Flips to `true`
83 /// only after the background rebuild completes. SEARCH against a
84 /// not-yet-ready collection waits (with bounded timeout) or
85 /// returns a structured NOT_READY response.
86 ready: std::sync::atomic::AtomicBool,
87 /// Condvar-paired wait surface for SEARCH callers that want to
88 /// block (bounded) until ready instead of immediately failing.
89 ready_signal: Arc<(Mutex<bool>, Condvar)>,
90 /// `.tv` snapshot file path for this collection (#674). `None` when
91 /// the active `StorageLayout` opts out of snapshot files
92 /// (`Minimal` / embedded). When `Some`, the checkpoint cycle dumps
93 /// here and boot prefers loading from here over rebuilding from
94 /// the extent.
95 snapshot_path: Mutex<Option<PathBuf>>,
96 /// Async-barrier handle for the most-recently-spawned snapshot
97 /// dump worker (#674). The next checkpoint cycle joins the
98 /// previous worker before starting a new one — bounds backpressure
99 /// to at most one in-flight dump per collection.
100 prev_snapshot_join: Mutex<Option<std::thread::JoinHandle<()>>>,
101}
102
103impl TurboCollectionState {
104 pub fn new(dim: usize, metric: DistanceMetric, pager: Option<&Arc<Pager>>) -> Self {
105 let index = TurboQuantIndex::new(dim, TURBO_CODEC_SEED);
106 let extent = pager
107 .and_then(|p| TurboExtent::new(Arc::clone(p)).ok())
108 .map(Some)
109 .unwrap_or(None);
110 Self {
111 dim,
112 metric,
113 index: Mutex::new(index),
114 extent: Mutex::new(extent),
115 populated: std::sync::atomic::AtomicBool::new(false),
116 ready: std::sync::atomic::AtomicBool::new(false),
117 ready_signal: Arc::new((Mutex::new(false), Condvar::new())),
118 snapshot_path: Mutex::new(None),
119 prev_snapshot_join: Mutex::new(None),
120 }
121 }
122
123 /// Set (or clear) the `.tv` snapshot path for this collection.
124 /// Called by `RedDB::turbo_state` once the resolved
125 /// `TieredLayoutPaths` is known.
126 pub fn set_snapshot_path(&self, path: Option<PathBuf>) {
127 *self.snapshot_path.lock() = path;
128 }
129
130 /// Current `.tv` snapshot path, if any.
131 pub fn snapshot_path(&self) -> Option<PathBuf> {
132 self.snapshot_path.lock().clone()
133 }
134
135 /// Returns the current readiness flag. `true` means the
136 /// background rebuild (or lazy populate) has completed and the
137 /// in-memory index reflects every WAL-acked INSERT.
138 pub fn is_ready(&self) -> bool {
139 self.ready.load(std::sync::atomic::Ordering::Acquire)
140 }
141
142 /// Mark the collection as ready. Called at the end of
143 /// `ensure_populated` and from the background rebuild worker.
144 /// Wakes any SEARCH callers parked on `wait_until_ready`.
145 fn mark_ready(&self) {
146 self.ready.store(true, std::sync::atomic::Ordering::Release);
147 let (lock, cv) = &*self.ready_signal;
148 let mut flag = lock.lock();
149 *flag = true;
150 cv.notify_all();
151 }
152
153 /// Block the caller (with a bounded timeout) until the
154 /// collection becomes ready. Returns `true` if ready within the
155 /// timeout, `false` if the timeout fired. A zero-or-negative
156 /// timeout still does one fast-path check.
157 pub fn wait_until_ready(&self, timeout: Duration) -> bool {
158 if self.is_ready() {
159 return true;
160 }
161 if timeout.is_zero() {
162 return self.is_ready();
163 }
164 let (lock, cv) = &*self.ready_signal;
165 let mut flag = lock.lock();
166 if *flag {
167 return true;
168 }
169 let _ = cv.wait_for(&mut flag, timeout);
170 *flag
171 }
172
173 /// Lazily populate the in-memory index from any vector entities
174 /// already persisted in the collection, then drain any WAL-replayed
175 /// `VectorInsert` records captured at store-open time (issue #694).
176 ///
177 /// Boot-time recovery: the WAL `VectorInsert` records are the
178 /// authoritative source for vectors that may not have made it into
179 /// the entity manager's persisted state (e.g. a crash between WAL
180 /// fsync and the next paged flush). Replaying them in WAL order
181 /// under a fixed codec seed reconstructs the in-memory
182 /// `TurboQuantIndex` deterministically — including the
183 /// partial-block tail introduced by ADR 0024.
184 ///
185 /// Non-vector traffic does not block on this rebuild: the runtime
186 /// only takes this path on the first turbo INSERT/SEARCH after
187 /// boot. `#673` wires the per-collection `ready: bool` flag on
188 /// top of this hook to keep vector traffic from observing a
189 /// half-built index while the rebuild is in flight.
190 pub fn ensure_populated(&self, store: &UnifiedStore, collection: &str) {
191 use std::sync::atomic::Ordering;
192 if self.populated.load(Ordering::Acquire) {
193 return;
194 }
195 let mut index = self.index.lock();
196 // Double-checked: another writer may have populated while we
197 // were waiting for the lock.
198 if self.populated.load(Ordering::Acquire) {
199 return;
200 }
201 // Snapshot-first boot (#674). When a valid `.tv` exists at
202 // the layout-derived path, replay its `(id, vector)` pairs in
203 // stored order. The deterministic codec seed reproduces
204 // byte-identical block/lane placement, so the index ends up
205 // identical to what a from-scratch entity scan would build —
206 // without walking the entity manager. Snapshots are purely a
207 // cache: any failure (missing, truncated, crc-bad, dim/seed
208 // drift) falls through to the legacy entity-scan path so
209 // boot still succeeds.
210 let mut snapshot_loaded = false;
211 if let Some(path) = self.snapshot_path.lock().clone() {
212 if path.exists() {
213 match read_snapshot(&path, self.dim as u32, TURBO_CODEC_SEED) {
214 Ok(payload) => {
215 for (raw_id, vector) in payload.vectors {
216 index.insert(EntityId::new(raw_id), vector);
217 }
218 snapshot_loaded = true;
219 }
220 Err(err) => {
221 tracing::warn!(
222 target: "reddb::turbo::snapshot",
223 collection,
224 path = %path.display(),
225 error = %err,
226 "vector.turbo snapshot unusable; falling back to extent/WAL rebuild",
227 );
228 }
229 }
230 }
231 }
232
233 if !snapshot_loaded {
234 if let Some(manager) = store.get_collection(collection) {
235 for entity in manager.query_all(|_| true) {
236 if let EntityData::Vector(data) = &entity.data {
237 if data.dense.len() == self.dim {
238 index.insert(entity.id, data.dense.clone());
239 }
240 }
241 }
242 }
243 }
244 // Drain WAL-replayed VectorInsert records (#694). Apply in WAL
245 // order so the resulting block/lane placement matches the
246 // pre-restart state byte-for-byte. Duplicate ids (same vector
247 // also present in the entity manager) overwrite via
248 // `TurboQuantIndex::insert`'s id-replace branch, which is safe
249 // and idempotent because the codec seed is constant.
250 if let Some(records) = store.take_replayed_turbo_inserts(collection) {
251 for (raw_id, vector) in records {
252 if vector.len() == self.dim {
253 index.insert(EntityId::new(raw_id), vector);
254 }
255 }
256 }
257 self.populated.store(true, Ordering::Release);
258 self.mark_ready();
259 }
260
261 /// Capture the current in-memory index state and dump it to the
262 /// configured `.tv` path on a worker thread (#674). The next
263 /// caller (typically the next WAL checkpoint cycle) blocks on the
264 /// previous worker before starting a new one — bounded
265 /// backpressure of at most one dump in flight per collection. The
266 /// WAL checkpoint itself never waits for the snapshot fsync.
267 ///
268 /// No-op when `snapshot_path` is `None` (`StorageLayout::Minimal`
269 /// or embedded mode) — preserves the single-file portability
270 /// story.
271 pub fn dump_snapshot_async(self: &Arc<Self>, lsn: u64) {
272 let Some(path) = self.snapshot_path.lock().clone() else {
273 return;
274 };
275
276 // Async barrier: wait for any previous dump to finish before
277 // taking a fresh snapshot. Joining here (and not inside the
278 // new worker) keeps the work serialized per collection and
279 // ensures the on-disk file moves monotonically forward.
280 if let Some(prev) = self.prev_snapshot_join.lock().take() {
281 let _ = prev.join();
282 }
283
284 // Capture state under the index lock — the encode/decode
285 // path uses the same lock — then drop the lock and serialize
286 // off-thread so the checkpoint completion latency is not
287 // blocked on snapshot fsync (acceptance criterion #674.5).
288 let dim = self.dim as u32;
289 let captured: Vec<(u64, Vec<f32>)> = {
290 let guard = self.index.lock();
291 guard
292 .iter_persisted()
293 .map(|(id, v)| (id.raw(), v.to_vec()))
294 .collect()
295 };
296
297 let path_for_worker = path.clone();
298 let handle = std::thread::Builder::new()
299 .name("turbo-snapshot-dump".to_string())
300 .spawn(move || {
301 if let Some(parent) = path_for_worker.parent() {
302 let _ = std::fs::create_dir_all(parent);
303 }
304 if let Err(err) =
305 write_snapshot(&path_for_worker, dim, TURBO_CODEC_SEED, lsn, &captured)
306 {
307 tracing::warn!(
308 target: "reddb::turbo::snapshot",
309 path = %path_for_worker.display(),
310 error = %err,
311 "vector.turbo snapshot dump failed; cache will be rebuilt on next checkpoint",
312 );
313 }
314 })
315 .ok();
316
317 *self.prev_snapshot_join.lock() = handle;
318 }
319
320 /// Join the in-flight snapshot worker, if any. Used by `RedDB::Drop`
321 /// to make sure no `.tv` write outlives the runtime, and by tests
322 /// that want to assert the on-disk state synchronously.
323 pub fn wait_snapshot(&self) {
324 if let Some(prev) = self.prev_snapshot_join.lock().take() {
325 let _ = prev.join();
326 }
327 }
328
329 /// Surface `SnapshotError::Io` to callers that need it. Currently
330 /// unused outside of tests but kept on the public path so the
331 /// snapshot module stays an implementation detail of this state.
332 #[allow(dead_code)]
333 pub(crate) fn snapshot_error_is_fatal(err: &SnapshotError) -> bool {
334 matches!(err, SnapshotError::Io(_))
335 }
336
337 /// Background-rebuild hook (#694 / #673). Drives the same code
338 /// path as `ensure_populated`; safe to call from a worker thread.
339 /// On completion the per-collection readiness flag flips and any
340 /// SEARCH callers parked on `wait_until_ready` are woken.
341 pub fn background_rebuild(&self, store: &UnifiedStore, collection: &str) {
342 self.ensure_populated(store, collection);
343 }
344}
345
346/// Public hook (#673) — issued by `RedDB::turbo_state` the first
347/// time a turbo collection's state is materialised. Spawns a worker
348/// thread that drains the WAL replay buffer + persisted vector
349/// entities and flips the readiness flag. Non-vector traffic never
350/// blocks on this; vector SEARCH/INSERT against a not-yet-ready
351/// collection observes the flag via `is_ready` / `wait_until_ready`.
352pub fn spawn_background_rebuild(
353 store: Arc<UnifiedStore>,
354 collection: String,
355 state: Arc<TurboCollectionState>,
356) -> std::thread::JoinHandle<()> {
357 // Worker holds a *weak* handle to the store so it can detect
358 // shutdown when the upgrade fails. The runtime registers the
359 // returned `JoinHandle` and joins it in `RedDB::Drop` before
360 // releasing its own `Arc<UnifiedStore>`, which is what
361 // guarantees a clean restart on the same database path.
362 let store_weak = Arc::downgrade(&store);
363 drop(store);
364 std::thread::Builder::new()
365 .name(format!("turbo-rebuild-{collection}"))
366 .spawn(move || {
367 let Some(store) = store_weak.upgrade() else {
368 return;
369 };
370 state.background_rebuild(&store, &collection);
371 store.set_config_tree(
372 &format!("red.collection.{collection}.vector.turbo.ready"),
373 &crate::serde_json::Value::Bool(true),
374 );
375 })
376 .expect("spawn turbo background rebuild thread")
377}
378
379/// Read the persisted readiness flag from the catalog. Used by
380/// admin/introspection paths that don't hold the runtime
381/// `TurboCollectionState` (e.g. SHOW COLLECTION metadata, cross-
382/// process tooling). The in-memory `TurboCollectionState::is_ready`
383/// is the authoritative live signal; this is the persisted shadow.
384pub fn ready_flag_from_catalog(store: &UnifiedStore, collection: &str) -> bool {
385 let key = format!("red.collection.{collection}.vector.turbo.ready");
386 match store.get_config(&key) {
387 Some(Value::Boolean(b)) => b,
388 _ => false,
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::unified::UnifiedStore;

    /// Marking a collection makes `is_turbo` report it as turbo.
    #[test]
    fn mark_and_detect_turbo_kind() {
        let store = UnifiedStore::new();
        assert!(!is_turbo(&store, "v"));
        mark_as_turbo(&store, "v");
        assert!(is_turbo(&store, "v"));
    }

    /// The marker is keyed per collection: stamping `v` must not leak
    /// onto other collections, including ones sharing a name prefix.
    #[test]
    fn turbo_marker_is_scoped_per_collection() {
        let store = UnifiedStore::new();
        mark_as_turbo(&store, "v");
        assert!(!is_turbo(&store, "w"));
        assert!(!is_turbo(&store, "vv"));
    }

    /// Re-stamping the marker (the `IF NOT EXISTS` path) keeps it set.
    #[test]
    fn mark_as_turbo_is_idempotent() {
        let store = UnifiedStore::new();
        mark_as_turbo(&store, "v");
        mark_as_turbo(&store, "v");
        assert!(is_turbo(&store, "v"));
    }

    /// Pins the config-tree key layout that `is_turbo` and `mark_as_turbo`
    /// share.
    #[test]
    fn kind_key_layout() {
        assert_eq!(kind_key("v"), "red.collection.v.kind");
    }

    /// Without a completed background rebuild, the catalog ready flag reads
    /// as `false`.
    #[test]
    fn catalog_ready_flag_defaults_to_false() {
        let store = UnifiedStore::new();
        assert!(!ready_flag_from_catalog(&store, "v"));
    }
}
404}