Skip to main content

ringdb/
engine.rs

1use std::path::Path;
2use std::time::Instant;
3
4use serde::{Serialize, de::DeserializeOwned};
5
6use crate::backend::{CpuBackend, RingComputeBackend};
7use crate::config::RingDbConfig;
8use crate::error::{Result, RingDbError};
9use crate::payload::{PayloadStore, PayloadStoreBuilder};
10use crate::persist::{read_f32_file, read_meta, write_f32_file, write_meta};
11use crate::query::{DiskQuery, QueryResult, RangeQuery, RingQuery};
12
13/// Builder for a ring-query vector database.
14///
15/// Insert vectors (and their associated payloads) with
16/// [`add_vector()`](Self::add_vector), then call [`build()`](Self::build) to
17/// transfer ownership to the compute backend and obtain a [`SealedRingDb`]
18/// that can be queried.
19///
20/// `T` is the payload type stored alongside each vector. Use `T = ()` when
21/// no payload is needed.
22///
23/// # Example — no payload
24///
25/// ```
26/// use ringdb::{RingDb, RingDbConfig, RingQuery};
27///
28/// let config = RingDbConfig::new(4);
29/// let mut db = RingDb::new(config).unwrap();
30///
31/// db.add_vector(&[1.0, 0.0, 0.0, 0.0], ()).unwrap();
32/// db.add_vector(&[0.0, 1.0, 0.0, 0.0], ()).unwrap();
33///
34/// let db = db.build().unwrap();
35/// let result = db.query(&RingQuery { query: &[1.0f32, 0.0, 0.0, 0.0], d: 1.0, lambda: 0.1 }).unwrap();
36/// println!("hits: {:?}", result.ids);
37/// ```
38///
39/// # Example — with payload
40///
41/// ```
42/// use ringdb::{RingDb, RingDbConfig, RingQuery};
43/// use serde::{Serialize, Deserialize};
44///
45/// #[derive(Serialize, Deserialize)]
46/// struct Meta { label: String }
47///
48/// let mut db: RingDb<Meta> = RingDb::new(RingDbConfig::new(2)).unwrap();
49/// db.add_vector(&[1.0, 0.0], Meta { label: "dog".into() }).unwrap();
50/// db.add_vector(&[0.0, 1.0], Meta { label: "cat".into() }).unwrap();
51///
52/// let db = db.build().unwrap();
53/// let result = db.query(&RingQuery { query: &[1.0f32, 0.0], d: 1.0, lambda: 0.1 }).unwrap();
54/// let payloads = db.fetch_payloads(&result.ids).unwrap();
55/// ```
56pub struct RingDb<T = ()> {
57    config: RingDbConfig,
58    backend: Box<dyn RingComputeBackend>,
59    n_vectors: usize,
60
61    /// Staging buffer: f32 vectors, row-major, `n_vectors × dims`.
62    vectors: Vec<f32>,
63
64    /// Staging buffer: per-vector squared L2 norm.
65    norms_sq: Vec<f32>,
66
67    /// Streams payloads to a temp file as they arrive; never accumulates in RAM.
68    payload_builder: PayloadStoreBuilder<T>,
69}
70
71impl<T: Serialize + DeserializeOwned> RingDb<T> {
72    /// Create a new empty `RingDb` with the given configuration.
73    pub fn new(config: RingDbConfig) -> Result<Self> {
74        Ok(Self {
75            config,
76            backend: Box::new(CpuBackend::new()),
77            n_vectors: 0,
78            vectors: Vec::new(),
79            norms_sq: Vec::new(),
80            payload_builder: PayloadStoreBuilder::new()?,
81        })
82    }
83
84    /// Insert a single vector and its associated payload.
85    ///
86    /// Vectors are assigned sequential IDs starting from 0.
87    /// The slice length must equal `dims`.
88    pub fn add_vector(&mut self, vector: &[f32], payload: T) -> Result<()> {
89        let dims = self.config.dims;
90        if vector.len() != dims {
91            return Err(crate::error::RingDbError::DimensionMismatch {
92                expected: dims,
93                got: vector.len(),
94            });
95        }
96
97        let norm_sq: f32 = vector.iter().map(|x| x * x).sum();
98        self.norms_sq.push(norm_sq);
99        self.vectors.extend_from_slice(vector);
100        self.payload_builder.push(payload)?;
101        self.n_vectors += 1;
102        Ok(())
103    }
104
105    /// Transfer ownership of the accumulated data to the compute backend and
106    /// seal the database.
107    ///
108    /// Vector data is moved into the backend (zero-cost for the CPU backend).
109    /// Payloads are serialized and moved into a cold anonymous mmap — the
110    /// staging `Vec<T>` is dropped immediately after.
111    ///
112    /// If [`RingDbConfig::persist_dir`] is set the following files are written
113    /// to that directory before sealing:
114    ///
115    /// | File | Content |
116    /// |------|---------|
117    /// | `meta.bin` | `dims` + `n_vectors` as little-endian u64 |
118    /// | `vectors.bin` | raw f32 vectors (row-major) |
119    /// | `norms_sq.bin` | raw f32 squared norms |
120    /// | `payloads.bin` | concatenated bincode payload bytes |
121    /// | `offsets.bin` | byte offsets (u64) into `payloads.bin` |
122    ///
123    /// The database can be reloaded later with [`RingDb::load()`].
124    pub fn build(mut self) -> Result<SealedRingDb<T>> {
125        let dims = self.config.dims;
126        let n_vectors = self.n_vectors;
127
128        if let Some(dir) = self.config.persist_dir.clone() {
129            std::fs::create_dir_all(&dir)?;
130
131            write_meta(&dir.join("meta.bin"), dims, n_vectors)?;
132            write_f32_file(&dir.join("vectors.bin"), &self.vectors)?;
133            write_f32_file(&dir.join("norms_sq.bin"), &self.norms_sq)?;
134
135            let payload_store = self
136                .payload_builder
137                .finish_persisted(&dir.join("payloads.bin"), &dir.join("offsets.bin"))?;
138
139            self.backend
140                .upload_f32_dataset(dims, self.vectors, self.norms_sq)?;
141
142            return Ok(SealedRingDb {
143                config: self.config,
144                backend: self.backend,
145                n_vectors,
146                payload_store,
147            });
148        }
149
150        self.backend
151            .upload_f32_dataset(dims, self.vectors, self.norms_sq)?;
152        let payload_store = self.payload_builder.finish()?;
153        Ok(SealedRingDb {
154            config: self.config,
155            backend: self.backend,
156            n_vectors,
157            payload_store,
158        })
159    }
160
161    /// Reconstruct a [`SealedRingDb`] from a directory previously written by
162    /// [`RingDb::build()`] when [`RingDbConfig::persist_dir`] was set.
163    ///
164    /// Reads `meta.bin`, `vectors.bin`, `norms_sq.bin`, `payloads.bin`, and
165    /// `offsets.bin` from `dir`, re-uploads the vectors and norms to the
166    /// requested backend, and memory-maps the payload file.
167    ///
168    /// Pass [`BackendPreference::Cpu`] for the CPU backend (the only option
169    /// today; CUDA and others will be added later).
170    ///
171    /// # Errors
172    ///
173    /// Returns [`RingDbError::Corrupt`] if any file is missing, has an
174    /// unexpected size, or the dimension/count metadata is inconsistent.
175    ///
176    /// # Example
177    ///
178    /// ```no_run
179    /// use ringdb::{RingDb, RingDbConfig, BackendPreference};
180    /// use std::path::Path;
181    ///
182    /// // --- save ---
183    /// let mut db = RingDb::<()>::new(RingDbConfig::new(4).with_persist_dir("/tmp/mydb")).unwrap();
184    /// db.add_vector(&[1.0, 0.0, 0.0, 0.0], ()).unwrap();
185    /// let _sealed = db.build().unwrap(); // writes files to /tmp/mydb
186    ///
187    /// // --- load ---
188    /// let loaded = RingDb::<()>::load(Path::new("/tmp/mydb"), BackendPreference::Cpu).unwrap();
189    /// ```
190    pub fn load(
191        dir: &Path,
192        backend_preference: crate::config::BackendPreference,
193    ) -> Result<SealedRingDb<T>> {
194        let (dims, n_vectors) = read_meta(&dir.join("meta.bin"))?;
195
196        let vectors = read_f32_file(&dir.join("vectors.bin"))?;
197        let norms_sq = read_f32_file(&dir.join("norms_sq.bin"))?;
198
199        let expected_vec_len = n_vectors * dims;
200        if vectors.len() != expected_vec_len {
201            return Err(RingDbError::Corrupt(format!(
202                "vectors.bin has {} f32 values, expected {} (n_vectors={} × dims={})",
203                vectors.len(),
204                expected_vec_len,
205                n_vectors,
206                dims,
207            )));
208        }
209        if norms_sq.len() != n_vectors {
210            return Err(RingDbError::Corrupt(format!(
211                "norms_sq.bin has {} f32 values, expected {}",
212                norms_sq.len(),
213                n_vectors,
214            )));
215        }
216
217        let mut backend: Box<dyn RingComputeBackend> = match backend_preference {
218            crate::config::BackendPreference::Cpu => Box::new(CpuBackend::new()),
219        };
220        backend.upload_f32_dataset(dims, vectors, norms_sq)?;
221
222        let payload_store =
223            PayloadStore::load(&dir.join("payloads.bin"), &dir.join("offsets.bin"))?;
224
225        Ok(SealedRingDb {
226            config: RingDbConfig::new(dims)
227                .with_persist_dir(dir)
228                .with_backend_preference(backend_preference),
229            backend,
230            n_vectors,
231            payload_store,
232        })
233    }
234
235    /// Number of vectors currently staged.
236    pub fn len(&self) -> usize {
237        self.n_vectors
238    }
239
240    /// Returns `true` if no vectors have been inserted.
241    pub fn is_empty(&self) -> bool {
242        self.n_vectors == 0
243    }
244
245    /// Number of dimensions per vector.
246    pub fn dims(&self) -> usize {
247        self.config.dims
248    }
249
250    /// Name of the backend currently in use.
251    pub fn backend_name(&self) -> &str {
252        self.backend.name()
253    }
254}
255
256/// Sealed (immutable) ring-query database.
257///
258/// Obtained by calling [`RingDb::build()`] or [`RingDb::load()`]. Vectors can
259/// no longer be inserted — only queries and payload fetches are allowed.
260///
261/// The hot side (vectors + norms) is owned by the compute backend.
262/// The cold side (payloads) lives in an anonymous mmap managed by
263/// [`PayloadStore`].
264pub struct SealedRingDb<T = ()> {
265    config: RingDbConfig,
266    backend: Box<dyn RingComputeBackend>,
267    n_vectors: usize,
268    payload_store: PayloadStore<T>,
269}
270
271impl<T: Serialize + DeserializeOwned> SealedRingDb<T> {
272    /// Execute a ring query and return matching vector IDs.
273    ///
274    /// The ring `[d - lambda, d + lambda]` is converted to `[d_min, d_max]`
275    /// internally; negative lower bounds are clamped to 0.
276    pub fn query(&self, q: &RingQuery<'_>) -> Result<QueryResult> {
277        let dims = self.config.dims;
278        if q.query.len() != dims {
279            return Err(crate::error::RingDbError::DimensionMismatch {
280                expected: dims,
281                got: q.query.len(),
282            });
283        }
284
285        let d_min = (q.d - q.lambda).max(0.0);
286        let d_max = q.d + q.lambda;
287
288        let t = Instant::now();
289        let ids = self.backend.ring_query_f32(dims, q.query, d_min, d_max)?;
290        let elapsed = t.elapsed();
291
292        Ok(QueryResult {
293            ids,
294            backend_used: self.backend.name(),
295            elapsed,
296        })
297    }
298
299    /// Execute a range query and return matching vector IDs.
300    ///
301    /// Returns all vectors whose Euclidean distance to the query lies in
302    /// `[d_min, d_max]`.
303    pub fn query_range(&self, q: &RangeQuery<'_>) -> Result<QueryResult> {
304        let dims = self.config.dims;
305        if q.query.len() != dims {
306            return Err(crate::error::RingDbError::DimensionMismatch {
307                expected: dims,
308                got: q.query.len(),
309            });
310        }
311
312        let t = Instant::now();
313        let ids = self
314            .backend
315            .ring_query_f32(dims, q.query, q.d_min, q.d_max)?;
316        let elapsed = t.elapsed();
317
318        Ok(QueryResult {
319            ids,
320            backend_used: self.backend.name(),
321            elapsed,
322        })
323    }
324
325    /// Execute a disk query and return matching vector IDs.
326    ///
327    /// Returns all vectors within Euclidean distance `d_max` of the query
328    /// (i.e. the full ball of radius `d_max`, equivalent to `d_min = 0`).
329    pub fn query_disk(&self, q: &DiskQuery<'_>) -> Result<QueryResult> {
330        let dims = self.config.dims;
331        if q.query.len() != dims {
332            return Err(crate::error::RingDbError::DimensionMismatch {
333                expected: dims,
334                got: q.query.len(),
335            });
336        }
337
338        let t = Instant::now();
339        let ids = self.backend.ring_query_f32(dims, q.query, 0.0, q.d_max)?;
340        let elapsed = t.elapsed();
341
342        Ok(QueryResult {
343            ids,
344            backend_used: self.backend.name(),
345            elapsed,
346        })
347    }
348
349    /// Fetch the payload for a single vector ID.
350    ///
351    /// Reads and deserializes from the cold mmap. Call this after
352    /// [`query`](Self::query) to retrieve metadata for the matching vectors.
353    pub fn fetch_payload(&self, id: u32) -> Result<T> {
354        self.payload_store.fetch(id)
355    }
356
357    /// Fetch payloads for a slice of vector IDs, in order.
358    pub fn fetch_payloads(&self, ids: &[u32]) -> Result<Vec<T>> {
359        self.payload_store.fetch_many(ids)
360    }
361
362    /// Number of vectors stored.
363    pub fn len(&self) -> usize {
364        self.n_vectors
365    }
366
367    /// Returns `true` if the database contains no vectors.
368    pub fn is_empty(&self) -> bool {
369        self.n_vectors == 0
370    }
371
372    /// Number of dimensions per vector.
373    pub fn dims(&self) -> usize {
374        self.config.dims
375    }
376
377    /// Name of the backend currently in use.
378    pub fn backend_name(&self) -> &str {
379        self.backend.name()
380    }
381}