Skip to main content

reddb_server/runtime/
vector_turbo_kind.rs

1//! Foundation for `KIND vector.turbo` collections (PRD #668, issue #693).
2//!
3//! Mirrors `blockchain_kind` for the marker side of the contract:
4//! `red.collection.{name}.kind = "turbo"` is the durable signal that
5//! distinguishes a TurboQuant-backed vector collection from the legacy
6//! `vector` kind. Routing in `impl_ddl`, `create_vector`, and the
7//! vector-search executor reads `is_turbo` to branch.
8//!
9//! The per-collection runtime state (`TurboCollectionState`) owns the
10//! in-memory `TurboQuantIndex` and a `TurboExtent` when a pager is
11//! available, plus the codec dimension / metric inherited from the
12//! collection contract.
13
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18use parking_lot::{Condvar, Mutex};
19
20use crate::storage::engine::distance::DistanceMetric;
21use crate::storage::engine::turboquant::extent::TurboExtent;
22use crate::storage::engine::turboquant::index::TurboQuantIndex;
23use crate::storage::engine::turboquant::snapshot::{read_snapshot, write_snapshot, SnapshotError};
24use crate::storage::engine::Pager;
25use crate::storage::schema::Value;
26use crate::storage::unified::{EntityData, UnifiedStore};
27use crate::storage::EntityId;
28
/// Kind marker for `vector.turbo` collections.
///
/// This is the string written to (and compared against) the
/// `red.collection.{name}.kind` config key. It has to stay distinct
/// from `blockchain_kind::CHAIN_KIND_TAG` and from every other kind
/// marker that may be introduced later.
pub const TURBO_KIND_TAG: &str = "turbo";
33
/// Process-wide codec seed used by every turbo collection.
///
/// The same value is handed to `TurboQuantIndex::new` and to the
/// snapshot reader/writer. Per the original design notes the seed
/// drives the rotation matrix and codebook, so keeping it fixed means
/// a vector re-encoded after a restart yields the same code as before
/// — the property that makes lazily re-populating the index from
/// persisted vector entities safe.
pub const TURBO_CODEC_SEED: u64 = 0x7155_8807_FED4_2913;
40
/// Builds the config key that holds the kind marker of `collection`:
/// `red.collection.{collection}.kind`.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
44
45/// Persist the `turbo` kind marker. Called once at collection creation.
46/// Idempotent against `IF NOT EXISTS`: re-stamping the same value is a
47/// no-op at the config-tree layer.
48pub fn mark_as_turbo(store: &UnifiedStore, collection: &str) {
49    store.set_config_tree(
50        &kind_key(collection),
51        &crate::serde_json::Value::String(TURBO_KIND_TAG.to_string()),
52    );
53}
54
55/// True if `mark_as_turbo` was ever called for this collection. Cheap
56/// (one config-tree read) and safe to call on every INSERT/SEARCH —
57/// the legacy `vector` path takes the `false` branch with no extra
58/// cost beyond the lookup.
59pub fn is_turbo(store: &UnifiedStore, collection: &str) -> bool {
60    match store.get_config(&kind_key(collection)) {
61        Some(Value::Text(s)) => s.as_ref() == TURBO_KIND_TAG,
62        _ => false,
63    }
64}
65
66/// Per-collection runtime state for a `vector.turbo` collection.
67///
68/// `index` is the in-memory TurboQuant index that owns the encoded
69/// codes + raw vectors; `extent` is the per-collection page-backed
70/// payload buffer when the store is in paged mode (None for in-memory
71/// runtimes). Both are wrapped in `Mutex` because INSERTs serialize on
72/// the per-collection state — the contention point lives here rather
73/// than on the global store lock.
74pub struct TurboCollectionState {
75    pub dim: usize,
76    pub metric: DistanceMetric,
77    pub index: Mutex<TurboQuantIndex>,
78    pub extent: Mutex<Option<TurboExtent>>,
79    /// Set once the lazy rebuild from persisted vector entities has
80    /// happened. Subsequent INSERT/SEARCH calls skip the scan.
81    populated: std::sync::atomic::AtomicBool,
82    /// Issue #673 — per-collection readiness flag. Flips to `true`
83    /// only after the background rebuild completes. SEARCH against a
84    /// not-yet-ready collection waits (with bounded timeout) or
85    /// returns a structured NOT_READY response.
86    ready: std::sync::atomic::AtomicBool,
87    /// Condvar-paired wait surface for SEARCH callers that want to
88    /// block (bounded) until ready instead of immediately failing.
89    ready_signal: Arc<(Mutex<bool>, Condvar)>,
90    /// `.tv` snapshot file path for this collection (#674). `None` when
91    /// the active `StorageLayout` opts out of snapshot files
92    /// (`Minimal` / embedded). When `Some`, the checkpoint cycle dumps
93    /// here and boot prefers loading from here over rebuilding from
94    /// the extent.
95    snapshot_path: Mutex<Option<PathBuf>>,
96    /// Async-barrier handle for the most-recently-spawned snapshot
97    /// dump worker (#674). The next checkpoint cycle joins the
98    /// previous worker before starting a new one — bounds backpressure
99    /// to at most one in-flight dump per collection.
100    prev_snapshot_join: Mutex<Option<std::thread::JoinHandle<()>>>,
101}
102
103impl TurboCollectionState {
104    pub fn new(dim: usize, metric: DistanceMetric, pager: Option<&Arc<Pager>>) -> Self {
105        let index = TurboQuantIndex::new(dim, TURBO_CODEC_SEED);
106        let extent = pager
107            .and_then(|p| TurboExtent::new(Arc::clone(p)).ok())
108            .map(Some)
109            .unwrap_or(None);
110        Self {
111            dim,
112            metric,
113            index: Mutex::new(index),
114            extent: Mutex::new(extent),
115            populated: std::sync::atomic::AtomicBool::new(false),
116            ready: std::sync::atomic::AtomicBool::new(false),
117            ready_signal: Arc::new((Mutex::new(false), Condvar::new())),
118            snapshot_path: Mutex::new(None),
119            prev_snapshot_join: Mutex::new(None),
120        }
121    }
122
123    /// Set (or clear) the `.tv` snapshot path for this collection.
124    /// Called by `RedDB::turbo_state` once the resolved
125    /// `TieredLayoutPaths` is known.
126    pub fn set_snapshot_path(&self, path: Option<PathBuf>) {
127        *self.snapshot_path.lock() = path;
128    }
129
130    /// Current `.tv` snapshot path, if any.
131    pub fn snapshot_path(&self) -> Option<PathBuf> {
132        self.snapshot_path.lock().clone()
133    }
134
135    /// Returns the current readiness flag. `true` means the
136    /// background rebuild (or lazy populate) has completed and the
137    /// in-memory index reflects every WAL-acked INSERT.
138    pub fn is_ready(&self) -> bool {
139        self.ready.load(std::sync::atomic::Ordering::Acquire)
140    }
141
142    /// Mark the collection as ready. Called at the end of
143    /// `ensure_populated` and from the background rebuild worker.
144    /// Wakes any SEARCH callers parked on `wait_until_ready`.
145    fn mark_ready(&self) {
146        self.ready.store(true, std::sync::atomic::Ordering::Release);
147        let (lock, cv) = &*self.ready_signal;
148        let mut flag = lock.lock();
149        *flag = true;
150        cv.notify_all();
151    }
152
153    /// Block the caller (with a bounded timeout) until the
154    /// collection becomes ready. Returns `true` if ready within the
155    /// timeout, `false` if the timeout fired. A zero-or-negative
156    /// timeout still does one fast-path check.
157    pub fn wait_until_ready(&self, timeout: Duration) -> bool {
158        if self.is_ready() {
159            return true;
160        }
161        if timeout.is_zero() {
162            return self.is_ready();
163        }
164        let (lock, cv) = &*self.ready_signal;
165        let mut flag = lock.lock();
166        if *flag {
167            return true;
168        }
169        let _ = cv.wait_for(&mut flag, timeout);
170        *flag
171    }
172
173    /// Lazily populate the in-memory index from any vector entities
174    /// already persisted in the collection, then drain any WAL-replayed
175    /// `VectorInsert` records captured at store-open time (issue #694).
176    ///
177    /// Boot-time recovery: the WAL `VectorInsert` records are the
178    /// authoritative source for vectors that may not have made it into
179    /// the entity manager's persisted state (e.g. a crash between WAL
180    /// fsync and the next paged flush). Replaying them in WAL order
181    /// under a fixed codec seed reconstructs the in-memory
182    /// `TurboQuantIndex` deterministically — including the
183    /// partial-block tail introduced by ADR 0024.
184    ///
185    /// Non-vector traffic does not block on this rebuild: the runtime
186    /// only takes this path on the first turbo INSERT/SEARCH after
187    /// boot. `#673` wires the per-collection `ready: bool` flag on
188    /// top of this hook to keep vector traffic from observing a
189    /// half-built index while the rebuild is in flight.
190    pub fn ensure_populated(&self, store: &UnifiedStore, collection: &str) {
191        use std::sync::atomic::Ordering;
192        if self.populated.load(Ordering::Acquire) {
193            return;
194        }
195        let mut index = self.index.lock();
196        // Double-checked: another writer may have populated while we
197        // were waiting for the lock.
198        if self.populated.load(Ordering::Acquire) {
199            return;
200        }
201        // Snapshot-first boot (#674). When a valid `.tv` exists at
202        // the layout-derived path, replay its `(id, vector)` pairs in
203        // stored order. The deterministic codec seed reproduces
204        // byte-identical block/lane placement, so the index ends up
205        // identical to what a from-scratch entity scan would build —
206        // without walking the entity manager. Snapshots are purely a
207        // cache: any failure (missing, truncated, crc-bad, dim/seed
208        // drift) falls through to the legacy entity-scan path so
209        // boot still succeeds.
210        let mut snapshot_loaded = false;
211        if let Some(path) = self.snapshot_path.lock().clone() {
212            if path.exists() {
213                match read_snapshot(&path, self.dim as u32, TURBO_CODEC_SEED) {
214                    Ok(payload) => {
215                        for (raw_id, vector) in payload.vectors {
216                            index.insert(EntityId::new(raw_id), vector);
217                        }
218                        snapshot_loaded = true;
219                    }
220                    Err(err) => {
221                        tracing::warn!(
222                            target: "reddb::turbo::snapshot",
223                            collection,
224                            path = %path.display(),
225                            error = %err,
226                            "vector.turbo snapshot unusable; falling back to extent/WAL rebuild",
227                        );
228                    }
229                }
230            }
231        }
232
233        if !snapshot_loaded {
234            if let Some(manager) = store.get_collection(collection) {
235                for entity in manager.query_all(|_| true) {
236                    if let EntityData::Vector(data) = &entity.data {
237                        if data.dense.len() == self.dim {
238                            index.insert(entity.id, data.dense.clone());
239                        }
240                    }
241                }
242            }
243        }
244        // Drain WAL-replayed VectorInsert records (#694). Apply in WAL
245        // order so the resulting block/lane placement matches the
246        // pre-restart state byte-for-byte. Duplicate ids (same vector
247        // also present in the entity manager) overwrite via
248        // `TurboQuantIndex::insert`'s id-replace branch, which is safe
249        // and idempotent because the codec seed is constant.
250        if let Some(records) = store.take_replayed_turbo_inserts(collection) {
251            for (raw_id, vector) in records {
252                if vector.len() == self.dim {
253                    index.insert(EntityId::new(raw_id), vector);
254                }
255            }
256        }
257        self.populated.store(true, Ordering::Release);
258        self.mark_ready();
259    }
260
261    /// Capture the current in-memory index state and dump it to the
262    /// configured `.tv` path on a worker thread (#674). The next
263    /// caller (typically the next WAL checkpoint cycle) blocks on the
264    /// previous worker before starting a new one — bounded
265    /// backpressure of at most one dump in flight per collection. The
266    /// WAL checkpoint itself never waits for the snapshot fsync.
267    ///
268    /// No-op when `snapshot_path` is `None` (`StorageLayout::Minimal`
269    /// or embedded mode) — preserves the single-file portability
270    /// story.
271    pub fn dump_snapshot_async(self: &Arc<Self>, lsn: u64) {
272        let Some(path) = self.snapshot_path.lock().clone() else {
273            return;
274        };
275
276        // Async barrier: wait for any previous dump to finish before
277        // taking a fresh snapshot. Joining here (and not inside the
278        // new worker) keeps the work serialized per collection and
279        // ensures the on-disk file moves monotonically forward.
280        if let Some(prev) = self.prev_snapshot_join.lock().take() {
281            let _ = prev.join();
282        }
283
284        // Capture state under the index lock — the encode/decode
285        // path uses the same lock — then drop the lock and serialize
286        // off-thread so the checkpoint completion latency is not
287        // blocked on snapshot fsync (acceptance criterion #674.5).
288        let dim = self.dim as u32;
289        let captured: Vec<(u64, Vec<f32>)> = {
290            let guard = self.index.lock();
291            guard
292                .iter_persisted()
293                .map(|(id, v)| (id.raw(), v.to_vec()))
294                .collect()
295        };
296
297        let path_for_worker = path.clone();
298        let handle = std::thread::Builder::new()
299            .name("turbo-snapshot-dump".to_string())
300            .spawn(move || {
301                if let Some(parent) = path_for_worker.parent() {
302                    let _ = std::fs::create_dir_all(parent);
303                }
304                if let Err(err) =
305                    write_snapshot(&path_for_worker, dim, TURBO_CODEC_SEED, lsn, &captured)
306                {
307                    tracing::warn!(
308                        target: "reddb::turbo::snapshot",
309                        path = %path_for_worker.display(),
310                        error = %err,
311                        "vector.turbo snapshot dump failed; cache will be rebuilt on next checkpoint",
312                    );
313                }
314            })
315            .ok();
316
317        *self.prev_snapshot_join.lock() = handle;
318    }
319
320    /// Join the in-flight snapshot worker, if any. Used by `RedDB::Drop`
321    /// to make sure no `.tv` write outlives the runtime, and by tests
322    /// that want to assert the on-disk state synchronously.
323    pub fn wait_snapshot(&self) {
324        if let Some(prev) = self.prev_snapshot_join.lock().take() {
325            let _ = prev.join();
326        }
327    }
328
329    /// Surface `SnapshotError::Io` to callers that need it. Currently
330    /// unused outside of tests but kept on the public path so the
331    /// snapshot module stays an implementation detail of this state.
332    #[allow(dead_code)]
333    pub(crate) fn snapshot_error_is_fatal(err: &SnapshotError) -> bool {
334        matches!(err, SnapshotError::Io(_))
335    }
336
337    /// Background-rebuild hook (#694 / #673). Drives the same code
338    /// path as `ensure_populated`; safe to call from a worker thread.
339    /// On completion the per-collection readiness flag flips and any
340    /// SEARCH callers parked on `wait_until_ready` are woken.
341    pub fn background_rebuild(&self, store: &UnifiedStore, collection: &str) {
342        self.ensure_populated(store, collection);
343    }
344}
345
346/// Public hook (#673) — issued by `RedDB::turbo_state` the first
347/// time a turbo collection's state is materialised. Spawns a worker
348/// thread that drains the WAL replay buffer + persisted vector
349/// entities and flips the readiness flag. Non-vector traffic never
350/// blocks on this; vector SEARCH/INSERT against a not-yet-ready
351/// collection observes the flag via `is_ready` / `wait_until_ready`.
352pub fn spawn_background_rebuild(
353    store: Arc<UnifiedStore>,
354    collection: String,
355    state: Arc<TurboCollectionState>,
356) -> std::thread::JoinHandle<()> {
357    // Worker holds a *weak* handle to the store so it can detect
358    // shutdown when the upgrade fails. The runtime registers the
359    // returned `JoinHandle` and joins it in `RedDB::Drop` before
360    // releasing its own `Arc<UnifiedStore>`, which is what
361    // guarantees a clean restart on the same database path.
362    let store_weak = Arc::downgrade(&store);
363    drop(store);
364    std::thread::Builder::new()
365        .name(format!("turbo-rebuild-{collection}"))
366        .spawn(move || {
367            let Some(store) = store_weak.upgrade() else {
368                return;
369            };
370            state.background_rebuild(&store, &collection);
371            store.set_config_tree(
372                &format!("red.collection.{collection}.vector.turbo.ready"),
373                &crate::serde_json::Value::Bool(true),
374            );
375        })
376        .expect("spawn turbo background rebuild thread")
377}
378
379/// Read the persisted readiness flag from the catalog. Used by
380/// admin/introspection paths that don't hold the runtime
381/// `TurboCollectionState` (e.g. SHOW COLLECTION metadata, cross-
382/// process tooling). The in-memory `TurboCollectionState::is_ready`
383/// is the authoritative live signal; this is the persisted shadow.
384pub fn ready_flag_from_catalog(store: &UnifiedStore, collection: &str) -> bool {
385    let key = format!("red.collection.{collection}.vector.turbo.ready");
386    match store.get_config(&key) {
387        Some(Value::Boolean(b)) => b,
388        _ => false,
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::unified::UnifiedStore;

    /// A collection reads as turbo only after the marker was stamped.
    #[test]
    fn mark_and_detect_turbo_kind() {
        let store = UnifiedStore::new();
        let collection = "v";

        // Fresh store: no kind marker yet.
        let before = is_turbo(&store, collection);
        assert!(!before);

        mark_as_turbo(&store, collection);

        let after = is_turbo(&store, collection);
        assert!(after);
    }
}