Skip to main content

dynvec/
engine.rs

//! Per-table vector engine handle.
//!
//! [`Engine`] is the public handle that the Redis-Stack-style `FT.*`
//! command handlers in `dynomite::vector` hand out. Each engine
//! owns one [`VectorStore`] table; concurrent engines bound to
//! different tables are independent of each other.
//!
//! The shape is intentionally narrow: the `FT.*` command
//! pathway only needs upsert / get / delete / search / stats /
//! drop, plus a few accessors for introspection.
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use serde_json::Value;
16
17use crate::storage::{RowKey, StoreError, TableSchema, TableStats, VectorRow, VectorStore};
18
19/// Per-table engine handle.
20///
21/// One engine wraps one [`VectorStore`] table. The store is
22/// held behind an [`Arc`] so multiple handles can share the
23/// same underlying state (the registry never duplicates a
24/// store; it just hands out further [`Engine`] clones).
25#[derive(Clone)]
26pub struct Engine {
27    store: Arc<VectorStore>,
28    table: String,
29}
30
31impl std::fmt::Debug for Engine {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("Engine")
34            .field("table", &self.table)
35            .finish_non_exhaustive()
36    }
37}
38
39impl Engine {
40    /// Build a fresh in-memory engine for `schema`.
41    ///
42    /// The schema's `name` becomes the engine's table name. The
43    /// returned engine is the only handle on the new store; if
44    /// the registry wants a second handle it should
45    /// [`Engine::clone`] this one.
46    ///
47    /// # Errors
48    ///
49    /// Surfaces any [`StoreError`] from
50    /// [`VectorStore::create_table`].
51    pub fn in_memory(schema: TableSchema) -> Result<Self, StoreError> {
52        let store = Arc::new(VectorStore::in_memory());
53        let table = schema.name.clone();
54        store.create_table(schema)?;
55        Ok(Self { store, table })
56    }
57
58    /// Wrap an existing [`VectorStore`] that already holds the
59    /// table. Used by embedders that want to share a store
60    /// across many engines.
61    #[must_use]
62    pub fn with_store(store: Arc<VectorStore>, table: String) -> Self {
63        Self { store, table }
64    }
65
66    /// Bound table name.
67    #[must_use]
68    pub fn table_name(&self) -> &str {
69        &self.table
70    }
71
72    /// Borrow the underlying [`VectorStore`].
73    #[must_use]
74    pub fn store(&self) -> &Arc<VectorStore> {
75        &self.store
76    }
77
78    /// Insert or overwrite a vector row.
79    ///
80    /// # Errors
81    ///
82    /// Forwards every [`StoreError`] from
83    /// [`VectorStore::upsert`].
84    pub fn upsert(
85        &self,
86        key: RowKey,
87        vector: &[f32],
88        metadata: HashMap<String, Value>,
89    ) -> Result<(), StoreError> {
90        self.store.upsert(&self.table, key, vector, metadata)
91    }
92
93    /// Fetch the row at `key`.
94    ///
95    /// # Errors
96    ///
97    /// Forwards every [`StoreError`] from [`VectorStore::get`].
98    pub fn get(&self, key: &[u8]) -> Result<Option<VectorRow>, StoreError> {
99        self.store.get(&self.table, key)
100    }
101
102    /// Delete the row at `key`.
103    ///
104    /// # Errors
105    ///
106    /// Forwards every [`StoreError`] from
107    /// [`VectorStore::delete`].
108    pub fn delete(&self, key: &[u8]) -> Result<bool, StoreError> {
109        self.store.delete(&self.table, key)
110    }
111
112    /// Run a top-`k` ANN search.
113    ///
114    /// # Errors
115    ///
116    /// Forwards every [`StoreError`] from
117    /// [`VectorStore::search`].
118    pub fn search(
119        &self,
120        query: &[f32],
121        k: usize,
122        ef: Option<usize>,
123    ) -> Result<Vec<(VectorRow, f32)>, StoreError> {
124        self.store.search(&self.table, query, k, ef)
125    }
126
127    /// Per-table snapshot statistics.
128    ///
129    /// # Errors
130    ///
131    /// Forwards every [`StoreError`] from
132    /// [`VectorStore::stats`].
133    pub fn stats(&self) -> Result<TableStats, StoreError> {
134        self.store.stats(&self.table)
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::Distance;
    use crate::encoding::Codec;
    use crate::index::HnswParams;
    use crate::storage::IndexAlgorithm;

    /// Schema fixture: int8-quantised codec, Euclidean distance, HNSW index
    /// with default parameters.
    fn schema(name: &str, dim: u16) -> TableSchema {
        let name = String::from(name);
        TableSchema {
            name,
            dim,
            codec: Codec::Int8Quantized,
            distance: Distance::Euclidean,
            hnsw: HnswParams::default(),
            algorithm: IndexAlgorithm::Hnsw,
        }
    }

    #[test]
    fn engine_round_trips_a_row() {
        let engine = Engine::in_memory(schema("t", 3)).unwrap();
        assert_eq!(engine.table_name(), "t");

        let key = b"a";
        engine
            .upsert(key.to_vec(), &[1.0, 2.0, 3.0], HashMap::new())
            .unwrap();

        let row = engine.get(key).unwrap().expect("row present");
        assert_eq!(row.key, key);
        assert_eq!(row.vector.dim, 3);

        assert_eq!(engine.stats().unwrap().live_rows, 1);
    }

    #[test]
    fn engine_search_returns_nearest_first() {
        let engine = Engine::in_memory(schema("t", 2)).unwrap();

        let points: [(&[u8], [f32; 2]); 3] = [
            (&b"origin"[..], [0.0, 0.0]),
            (&b"unit_x"[..], [1.0, 0.0]),
            (&b"unit_y"[..], [0.0, 1.0]),
        ];
        for (key, coords) in points {
            engine.upsert(key.to_vec(), &coords, HashMap::new()).unwrap();
        }

        let hits = engine.search(&[0.05, 0.05], 1, None).unwrap();
        assert_eq!(hits.len(), 1);
        let (nearest, _) = &hits[0];
        assert_eq!(nearest.key, b"origin");
    }
}