// dynomite/vector/registry.rs

1//! Vector index registry.
2//!
3//! [`VectorRegistry`] is the per-server map of index name to
4//! [`VectorTable`]. It is the single source of truth that the
5//! FT.* command handlers (Phase C) will consult to dispatch
6//! `FT.CREATE` / `FT.SEARCH` / `FT.INFO` / `FT.DROPINDEX`.
7//!
8//! Concurrency model:
9//!
10//! * The registry is held behind a [`parking_lot::RwLock`] over
11//!   a [`BTreeMap`] keyed by index name. Reads (lookups, FT.LIST)
12//!   take the read lock; mutations (create / drop) take the
13//!   write lock.
14//! * Each [`VectorTable`] is wrapped in [`Arc`] so a reader can
15//!   drop the lock immediately after a lookup and continue to
16//!   work against a stable handle.
17//! * The underlying [`dynvec::Engine`] inside each table is
18//!   itself an [`Arc`]-wrapped storage handle; read paths
19//!   (FT.SEARCH) do not block write paths (HSET / FT.ADD).
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::sync::Arc;
23
24use parking_lot::{Mutex, RwLock};
25use thiserror::Error;
26
27use crate::vector::schema::{IndexAlgorithm, MetadataFieldType, VectorSchema};
28use dyntext::TextIndex;
29use dynvec::Engine;
30
31/// Errors returned by the registry.
32#[derive(Debug, Error)]
33#[non_exhaustive]
34pub enum RegistryError {
35    /// An index with that name already exists.
36    #[error("index already exists: {0}")]
37    AlreadyExists(String),
38    /// No index registered under that name.
39    #[error("index not found: {0}")]
40    NotFound(String),
41    /// The schema asks for an algorithm the engine does not
42    /// implement yet (today: [`IndexAlgorithm::Flat`]).
43    #[error("unsupported index algorithm: {0:?}")]
44    UnsupportedAlgorithm(IndexAlgorithm),
45    /// Engine-level failure during [`Engine::in_memory`].
46    #[error("engine: {0}")]
47    Engine(#[from] dynvec::storage::StoreError),
48}
49
50/// Per-`TEXT` schema field state.
51///
52/// Couples one [`dyntext::TextIndex`] (trigram + bloom
53/// inverted index) with the bookkeeping the FT.* surface
54/// needs to map between user-visible document keys and the
55/// internal monotonic doc ids the text index hands back. The
56/// pairing is one [`TextFieldIndex`] per `TEXT` schema field
57/// per registered index; the registry initialises one for
58/// every metadata field of type [`MetadataFieldType::Text`]
59/// at FT.CREATE time.
60#[derive(Debug, Default)]
61pub struct TextFieldIndex {
62    /// Trigram + bloom inverted index over the field's bytes.
63    pub index: TextIndex,
64    /// Internal text-doc-id -> user-visible document key.
65    pub doc_to_key: BTreeMap<u32, Vec<u8>>,
66    /// User-visible document key -> internal text-doc-id.
67    /// Used to evict the prior entry when the same key is
68    /// re-HSET-ed under an updated field value.
69    pub key_to_doc: BTreeMap<Vec<u8>, u32>,
70}
71
/// One match produced by the text-field search helpers on
/// [`VectorTable`].
///
/// The first element is the user-visible document key; the
/// second is the raw text stored for that document under
/// the queried `TEXT` field. Carrying both lets a caller
/// render a reply from the hit alone.
pub type TextHit = (Vec<u8>, Vec<u8>);
79
80/// Result of a regex query through the trigram-backed text
81/// index. The outer [`Option`] is `None` when no `TEXT`
82/// field by that name is declared; the inner [`Result`]
83/// surfaces a regex compilation error.
84pub type TextRegexResult = Option<Result<Vec<TextHit>, dyntext::regex_ast::RegexError>>;
85
86/// Result of an approximate-regex query through the TRE
87/// engine. The outer [`Option`] is `None` when no `TEXT`
88/// field by that name is declared; the inner [`Result`]
89/// surfaces a TRE-engine compilation or matching error.
90pub type TextRegexApproxResult = Option<Result<Vec<TextHit>, dyntext::TreError>>;
91
92/// One registered vector index.
93///
94/// A [`VectorTable`] couples the protocol-level [`VectorSchema`]
95/// (what the client asked for) with the storage-level
96/// [`Engine`] (what is actually persisted). The pair is
97/// immutable for the lifetime of the index; rebuilding a
98/// schema means dropping and recreating the table.
99///
100/// Alongside the schema and engine, the table tracks the set
101/// of document keys that the FT.* surface has indexed via
102/// HSET interception. The set is used by
103/// [`VectorRegistry::drop_with_dd`] to enumerate the
104/// underlying hash documents that should also be removed.
105#[derive(Debug)]
106pub struct VectorTable {
107    /// Index name (the FT.CREATE first argument).
108    pub name: String,
109    /// Compiled schema.
110    pub schema: VectorSchema,
111    /// Storage + index engine.
112    pub engine: Engine,
113    /// Document keys observed by the HSET interception path.
114    indexed_keys: Mutex<BTreeSet<Vec<u8>>>,
115    /// Per-`TEXT`-field trigram index map. The map is keyed
116    /// by schema field name and is initialised with one
117    /// entry per `TEXT` field declared in the schema. The
118    /// keys are stable for the lifetime of the table; only
119    /// the per-entry [`TextFieldIndex`] state mutates as
120    /// HSETs land.
121    text_indexes: Mutex<BTreeMap<String, TextFieldIndex>>,
122}
123
124impl VectorTable {
125    /// Record `key` as having been indexed. Idempotent.
126    pub fn record_indexed_key(&self, key: Vec<u8>) {
127        self.indexed_keys.lock().insert(key);
128    }
129
130    /// Snapshot the set of indexed keys.
131    #[must_use]
132    pub fn indexed_keys(&self) -> Vec<Vec<u8>> {
133        self.indexed_keys.lock().iter().cloned().collect()
134    }
135
136    /// True when the schema declares a `TEXT` field named
137    /// `field`. The check is case-sensitive (the FT.CREATE
138    /// parser preserves the field name verbatim). After an
139    /// `FT.ALTER ADD <field> TEXT` the schema vector remains
140    /// frozen (it lives on an immutable `Arc<VectorTable>`),
141    /// so this method also consults the runtime
142    /// [`TextFieldIndex`] map: a field that the registry has
143    /// provisioned a trigram index for is treated as a TEXT
144    /// field for the lifetime of the table.
145    #[must_use]
146    pub fn has_text_field(&self, field: &str) -> bool {
147        let in_schema = self
148            .schema
149            .metadata_fields
150            .iter()
151            .any(|f| f.field_type == MetadataFieldType::Text && f.name == field);
152        if in_schema {
153            return true;
154        }
155        self.text_indexes.lock().contains_key(field)
156    }
157
158    /// Provision a runtime [`TextFieldIndex`] for `field`.
159    ///
160    /// Used by `FT.ALTER ADD <field> TEXT` to extend an
161    /// already-registered table with a new text-indexed
162    /// field. Idempotent: a second call for the same field
163    /// is a no-op and returns `false`.
164    ///
165    /// Returns `true` when a new index slot was provisioned,
166    /// `false` when the field was already known (either as
167    /// part of the original schema or because a prior
168    /// `FT.ALTER` provisioned it).
169    pub fn add_text_field(&self, field: &str) -> bool {
170        let mut guard = self.text_indexes.lock();
171        if guard.contains_key(field) {
172            return false;
173        }
174        guard.insert(field.to_string(), TextFieldIndex::default());
175        true
176    }
177
178    /// Snapshot the set of TEXT fields known to this table:
179    /// the original `SCHEMA` declarations plus anything
180    /// provisioned later through [`Self::add_text_field`].
181    /// Names are returned in lexicographic order.
182    #[must_use]
183    pub fn text_field_names(&self) -> Vec<String> {
184        let mut names: BTreeSet<String> = BTreeSet::new();
185        for f in &self.schema.metadata_fields {
186            if f.field_type == MetadataFieldType::Text {
187                names.insert(f.name.clone());
188            }
189        }
190        for k in self.text_indexes.lock().keys() {
191            names.insert(k.clone());
192        }
193        names.into_iter().collect()
194    }
195
196    /// True when the registry has provisioned a [`TextIndex`]
197    /// for `field`. The check returns `true` exactly when
198    /// [`Self::has_text_field`] returns `true`; exposed
199    /// separately so wire-level tests can assert that the
200    /// FT.CREATE path actually populated the registry rather
201    /// than just recorded the schema.
202    #[must_use]
203    pub fn has_text_index(&self, field: &str) -> bool {
204        self.text_indexes.lock().contains_key(field)
205    }
206
207    /// Number of documents currently indexed under `field`.
208    /// Returns `None` when no `TEXT` field by that name is
209    /// declared in the schema.
210    #[must_use]
211    pub fn text_index_doc_count(&self, field: &str) -> Option<usize> {
212        self.text_indexes
213            .lock()
214            .get(field)
215            .map(|state| state.index.doc_count())
216    }
217
218    /// Insert `text` into the [`TextIndex`] for `field`,
219    /// associating it with the user-visible `key`. If the
220    /// same `key` had a prior entry under this field it is
221    /// removed first so the postings index never accumulates
222    /// stale doc ids.
223    ///
224    /// No-op when the schema has no `TEXT` field by that
225    /// name; callers can therefore call this for every
226    /// HSET field/value pair without prior schema lookup.
227    pub fn upsert_text_field(&self, field: &str, key: &[u8], text: &[u8]) {
228        let mut guard = self.text_indexes.lock();
229        let Some(state) = guard.get_mut(field) else {
230            return;
231        };
232        if let Some(prev_id) = state.key_to_doc.remove(key) {
233            state.doc_to_key.remove(&prev_id);
234            state.index.remove(prev_id);
235        }
236        let doc_id = state.index.insert(text.to_vec());
237        state.doc_to_key.insert(doc_id, key.to_vec());
238        state.key_to_doc.insert(key.to_vec(), doc_id);
239    }
240
241    /// Run an exact-substring lookup against the [`TextIndex`]
242    /// registered under `field`. Returns the user-visible
243    /// keys whose stored text contains `query` as a contiguous
244    /// byte substring, paired with the original text bytes.
245    ///
246    /// Returns `None` when no `TEXT` field by that name is
247    /// declared in the schema. Callers translate that into a
248    /// `-ERR` reply.
249    #[must_use]
250    pub fn search_text_substring(&self, field: &str, query: &[u8]) -> Option<Vec<TextHit>> {
251        let guard = self.text_indexes.lock();
252        let state = guard.get(field)?;
253        let mut hits: Vec<TextHit> = Vec::new();
254        for doc_id in state.index.search_substring(query) {
255            let Some(key) = state.doc_to_key.get(&doc_id) else {
256                continue;
257            };
258            let Some(doc) = state.index.docs().get(&doc_id) else {
259                continue;
260            };
261            hits.push((key.clone(), doc.text.clone()));
262        }
263        Some(hits)
264    }
265
266    /// Run an exact-regex lookup against the [`TextIndex`]
267    /// registered under `field`. Returns the user-visible
268    /// keys whose stored text matches `pattern`, paired with
269    /// the original text bytes.
270    ///
271    /// Returns `None` when no `TEXT` field by that name is
272    /// declared in the schema, or `Some(Err(...))` when the
273    /// pattern fails to compile.
274    pub fn search_text_regex(&self, field: &str, pattern: &str) -> TextRegexResult {
275        let guard = self.text_indexes.lock();
276        let state = guard.get(field)?;
277        let result = state.index.search_regex(pattern).map(|ids| {
278            let mut out: Vec<TextHit> = Vec::new();
279            for doc_id in ids {
280                let Some(key) = state.doc_to_key.get(&doc_id) else {
281                    continue;
282                };
283                let Some(doc) = state.index.docs().get(&doc_id) else {
284                    continue;
285                };
286                out.push((key.clone(), doc.text.clone()));
287            }
288            out
289        });
290        Some(result)
291    }
292
293    /// Run an approximate-regex lookup against the
294    /// [`TextIndex`] registered under `field` with up to
295    /// `max_errors` edit operations. Returns the user-visible
296    /// keys whose stored text approximately matches `pattern`,
297    /// paired with the original text bytes.
298    ///
299    /// Returns `None` when no `TEXT` field by that name is
300    /// declared in the schema, or `Some(Err(...))` when the
301    /// pattern fails to compile through the TRE engine.
302    pub fn search_text_regex_approx(
303        &self,
304        field: &str,
305        pattern: &str,
306        max_errors: u16,
307    ) -> TextRegexApproxResult {
308        let guard = self.text_indexes.lock();
309        let state = guard.get(field)?;
310        let result = state
311            .index
312            .search_regex_approx(pattern, max_errors)
313            .map(|ids| {
314                let mut out: Vec<TextHit> = Vec::new();
315                for doc_id in ids {
316                    let Some(key) = state.doc_to_key.get(&doc_id) else {
317                        continue;
318                    };
319                    let Some(doc) = state.index.docs().get(&doc_id) else {
320                        continue;
321                    };
322                    out.push((key.clone(), doc.text.clone()));
323                }
324                out
325            });
326        Some(result)
327    }
328}
329
330/// Snapshot view of one registered index.
331///
332/// Returned by [`VectorRegistry::info`] for the FT.INFO command
333/// path. Kept distinct from [`VectorTable`] so the FT.INFO
334/// handler can serialise a stable, copy-safe summary without
335/// locking the registry across the response.
336#[derive(Clone, Debug, PartialEq)]
337pub struct VectorTableInfo {
338    /// Index name.
339    pub name: String,
340    /// Frozen vector dimension.
341    pub dim: u16,
342    /// Distance metric.
343    pub distance: crate::vector::schema::DistanceMetric,
344    /// Index algorithm.
345    pub algorithm: IndexAlgorithm,
346    /// Live (non-tombstoned) row count.
347    pub live_rows: usize,
348    /// Number of tracked rows (live + soft-deleted).
349    pub tracked_rows: usize,
350}
351
352/// Per-server vector index registry.
353///
354/// The registry owns the [`VectorTable`] map; FT.* command
355/// handlers consult it on every command. Construct one with
356/// [`VectorRegistry::new`] (typically as a field on the
357/// dynomite [`crate::core::context::Context`]) and clone the
358/// returned handle freely; clones share state.
359#[derive(Clone, Default)]
360pub struct VectorRegistry {
361    inner: Arc<RwLock<BTreeMap<String, Arc<VectorTable>>>>,
362}
363
364impl std::fmt::Debug for VectorRegistry {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        let names: Vec<String> = self.inner.read().keys().cloned().collect();
367        f.debug_struct("VectorRegistry")
368            .field("indexes", &names)
369            .finish()
370    }
371}
372
373impl VectorRegistry {
374    /// Build an empty registry.
375    #[must_use]
376    pub fn new() -> Self {
377        Self::default()
378    }
379
380    /// Register a new index.
381    ///
382    /// The schema's [`IndexAlgorithm`] is validated against the
383    /// engine's capabilities; today only
384    /// [`IndexAlgorithm::Hnsw`] is supported. The engine is
385    /// instantiated as an in-memory [`dynvec::Engine`]; on-disk
386    /// backends will plug in once the Noxu storage path lands.
387    ///
388    /// # Errors
389    ///
390    /// * [`RegistryError::AlreadyExists`] when `name` is in use.
391    /// * [`RegistryError::UnsupportedAlgorithm`] when the
392    ///   schema selects an algorithm we do not implement.
393    /// * [`RegistryError::Engine`] when the underlying engine
394    ///   refuses the schema.
395    pub fn create(&self, name: String, schema: VectorSchema) -> Result<(), RegistryError> {
396        if !matches!(schema.algorithm, IndexAlgorithm::Hnsw) {
397            return Err(RegistryError::UnsupportedAlgorithm(schema.algorithm));
398        }
399        let mut guard = self.inner.write();
400        if guard.contains_key(&name) {
401            return Err(RegistryError::AlreadyExists(name));
402        }
403        let engine_schema = schema.to_engine_schema(&name);
404        let engine = Engine::in_memory(engine_schema)?;
405        let mut text_indexes: BTreeMap<String, TextFieldIndex> = BTreeMap::new();
406        for f in &schema.metadata_fields {
407            if f.field_type == MetadataFieldType::Text {
408                text_indexes.insert(f.name.clone(), TextFieldIndex::default());
409            }
410        }
411        let table = VectorTable {
412            name: name.clone(),
413            schema,
414            engine,
415            indexed_keys: Mutex::new(BTreeSet::new()),
416            text_indexes: Mutex::new(text_indexes),
417        };
418        guard.insert(name, Arc::new(table));
419        Ok(())
420    }
421
422    /// Drop the index `name`.
423    ///
424    /// Returns the prior table (so callers can decide whether
425    /// to also delete underlying documents, mimicking the
426    /// `FT.DROPINDEX ... DD` flag).
427    ///
428    /// # Errors
429    ///
430    /// [`RegistryError::NotFound`] when no index is registered
431    /// under `name`.
432    pub fn drop(&self, name: &str) -> Result<Arc<VectorTable>, RegistryError> {
433        let mut guard = self.inner.write();
434        guard
435            .remove(name)
436            .ok_or_else(|| RegistryError::NotFound(name.to_string()))
437    }
438
439    /// Drop the index `name` and return the set of document
440    /// keys that the FT.* surface had observed under it.
441    ///
442    /// Used by `FT.DROPINDEX ... DD` to enumerate the hash
443    /// documents the caller should also delete from the
444    /// underlying datastore.
445    ///
446    /// # Errors
447    ///
448    /// [`RegistryError::NotFound`] when no index is registered
449    /// under `name`.
450    pub fn drop_with_dd(&self, name: &str) -> Result<Vec<Vec<u8>>, RegistryError> {
451        let table = self.drop(name)?;
452        Ok(table.indexed_keys())
453    }
454
455    /// Look up a registered table by name.
456    ///
457    /// Returns a cloned [`Arc`] so the caller can drop the
458    /// registry lock immediately after the lookup.
459    #[must_use]
460    pub fn get(&self, name: &str) -> Option<Arc<VectorTable>> {
461        self.inner.read().get(name).cloned()
462    }
463
464    /// List every registered index by name, sorted
465    /// alphabetically.
466    #[must_use]
467    pub fn list(&self) -> Vec<String> {
468        self.inner.read().keys().cloned().collect()
469    }
470
471    /// Snapshot the FT.INFO view of `name`.
472    #[must_use]
473    pub fn info(&self, name: &str) -> Option<VectorTableInfo> {
474        let table = self.get(name)?;
475        let stats = table.engine.stats().ok()?;
476        Some(VectorTableInfo {
477            name: table.name.clone(),
478            dim: table.schema.dim,
479            distance: table.schema.distance,
480            algorithm: table.schema.algorithm,
481            live_rows: stats.live_rows,
482            tracked_rows: stats.tracked_rows,
483        })
484    }
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vector::schema::{DistanceMetric, IndexAlgorithm, VectorType};

    /// Smallest schema the tests need: a 4-dimensional
    /// float32 cosine index with no prefixes and no metadata
    /// fields, using the requested `algorithm`.
    fn schema(algorithm: IndexAlgorithm) -> VectorSchema {
        VectorSchema {
            algorithm,
            dim: 4,
            distance: DistanceMetric::Cosine,
            vector_type: VectorType::Float32,
            vector_field: "vec".to_string(),
            prefixes: Vec::new(),
            metadata_fields: Vec::new(),
        }
    }

    /// A created index can be fetched back with its name and
    /// schema intact.
    #[test]
    fn create_and_get_returns_table() {
        let registry = VectorRegistry::new();
        let created = registry.create("idx".to_string(), schema(IndexAlgorithm::Hnsw));
        created.unwrap();

        let fetched = registry.get("idx").expect("table present");
        assert_eq!(fetched.name, "idx");
        assert_eq!(fetched.schema.dim, 4);
    }

    /// Registering the same name twice is rejected.
    #[test]
    fn duplicate_name_errors() {
        let registry = VectorRegistry::new();
        let first = registry.create("idx".to_string(), schema(IndexAlgorithm::Hnsw));
        first.unwrap();

        let second = registry.create("idx".to_string(), schema(IndexAlgorithm::Hnsw));
        assert!(matches!(second, Err(RegistryError::AlreadyExists(_))));
    }

    /// A non-HNSW schema is rejected.
    #[test]
    fn unsupported_algorithm_errors() {
        let registry = VectorRegistry::new();
        let outcome = registry.create("idx".to_string(), schema(IndexAlgorithm::Flat));
        assert!(matches!(
            outcome,
            Err(RegistryError::UnsupportedAlgorithm(_))
        ));
    }
}