1pub mod ann;
103mod backend;
104mod query;
105
106use crate::ann::AnnConfig;
107use dashmap::DashMap;
108
109mod metadata_serde {
110 use serde::de::Error as DeError;
111 use serde::ser::Error as SerError;
112 use serde::{Deserialize, Deserializer, Serializer};
113 use serde_json::Value;
114
115 pub(super) fn serialize<S>(value: &Value, serializer: S) -> Result<S::Ok, S::Error>
116 where
117 S: Serializer,
118 {
119 let bytes = serde_json::to_vec(value).map_err(SerError::custom)?;
120 serializer.serialize_bytes(&bytes)
121 }
122
123 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Value, D::Error>
124 where
125 D: Deserializer<'de>,
126 {
127 let bytes = Vec::<u8>::deserialize(deserializer)?;
128 serde_json::from_slice(&bytes).map_err(DeError::custom)
129 }
130}
131
132#[cfg(feature = "backend-redb")]
133pub use backend::RedbBackend;
134pub use backend::{BackendConfig, InMemoryBackend, IndexBackend};
135pub use query::{QueryMode, QueryResult};
136
137use bincode::config::standard;
138use bincode::error::{DecodeError, EncodeError};
139use bincode::serde::{decode_from_slice, encode_to_vec};
140use ndarray::Array1;
141use serde::{Deserialize, Serialize};
142use thiserror::Error;
143use zstd::{decode_all, encode_all};
144
/// Schema version stamped into newly written records
/// (see `IndexRecord::schema_version`).
pub const INDEX_SCHEMA_VERSION: u16 = 1;

