dynomite/vector/registry.rs
1//! Vector index registry.
2//!
3//! [`VectorRegistry`] is the per-server map of index name to
4//! [`VectorTable`]. It is the single source of truth that the
5//! FT.* command handlers (Phase C) will consult to dispatch
6//! `FT.CREATE` / `FT.SEARCH` / `FT.INFO` / `FT.DROPINDEX`.
7//!
8//! Concurrency model:
9//!
10//! * The registry is held behind a [`parking_lot::RwLock`] over
11//! a [`BTreeMap`] keyed by index name. Reads (lookups, FT.LIST)
12//! take the read lock; mutations (create / drop) take the
13//! write lock.
14//! * Each [`VectorTable`] is wrapped in [`Arc`] so a reader can
15//! drop the lock immediately after a lookup and continue to
16//! work against a stable handle.
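//! * The underlying [`dynvec::Engine`] inside each table is
//!   itself an [`Arc`]-wrapped storage handle; read paths
//!   (FT.SEARCH) do not block write paths (HSET / FT.ADD).
//! * Per-table bookkeeping is different: the set of indexed
//!   document keys and the per-`TEXT`-field trigram indexes
//!   each sit behind their own [`parking_lot::Mutex`]. Every
//!   text search and text upsert on a table goes through the
//!   single text-index lock, so those operations serialise with
//!   one another across all `TEXT` fields of that table.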
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::sync::Arc;
23
24use parking_lot::{Mutex, RwLock};
25use thiserror::Error;
26
27use crate::vector::schema::{IndexAlgorithm, MetadataFieldType, VectorSchema};
28use dyntext::TextIndex;
29use dynvec::Engine;
30
31/// Errors returned by the registry.
32#[derive(Debug, Error)]
33#[non_exhaustive]
34pub enum RegistryError {
35 /// An index with that name already exists.
36 #[error("index already exists: {0}")]
37 AlreadyExists(String),
38 /// No index registered under that name.
39 #[error("index not found: {0}")]
40 NotFound(String),
41 /// The schema asks for an algorithm the engine does not
42 /// implement yet (today: [`IndexAlgorithm::Flat`]).
43 #[error("unsupported index algorithm: {0:?}")]
44 UnsupportedAlgorithm(IndexAlgorithm),
45 /// Engine-level failure during [`Engine::in_memory`].
46 #[error("engine: {0}")]
47 Engine(#[from] dynvec::storage::StoreError),
48}
49
50/// Per-`TEXT` schema field state.
51///
52/// Couples one [`dyntext::TextIndex`] (trigram + bloom
53/// inverted index) with the bookkeeping the FT.* surface
54/// needs to map between user-visible document keys and the
55/// internal monotonic doc ids the text index hands back. The
56/// pairing is one [`TextFieldIndex`] per `TEXT` schema field
57/// per registered index; the registry initialises one for
58/// every metadata field of type [`MetadataFieldType::Text`]
59/// at FT.CREATE time.
60#[derive(Debug, Default)]
61pub struct TextFieldIndex {
62 /// Trigram + bloom inverted index over the field's bytes.
63 pub index: TextIndex,
64 /// Internal text-doc-id -> user-visible document key.
65 pub doc_to_key: BTreeMap<u32, Vec<u8>>,
66 /// User-visible document key -> internal text-doc-id.
67 /// Used to evict the prior entry when the same key is
68 /// re-HSET-ed under an updated field value.
69 pub key_to_doc: BTreeMap<Vec<u8>, u32>,
70}
71
/// One text-search hit: `(document key, stored text bytes)`.
///
/// The per-field search helpers on [`VectorTable`] return these.
/// Callers get both the user-visible key and the exact bytes
/// indexed under the queried `TEXT` field, so the reply can be
/// rendered without a second lookup against the dynvec engine.
pub type TextHit = (Vec<u8>, Vec<u8>);
79
80/// Result of a regex query through the trigram-backed text
81/// index. The outer [`Option`] is `None` when no `TEXT`
82/// field by that name is declared; the inner [`Result`]
83/// surfaces a regex compilation error.
84pub type TextRegexResult = Option<Result<Vec<TextHit>, dyntext::regex_ast::RegexError>>;
85
86/// Result of an approximate-regex query through the TRE
87/// engine. The outer [`Option`] is `None` when no `TEXT`
88/// field by that name is declared; the inner [`Result`]
89/// surfaces a TRE-engine compilation or matching error.
90pub type TextRegexApproxResult = Option<Result<Vec<TextHit>, dyntext::TreError>>;
91
92/// One registered vector index.
93///
94/// A [`VectorTable`] couples the protocol-level [`VectorSchema`]
95/// (what the client asked for) with the storage-level
96/// [`Engine`] (what is actually persisted). The pair is
97/// immutable for the lifetime of the index; rebuilding a
98/// schema means dropping and recreating the table.
99///
100/// Alongside the schema and engine, the table tracks the set
101/// of document keys that the FT.* surface has indexed via
102/// HSET interception. The set is used by
103/// [`VectorRegistry::drop_with_dd`] to enumerate the
104/// underlying hash documents that should also be removed.
105#[derive(Debug)]
106pub struct VectorTable {
107 /// Index name (the FT.CREATE first argument).
108 pub name: String,
109 /// Compiled schema.
110 pub schema: VectorSchema,
111 /// Storage + index engine.
112 pub engine: Engine,
113 /// Document keys observed by the HSET interception path.
114 indexed_keys: Mutex<BTreeSet<Vec<u8>>>,
115 /// Per-`TEXT`-field trigram index map. The map is keyed
116 /// by schema field name and is initialised with one
117 /// entry per `TEXT` field declared in the schema. The
118 /// keys are stable for the lifetime of the table; only
119 /// the per-entry [`TextFieldIndex`] state mutates as
120 /// HSETs land.
121 text_indexes: Mutex<BTreeMap<String, TextFieldIndex>>,
122}
123
124impl VectorTable {
125 /// Record `key` as having been indexed. Idempotent.
126 pub fn record_indexed_key(&self, key: Vec<u8>) {
127 self.indexed_keys.lock().insert(key);
128 }
129
130 /// Snapshot the set of indexed keys.
131 #[must_use]
132 pub fn indexed_keys(&self) -> Vec<Vec<u8>> {
133 self.indexed_keys.lock().iter().cloned().collect()
134 }
135
136 /// True when the schema declares a `TEXT` field named
137 /// `field`. The check is case-sensitive (the FT.CREATE
138 /// parser preserves the field name verbatim). After an
139 /// `FT.ALTER ADD <field> TEXT` the schema vector remains
140 /// frozen (it lives on an immutable `Arc<VectorTable>`),
141 /// so this method also consults the runtime
142 /// [`TextFieldIndex`] map: a field that the registry has
143 /// provisioned a trigram index for is treated as a TEXT
144 /// field for the lifetime of the table.
145 #[must_use]
146 pub fn has_text_field(&self, field: &str) -> bool {
147 let in_schema = self
148 .schema
149 .metadata_fields
150 .iter()
151 .any(|f| f.field_type == MetadataFieldType::Text && f.name == field);
152 if in_schema {
153 return true;
154 }
155 self.text_indexes.lock().contains_key(field)
156 }
157
158 /// Provision a runtime [`TextFieldIndex`] for `field`.
159 ///
160 /// Used by `FT.ALTER ADD <field> TEXT` to extend an
161 /// already-registered table with a new text-indexed
162 /// field. Idempotent: a second call for the same field
163 /// is a no-op and returns `false`.
164 ///
165 /// Returns `true` when a new index slot was provisioned,
166 /// `false` when the field was already known (either as
167 /// part of the original schema or because a prior
168 /// `FT.ALTER` provisioned it).
169 pub fn add_text_field(&self, field: &str) -> bool {
170 let mut guard = self.text_indexes.lock();
171 if guard.contains_key(field) {
172 return false;
173 }
174 guard.insert(field.to_string(), TextFieldIndex::default());
175 true
176 }
177
178 /// Snapshot the set of TEXT fields known to this table:
179 /// the original `SCHEMA` declarations plus anything
180 /// provisioned later through [`Self::add_text_field`].
181 /// Names are returned in lexicographic order.
182 #[must_use]
183 pub fn text_field_names(&self) -> Vec<String> {
184 let mut names: BTreeSet<String> = BTreeSet::new();
185 for f in &self.schema.metadata_fields {
186 if f.field_type == MetadataFieldType::Text {
187 names.insert(f.name.clone());
188 }
189 }
190 for k in self.text_indexes.lock().keys() {
191 names.insert(k.clone());
192 }
193 names.into_iter().collect()
194 }
195
196 /// True when the registry has provisioned a [`TextIndex`]
197 /// for `field`. The check returns `true` exactly when
198 /// [`Self::has_text_field`] returns `true`; exposed
199 /// separately so wire-level tests can assert that the
200 /// FT.CREATE path actually populated the registry rather
201 /// than just recorded the schema.
202 #[must_use]
203 pub fn has_text_index(&self, field: &str) -> bool {
204 self.text_indexes.lock().contains_key(field)
205 }
206
207 /// Number of documents currently indexed under `field`.
208 /// Returns `None` when no `TEXT` field by that name is
209 /// declared in the schema.
210 #[must_use]
211 pub fn text_index_doc_count(&self, field: &str) -> Option<usize> {
212 self.text_indexes
213 .lock()
214 .get(field)
215 .map(|state| state.index.doc_count())
216 }
217
218 /// Insert `text` into the [`TextIndex`] for `field`,
219 /// associating it with the user-visible `key`. If the
220 /// same `key` had a prior entry under this field it is
221 /// removed first so the postings index never accumulates
222 /// stale doc ids.
223 ///
224 /// No-op when the schema has no `TEXT` field by that
225 /// name; callers can therefore call this for every
226 /// HSET field/value pair without prior schema lookup.
227 pub fn upsert_text_field(&self, field: &str, key: &[u8], text: &[u8]) {
228 let mut guard = self.text_indexes.lock();
229 let Some(state) = guard.get_mut(field) else {
230 return;
231 };
232 if let Some(prev_id) = state.key_to_doc.remove(key) {
233 state.doc_to_key.remove(&prev_id);
234 state.index.remove(prev_id);
235 }
236 let doc_id = state.index.insert(text.to_vec());
237 state.doc_to_key.insert(doc_id, key.to_vec());
238 state.key_to_doc.insert(key.to_vec(), doc_id);
239 }
240
241 /// Run an exact-substring lookup against the [`TextIndex`]
242 /// registered under `field`. Returns the user-visible
243 /// keys whose stored text contains `query` as a contiguous
244 /// byte substring, paired with the original text bytes.
245 ///
246 /// Returns `None` when no `TEXT` field by that name is
247 /// declared in the schema. Callers translate that into a
248 /// `-ERR` reply.
249 #[must_use]
250 pub fn search_text_substring(&self, field: &str, query: &[u8]) -> Option<Vec<TextHit>> {
251 let guard = self.text_indexes.lock();
252 let state = guard.get(field)?;
253 let mut hits: Vec<TextHit> = Vec::new();
254 for doc_id in state.index.search_substring(query) {
255 let Some(key) = state.doc_to_key.get(&doc_id) else {
256 continue;
257 };
258 let Some(doc) = state.index.docs().get(&doc_id) else {
259 continue;
260 };
261 hits.push((key.clone(), doc.text.clone()));
262 }
263 Some(hits)
264 }
265
266 /// Run an exact-regex lookup against the [`TextIndex`]
267 /// registered under `field`. Returns the user-visible
268 /// keys whose stored text matches `pattern`, paired with
269 /// the original text bytes.
270 ///
271 /// Returns `None` when no `TEXT` field by that name is
272 /// declared in the schema, or `Some(Err(...))` when the
273 /// pattern fails to compile.
274 pub fn search_text_regex(&self, field: &str, pattern: &str) -> TextRegexResult {
275 let guard = self.text_indexes.lock();
276 let state = guard.get(field)?;
277 let result = state.index.search_regex(pattern).map(|ids| {
278 let mut out: Vec<TextHit> = Vec::new();
279 for doc_id in ids {
280 let Some(key) = state.doc_to_key.get(&doc_id) else {
281 continue;
282 };
283 let Some(doc) = state.index.docs().get(&doc_id) else {
284 continue;
285 };
286 out.push((key.clone(), doc.text.clone()));
287 }
288 out
289 });
290 Some(result)
291 }
292
293 /// Run an approximate-regex lookup against the
294 /// [`TextIndex`] registered under `field` with up to
295 /// `max_errors` edit operations. Returns the user-visible
296 /// keys whose stored text approximately matches `pattern`,
297 /// paired with the original text bytes.
298 ///
299 /// Returns `None` when no `TEXT` field by that name is
300 /// declared in the schema, or `Some(Err(...))` when the
301 /// pattern fails to compile through the TRE engine.
302 pub fn search_text_regex_approx(
303 &self,
304 field: &str,
305 pattern: &str,
306 max_errors: u16,
307 ) -> TextRegexApproxResult {
308 let guard = self.text_indexes.lock();
309 let state = guard.get(field)?;
310 let result = state
311 .index
312 .search_regex_approx(pattern, max_errors)
313 .map(|ids| {
314 let mut out: Vec<TextHit> = Vec::new();
315 for doc_id in ids {
316 let Some(key) = state.doc_to_key.get(&doc_id) else {
317 continue;
318 };
319 let Some(doc) = state.index.docs().get(&doc_id) else {
320 continue;
321 };
322 out.push((key.clone(), doc.text.clone()));
323 }
324 out
325 });
326 Some(result)
327 }
328}
329
330/// Snapshot view of one registered index.
331///
332/// Returned by [`VectorRegistry::info`] for the FT.INFO command
333/// path. Kept distinct from [`VectorTable`] so the FT.INFO
334/// handler can serialise a stable, copy-safe summary without
335/// locking the registry across the response.
336#[derive(Clone, Debug, PartialEq)]
337pub struct VectorTableInfo {
338 /// Index name.
339 pub name: String,
340 /// Frozen vector dimension.
341 pub dim: u16,
342 /// Distance metric.
343 pub distance: crate::vector::schema::DistanceMetric,
344 /// Index algorithm.
345 pub algorithm: IndexAlgorithm,
346 /// Live (non-tombstoned) row count.
347 pub live_rows: usize,
348 /// Number of tracked rows (live + soft-deleted).
349 pub tracked_rows: usize,
350}
351
352/// Per-server vector index registry.
353///
354/// The registry owns the [`VectorTable`] map; FT.* command
355/// handlers consult it on every command. Construct one with
356/// [`VectorRegistry::new`] (typically as a field on the
357/// dynomite [`crate::core::context::Context`]) and clone the
358/// returned handle freely; clones share state.
359#[derive(Clone, Default)]
360pub struct VectorRegistry {
361 inner: Arc<RwLock<BTreeMap<String, Arc<VectorTable>>>>,
362}
363
364impl std::fmt::Debug for VectorRegistry {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 let names: Vec<String> = self.inner.read().keys().cloned().collect();
367 f.debug_struct("VectorRegistry")
368 .field("indexes", &names)
369 .finish()
370 }
371}
372
373impl VectorRegistry {
374 /// Build an empty registry.
375 #[must_use]
376 pub fn new() -> Self {
377 Self::default()
378 }
379
380 /// Register a new index.
381 ///
382 /// The schema's [`IndexAlgorithm`] is validated against the
383 /// engine's capabilities; today only
384 /// [`IndexAlgorithm::Hnsw`] is supported. The engine is
385 /// instantiated as an in-memory [`dynvec::Engine`]; on-disk
386 /// backends will plug in once the Noxu storage path lands.
387 ///
388 /// # Errors
389 ///
390 /// * [`RegistryError::AlreadyExists`] when `name` is in use.
391 /// * [`RegistryError::UnsupportedAlgorithm`] when the
392 /// schema selects an algorithm we do not implement.
393 /// * [`RegistryError::Engine`] when the underlying engine
394 /// refuses the schema.
395 pub fn create(&self, name: String, schema: VectorSchema) -> Result<(), RegistryError> {
396 if !matches!(schema.algorithm, IndexAlgorithm::Hnsw) {
397 return Err(RegistryError::UnsupportedAlgorithm(schema.algorithm));
398 }
399 let mut guard = self.inner.write();
400 if guard.contains_key(&name) {
401 return Err(RegistryError::AlreadyExists(name));
402 }
403 let engine_schema = schema.to_engine_schema(&name);
404 let engine = Engine::in_memory(engine_schema)?;
405 let mut text_indexes: BTreeMap<String, TextFieldIndex> = BTreeMap::new();
406 for f in &schema.metadata_fields {
407 if f.field_type == MetadataFieldType::Text {
408 text_indexes.insert(f.name.clone(), TextFieldIndex::default());
409 }
410 }
411 let table = VectorTable {
412 name: name.clone(),
413 schema,
414 engine,
415 indexed_keys: Mutex::new(BTreeSet::new()),
416 text_indexes: Mutex::new(text_indexes),
417 };
418 guard.insert(name, Arc::new(table));
419 Ok(())
420 }
421
422 /// Drop the index `name`.
423 ///
424 /// Returns the prior table (so callers can decide whether
425 /// to also delete underlying documents, mimicking the
426 /// `FT.DROPINDEX ... DD` flag).
427 ///
428 /// # Errors
429 ///
430 /// [`RegistryError::NotFound`] when no index is registered
431 /// under `name`.
432 pub fn drop(&self, name: &str) -> Result<Arc<VectorTable>, RegistryError> {
433 let mut guard = self.inner.write();
434 guard
435 .remove(name)
436 .ok_or_else(|| RegistryError::NotFound(name.to_string()))
437 }
438
439 /// Drop the index `name` and return the set of document
440 /// keys that the FT.* surface had observed under it.
441 ///
442 /// Used by `FT.DROPINDEX ... DD` to enumerate the hash
443 /// documents the caller should also delete from the
444 /// underlying datastore.
445 ///
446 /// # Errors
447 ///
448 /// [`RegistryError::NotFound`] when no index is registered
449 /// under `name`.
450 pub fn drop_with_dd(&self, name: &str) -> Result<Vec<Vec<u8>>, RegistryError> {
451 let table = self.drop(name)?;
452 Ok(table.indexed_keys())
453 }
454
455 /// Look up a registered table by name.
456 ///
457 /// Returns a cloned [`Arc`] so the caller can drop the
458 /// registry lock immediately after the lookup.
459 #[must_use]
460 pub fn get(&self, name: &str) -> Option<Arc<VectorTable>> {
461 self.inner.read().get(name).cloned()
462 }
463
464 /// List every registered index by name, sorted
465 /// alphabetically.
466 #[must_use]
467 pub fn list(&self) -> Vec<String> {
468 self.inner.read().keys().cloned().collect()
469 }
470
471 /// Snapshot the FT.INFO view of `name`.
472 #[must_use]
473 pub fn info(&self, name: &str) -> Option<VectorTableInfo> {
474 let table = self.get(name)?;
475 let stats = table.engine.stats().ok()?;
476 Some(VectorTableInfo {
477 name: table.name.clone(),
478 dim: table.schema.dim,
479 distance: table.schema.distance,
480 algorithm: table.schema.algorithm,
481 live_rows: stats.live_rows,
482 tracked_rows: stats.tracked_rows,
483 })
484 }
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vector::schema::{DistanceMetric, IndexAlgorithm, VectorType};

    /// Minimal 4-dim cosine schema with no metadata fields.
    fn schema(algorithm: IndexAlgorithm) -> VectorSchema {
        VectorSchema {
            vector_field: "vec".to_string(),
            vector_type: VectorType::Float32,
            dim: 4,
            distance: DistanceMetric::Cosine,
            algorithm,
            prefixes: Vec::new(),
            metadata_fields: Vec::new(),
        }
    }

    /// Registry holding a single HNSW index named `idx`.
    fn registry_with_idx() -> VectorRegistry {
        let reg = VectorRegistry::new();
        reg.create("idx".to_string(), schema(IndexAlgorithm::Hnsw))
            .unwrap();
        reg
    }

    #[test]
    fn create_and_get_returns_table() {
        let reg = VectorRegistry::new();
        reg.create("idx".to_string(), schema(IndexAlgorithm::Hnsw))
            .unwrap();
        let table = reg.get("idx").expect("table present");
        assert_eq!(table.name, "idx");
        assert_eq!(table.schema.dim, 4);
    }

    #[test]
    fn duplicate_name_errors() {
        let reg = VectorRegistry::new();
        reg.create("idx".to_string(), schema(IndexAlgorithm::Hnsw))
            .unwrap();
        let err = reg
            .create("idx".to_string(), schema(IndexAlgorithm::Hnsw))
            .unwrap_err();
        assert!(matches!(err, RegistryError::AlreadyExists(_)));
    }

    #[test]
    fn unsupported_algorithm_errors() {
        let reg = VectorRegistry::new();
        let err = reg
            .create("idx".to_string(), schema(IndexAlgorithm::Flat))
            .unwrap_err();
        assert!(matches!(err, RegistryError::UnsupportedAlgorithm(_)));
    }

    #[test]
    fn unknown_name_lookups_return_none() {
        let reg = VectorRegistry::new();
        assert!(reg.get("missing").is_none());
        assert!(reg.info("missing").is_none());
        assert!(reg.list().is_empty());
    }

    #[test]
    fn drop_removes_table_and_second_drop_errors() {
        let reg = registry_with_idx();
        let dropped = reg.drop("idx").expect("table present");
        assert_eq!(dropped.name, "idx");
        assert!(reg.get("idx").is_none());
        let err = reg.drop("idx").unwrap_err();
        assert!(matches!(err, RegistryError::NotFound(_)));
    }

    #[test]
    fn list_is_sorted_by_name() {
        let reg = VectorRegistry::new();
        for name in ["zeta", "alpha", "mid"] {
            reg.create(name.to_string(), schema(IndexAlgorithm::Hnsw))
                .unwrap();
        }
        assert_eq!(reg.list(), vec!["alpha", "mid", "zeta"]);
    }

    #[test]
    fn drop_with_dd_returns_recorded_keys_sorted_and_deduplicated() {
        let reg = registry_with_idx();
        let table = reg.get("idx").expect("table present");
        table.record_indexed_key(b"doc:2".to_vec());
        table.record_indexed_key(b"doc:1".to_vec());
        table.record_indexed_key(b"doc:2".to_vec());
        let keys = reg.drop_with_dd("idx").unwrap();
        assert_eq!(keys, vec![b"doc:1".to_vec(), b"doc:2".to_vec()]);
        assert!(matches!(
            reg.drop_with_dd("idx"),
            Err(RegistryError::NotFound(_))
        ));
    }

    #[test]
    fn info_reports_schema_summary() {
        let reg = registry_with_idx();
        let info = reg.info("idx").expect("info present");
        assert_eq!(info.name, "idx");
        assert_eq!(info.dim, 4);
        assert_eq!(info.distance, DistanceMetric::Cosine);
        assert_eq!(info.algorithm, IndexAlgorithm::Hnsw);
    }

    #[test]
    fn add_text_field_is_idempotent_and_visible() {
        let reg = registry_with_idx();
        let table = reg.get("idx").expect("table present");
        assert!(!table.has_text_field("body"));
        assert!(table.add_text_field("body"));
        assert!(!table.add_text_field("body"));
        assert!(table.has_text_field("body"));
        assert!(table.has_text_index("body"));
        assert_eq!(table.text_field_names(), vec!["body".to_string()]);
    }

    #[test]
    fn unknown_text_field_queries_return_none() {
        let reg = registry_with_idx();
        let table = reg.get("idx").expect("table present");
        table.upsert_text_field("missing", b"k", b"v");
        assert!(!table.has_text_index("missing"));
        assert!(table.text_index_doc_count("missing").is_none());
        assert!(table.search_text_substring("missing", b"x").is_none());
        assert!(table.search_text_regex("missing", "x").is_none());
        assert!(table.search_text_regex_approx("missing", "x", 1).is_none());
    }

    #[test]
    fn debug_lists_index_names() {
        let reg = registry_with_idx();
        assert_eq!(format!("{reg:?}"), r#"VectorRegistry { indexes: ["idx"] }"#);
    }
}