/// An int8-quantized embedding vector, produced by `UfpIndex::quantize`.
pub type QuantizedVec = Vec<i8>;
/// One persisted index entry for a piece of content.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IndexRecord {
    /// Record schema version; older serialized data without this field
    /// defaults to the current [`INDEX_SCHEMA_VERSION`].
    #[serde(default = "default_schema_version")]
    pub schema_version: u16,
    /// Stable content hash used as the primary key in the backend.
    pub canonical_hash: String,
    /// Optional perceptual hash values; each value keys a bucket in the
    /// in-memory perceptual index.
    pub perceptual: Option<Vec<u64>>,
    /// Optional int8-quantized embedding vector.
    pub embedding: Option<QuantizedVec>,
    /// Arbitrary JSON metadata; encoded via `metadata_serde` as a JSON byte
    /// blob so bincode can handle it.
    #[serde(with = "metadata_serde")]
    pub metadata: serde_json::Value,
}
171
/// Serde default for `IndexRecord::schema_version`: the current schema version.
const fn default_schema_version() -> u16 {
    INDEX_SCHEMA_VERSION
}
175
/// Compression codec applied to encoded records before backend storage.
#[derive(Clone, Debug, Default)]
pub enum CompressionCodec {
    /// Store bytes uncompressed.
    None,
    /// Zstandard compression (the default).
    #[default]
    Zstd,
}
185
/// Codec and level pair controlling record compression.
#[derive(Clone, Debug)]
pub struct CompressionConfig {
    /// Which codec to apply.
    pub codec: CompressionCodec,
    /// Compression level passed to zstd; ignored for `CompressionCodec::None`.
    pub level: i32,
}
194
195impl Default for CompressionConfig {
196 fn default() -> Self {
197 Self {
198 codec: CompressionCodec::default(),
199 level: 3,
200 }
201 }
202}
203
204impl CompressionConfig {
205 pub fn new(codec: CompressionCodec, level: i32) -> Self {
206 Self { codec, level }
207 }
208
209 pub fn with_codec(mut self, codec: CompressionCodec) -> Self {
210 self.codec = codec;
211 self
212 }
213
214 pub fn with_level(mut self, level: i32) -> Self {
215 self.level = level;
216 self
217 }
218
219 fn compress(&self, data: &[u8]) -> Result<Vec<u8>, IndexError> {
220 match self.codec {
221 CompressionCodec::None => Ok(data.to_vec()),
222 CompressionCodec::Zstd => Ok(encode_all(data, self.level)?),
223 }
224 }
225
226 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, IndexError> {
227 match self.codec {
228 CompressionCodec::None => Ok(data.to_vec()),
229 CompressionCodec::Zstd => Ok(decode_all(data)?),
230 }
231 }
232}
233
/// Strategy for quantizing f32 embedding vectors into a `QuantizedVec`.
#[derive(Clone, Debug)]
pub enum QuantizationConfig {
    /// Symmetric int8 quantization: each value is multiplied by `scale`
    /// and clamped to the i8 range (see `UfpIndex::quantize`).
    Int8 {
        /// Multiplier applied before clamping to i8.
        scale: f32,
    },
}
247
248impl Default for QuantizationConfig {
249 fn default() -> Self {
250 QuantizationConfig::Int8 { scale: 100.0 }
251 }
252}
253
254impl QuantizationConfig {
255 pub fn scale(&self) -> f32 {
256 match self {
257 QuantizationConfig::Int8 { scale } => *scale,
258 }
259 }
260
261 pub fn with_scale(mut self, scale: f32) -> Self {
262 match &mut self {
263 QuantizationConfig::Int8 { scale: existing } => *existing = scale,
264 }
265 self
266 }
267}
268
/// Top-level configuration for `UfpIndex`.
#[derive(Clone, Debug, Default)]
pub struct IndexConfig {
    /// Storage backend selection.
    pub backend: BackendConfig,
    /// Record compression settings.
    pub compression: CompressionConfig,
    /// Embedding quantization settings.
    pub quantization: QuantizationConfig,
    /// Approximate-nearest-neighbor index settings.
    pub ann: AnnConfig,
}
282
283impl IndexConfig {
284 pub fn new() -> Self {
285 Self::default()
286 }
287
288 pub fn with_backend(mut self, backend: BackendConfig) -> Self {
289 self.backend = backend;
290 self
291 }
292
293 pub fn with_compression(mut self, compression: CompressionConfig) -> Self {
294 self.compression = compression;
295 self
296 }
297
298 pub fn with_quantization(mut self, quantization: QuantizationConfig) -> Self {
299 self.quantization = quantization;
300 self
301 }
302
303 pub fn with_ann(mut self, ann: AnnConfig) -> Self {
304 self.ann = ann;
305 self
306 }
307}
308
/// Errors surfaced by index operations.
///
/// Variants hold pre-rendered message strings so the type stays `Clone`.
#[derive(Error, Debug, Clone)]
pub enum IndexError {
    /// Failure inside the storage backend.
    #[error("Backend error: {0}")]
    Backend(String),
    /// bincode encoding failure.
    #[error("Serialization encode error: {0}")]
    Encode(String),
    /// bincode decoding failure.
    #[error("Serialization decode error: {0}")]
    Decode(String),
    /// Compression/decompression failure (zstd reports these as I/O errors).
    #[error("Compression error: {0}")]
    Zstd(String),
}
321
322impl From<EncodeError> for IndexError {
323 fn from(e: EncodeError) -> Self {
324 IndexError::Encode(e.to_string())
325 }
326}
327
328impl From<DecodeError> for IndexError {
329 fn from(e: DecodeError) -> Self {
330 IndexError::Decode(e.to_string())
331 }
332}
333
334impl From<std::io::Error> for IndexError {
335 fn from(e: std::io::Error) -> Self {
336 IndexError::Zstd(e.to_string())
337 }
338}
339
340impl IndexError {
341 pub fn backend<E: std::fmt::Display>(err: E) -> Self {
342 Self::Backend(err.to_string())
343 }
344}
345
/// The main index: durable record storage plus in-memory lookup structures.
pub struct UfpIndex {
    /// Durable key/value storage for encoded records.
    backend: Box<dyn IndexBackend>,
    /// Configuration (backend, compression, quantization, ANN).
    cfg: IndexConfig,
    /// perceptual hash value -> canonical hashes of records containing it.
    perceptual_index: DashMap<u64, Vec<String>>,
    /// canonical hash -> quantized embedding.
    semantic_index: DashMap<String, QuantizedVec>,
    /// Optional ANN structure; `None` when ANN is disabled in config.
    ann_index: std::sync::Mutex<Option<ann::AnnIndex>>,
    /// Set when the ANN index is stale and needs a full rebuild.
    ann_needs_rebuild: std::sync::atomic::AtomicBool,
}
361
362impl UfpIndex {
363 pub fn new(cfg: IndexConfig) -> Result<Self, IndexError> {
366 let backend = cfg.backend.build()?;
367 Ok(Self::with_backend(cfg, backend))
368 }
369
370 pub fn with_backend(cfg: IndexConfig, backend: Box<dyn IndexBackend>) -> Self {
373 let ann_index = if cfg.ann.enabled {
374 Some(crate::ann::AnnIndex::new(0, cfg.ann))
376 } else {
377 None
378 };
379
380 Self {
381 backend,
382 cfg,
383 perceptual_index: DashMap::new(),
384 semantic_index: DashMap::new(),
385 ann_index: std::sync::Mutex::new(ann_index),
386 ann_needs_rebuild: std::sync::atomic::AtomicBool::new(false),
387 }
388 }
389
390 pub fn semantic_vector_count(&self) -> usize {
392 self.semantic_index.len()
393 }
394
395 pub fn should_use_ann(&self) -> bool {
397 self.cfg.ann.enabled && self.semantic_vector_count() >= self.cfg.ann.min_vectors_for_ann
398 }
399
400 pub fn rebuild_ann_if_needed(&self) {
403 if self
404 .ann_needs_rebuild
405 .load(std::sync::atomic::Ordering::Relaxed)
406 && self.should_use_ann()
407 {
408 if let Ok(mut ann_lock) = self.ann_index.try_lock() {
409 if let Some(ref mut ann) = *ann_lock {
410 let mut vectors_to_insert: Vec<(String, Vec<f32>)> = Vec::new();
412 let mut dimension = 0;
413
414 for entry in self.semantic_index.iter() {
415 let hash = entry.key().clone();
416 let quantized = entry.value();
417
418 let float_vec: Vec<f32> =
420 quantized.iter().map(|&v| v as f32 / 100.0).collect();
421 dimension = float_vec.len();
422 vectors_to_insert.push((hash, float_vec));
423 }
424
425 if dimension > 0 && !vectors_to_insert.is_empty() {
427 *ann = ann::AnnIndex::new(dimension, self.cfg.ann);
428 for (hash, vec) in vectors_to_insert {
429 let _ = ann.insert(hash, vec);
430 }
431 ann.build();
432 }
433
434 self.ann_needs_rebuild
435 .store(false, std::sync::atomic::Ordering::Relaxed);
436 }
437 }
438 }
439 }
440
441 pub fn quantize(vec: &Array1<f32>, scale: f32) -> QuantizedVec {
444 let mut out = Vec::with_capacity(vec.len());
445 out.extend(vec.iter().map(|&v| (v * scale).clamp(-128.0, 127.0) as i8));
446 out
447 }
448
449 pub fn quantize_with_strategy(vec: &Array1<f32>, cfg: &QuantizationConfig) -> QuantizedVec {
452 Self::quantize(vec, cfg.scale())
453 }
454
455 pub fn upsert(&self, rec: &IndexRecord) -> Result<(), IndexError> {
458 let payload = self.encode_record(rec)?;
459
460 if let Some(ref perceptual) = rec.perceptual {
462 for &hash_val in perceptual {
463 self.perceptual_index
464 .entry(hash_val)
465 .and_modify(|v| v.push(rec.canonical_hash.clone()))
466 .or_insert_with(|| vec![rec.canonical_hash.clone()]);
467 }
468 }
469
470 if let Some(ref embedding) = rec.embedding {
471 self.semantic_index
472 .insert(rec.canonical_hash.clone(), embedding.clone());
473
474 if self.cfg.ann.enabled {
476 if let Ok(mut ann_lock) = self.ann_index.try_lock() {
477 if let Some(ref mut ann) = *ann_lock {
478 let float_vec: Vec<f32> =
480 embedding.iter().map(|&v| v as f32 / 100.0).collect();
481 let _ = ann.insert(rec.canonical_hash.clone(), float_vec);
482 }
483 }
484 }
485 }
486
487 self.backend.put(&rec.canonical_hash, &payload)
488 }
489
490 pub fn delete(&self, hash: &str) -> Result<(), IndexError> {
492 self.backend.delete(hash)
493 }
494
495 pub fn flush(&self) -> Result<(), IndexError> {
498 self.backend.flush()
499 }
500
501 pub fn get(&self, hash: &str) -> Result<Option<IndexRecord>, IndexError> {
504 if let Some(data) = self.backend.get(hash)? {
505 let record = self.decode_record(&data)?;
506 Ok(Some(record))
507 } else {
508 Ok(None)
509 }
510 }
511
512 pub fn scan(
515 &self,
516 visitor: &mut dyn FnMut(&IndexRecord) -> Result<(), IndexError>,
517 ) -> Result<(), IndexError> {
518 self.backend.scan(&mut |data: &[u8]| {
519 let record = self.decode_record(data)?;
520 visitor(&record)
521 })
522 }
523
524 pub fn batch_insert(&self, records: &[IndexRecord]) -> Result<(), IndexError> {
527 let mut entries = Vec::with_capacity(records.len());
528 let mut perceptual_updates: Vec<(u64, &str)> = Vec::new();
529 let mut semantic_updates: Vec<(&str, &QuantizedVec)> = Vec::new();
530
531 for rec in records {
532 entries.push((rec.canonical_hash.clone(), self.encode_record(rec)?));
533 if let Some(ref perceptual) = rec.perceptual {
534 for &hash_val in perceptual {
535 perceptual_updates.push((hash_val, rec.canonical_hash.as_str()));
536 }
537 }
538 if let Some(ref embedding) = rec.embedding {
539 semantic_updates.push((rec.canonical_hash.as_str(), embedding));
540 }
541 }
542
543 for (hash_val, canonical_hash) in perceptual_updates {
545 self.perceptual_index
546 .entry(hash_val)
547 .and_modify(|v| v.push(canonical_hash.to_string()))
548 .or_insert_with(|| vec![canonical_hash.to_string()]);
549 }
550
551 let mut ann_needs_rebuild = false;
553 for (canonical_hash, embedding) in semantic_updates {
554 self.semantic_index
555 .insert(canonical_hash.to_string(), embedding.clone());
556
557 if self.cfg.ann.enabled {
559 if let Ok(mut ann_lock) = self.ann_index.try_lock() {
560 if let Some(ref mut ann) = *ann_lock {
561 let float_vec: Vec<f32> =
563 embedding.iter().map(|&v| v as f32 / 100.0).collect();
564 let _ = ann.insert(canonical_hash.to_string(), float_vec);
565 ann_needs_rebuild = true;
566 }
567 }
568 }
569 }
570
571 if ann_needs_rebuild {
573 self.ann_needs_rebuild
574 .store(true, std::sync::atomic::Ordering::Relaxed);
575 }
576
577 self.backend.batch_put(entries)
578 }
579
580 pub(crate) fn decode_record(&self, data: &[u8]) -> Result<IndexRecord, IndexError> {
582 let decompressed = self.cfg.compression.decompress(data)?;
583 let (record, _) = decode_from_slice(&decompressed, standard())?;
584 Ok(record)
585 }
586
587 fn encode_record(&self, rec: &IndexRecord) -> Result<Vec<u8>, IndexError> {
589 let encoded = encode_to_vec(rec, standard())?;
590 self.cfg.compression.compress(&encoded)
591 }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// In-memory backend config so tests touch no filesystem.
    fn test_config() -> IndexConfig {
        IndexConfig::new().with_backend(BackendConfig::InMemory)
    }

    /// Build a record with both an embedding and perceptual hashes;
    /// metadata carries the hash so round-trips are checkable.
    fn sample_record(hash: &str, embedding: Vec<i8>, perceptual: Vec<u64>) -> IndexRecord {
        IndexRecord {
            schema_version: INDEX_SCHEMA_VERSION,
            canonical_hash: hash.to_string(),
            perceptual: Some(perceptual),
            embedding: Some(embedding),
            metadata: json!({ "source": hash }),
        }
    }

    /// upsert then get should return an equivalent record (encode/compress
    /// round-trip through the in-memory backend).
    #[test]
    fn in_memory_backend_roundtrip() {
        let backend = Box::new(InMemoryBackend::new());
        let index = UfpIndex::with_backend(test_config(), backend);

        let rec = sample_record("doc-a", vec![1, 2, 3], vec![10, 20, 30]);
        index.upsert(&rec).expect("upsert succeeds");

        let fetched = index.get("doc-a").expect("get ok").expect("record exists");
        assert_eq!(fetched.canonical_hash, "doc-a");
        assert_eq!(fetched.metadata, rec.metadata);
    }

    /// Semantic search should rank the identical embedding first;
    /// perceptual search should rank the record with more shared hashes first.
    #[test]
    fn search_uses_backend_scan() {
        let backend = Box::new(InMemoryBackend::new());
        let index = UfpIndex::with_backend(test_config(), backend);

        let records = vec![
            sample_record("doc-a", vec![10, 0], vec![1, 2, 3]),
            sample_record("doc-b", vec![9, 0], vec![3, 4, 5]),
        ];
        for rec in &records {
            index.upsert(rec).unwrap();
        }

        // Query embedding matches doc-a exactly; perceptual hashes {3, 5}
        // overlap doc-b on two values but doc-a on only one.
        let query = IndexRecord {
            schema_version: INDEX_SCHEMA_VERSION,
            canonical_hash: "query".into(),
            perceptual: Some(vec![3, 5]),
            embedding: Some(vec![10, 0]),
            metadata: json!({}),
        };

        let semantic = index
            .search(&query, QueryMode::Semantic, 2)
            .expect("semantic search");
        assert_eq!(semantic.len(), 2);
        assert_eq!(semantic[0].canonical_hash, "doc-a");

        let perceptual = index
            .search(&query, QueryMode::Perceptual, 2)
            .expect("perceptual search");
        assert_eq!(perceptual[0].canonical_hash, "doc-b");
    }
}