// content_index/lib.rs
1//! # UCFP Index
2//!
3//! This crate provides a backend-agnostic index for storing and searching
4//! Universal Content Fingerprinting (UCFP) records. It is designed to handle
5//! canonical hashes, perceptual fingerprints, and semantic embeddings, offering
6//! a unified interface for persistence and retrieval.
7//!
8//! ## Core Features
9//!
10//! - **Pluggable Backends**: Supports multiple storage backends through a common
11//!   [`IndexBackend`] trait. Out of the box, it provides:
12//!   - **Redb** (default): Pure Rust ACID-compliant embedded database. No C++ dependencies.
13//!   - **In-memory**: HashMap-based backend for fast, ephemeral storage (ideal for testing).
14//! - **Flexible Configuration**: All behaviors, including the choice of backend,
15//!   compression, and quantization strategies, are configured at runtime via the
16//!   [`IndexConfig`] struct.
17//! - **Efficient Storage**:
18//!   - **Quantization**: Provides utilities to quantize `f32` embeddings into `i8` vectors
19//!     to reduce storage space and improve query performance. Use the `quantize` or
20//!     `quantize_with_strategy` methods before creating `IndexRecord` instances.
21//!   - **Compression**: Compresses serialized records (using Zstd by default) before
22//!     writing to the backend.
23//! - **Similarity Search**: Provides search capabilities for both semantic and
24//!   perceptual fingerprints:
25//!   - **Semantic Search**: Computes cosine similarity on quantized embeddings.
26//!   - **Perceptual Search**: Computes Jaccard similarity on MinHash signatures.
27//!
28//! ## Backend Selection Guide
29//!
30//! | Backend | Use Case | Dependencies | Compile Time |
31//! |---------|----------|--------------|--------------|
32//! | **Redb** (default) | Production, single-node | None (pure Rust) | Fast |
33//! | **InMemory** | Testing, development | None | Fastest |
34//!
35//! ### Why Redb?
36//!
37//! Redb is the default backend because:
38//! - **No C++ dependencies**: Compiles with just Rust toolchain (no clang/LLVM required)
39//! - **ACID transactions**: Crash-safe by default with MVCC
40//! - **Pure Rust**: Better integration with Rust ecosystem, easier cross-compilation
41//! - **Fast compilation**: No C++ compilation overhead
42//! - **Future-proof**: Easy migration path to PostgreSQL when horizontal scaling is needed
43//!
44//! ### Configuration Examples
45//!
46//! ```rust
47//! use index::{UfpIndex, IndexConfig, BackendConfig};
48//!
49//! // Redb (default, recommended)
50//! let config = IndexConfig::new()
51//!     .with_backend(BackendConfig::redb("/data/ucfp.redb"));
52//!
53//! // In-memory (testing)
54//! let config = IndexConfig::new()
55//!     .with_backend(BackendConfig::in_memory());
56//! ```
57//!
58//! ## Key Concepts
59//!
60//! The central struct is [`UfpIndex`], which provides a high-level API for
61//! interacting with the index. It handles the details of serialization,
62//! compression, and quantization, allowing callers to work with the simple
63//! [`IndexRecord`] struct.
64//!
65//! The [`IndexBackend`] trait abstracts the underlying storage mechanism, making
66//! it easy to swap out backends or implement custom ones.
67//!
68//! ## Example Usage
69//!
70//! ```
71//! use index::{UfpIndex, IndexConfig, BackendConfig, IndexRecord, QueryMode, INDEX_SCHEMA_VERSION};
72//! use serde_json::json;
73//!
74//! // Configure with Redb (default, persistent storage)
75//! let config = IndexConfig::new().with_backend(BackendConfig::redb("/tmp/ucfp.redb"));
76//! let index = UfpIndex::new(config).unwrap();
77//!
78//! // Create and insert a record
79//! let record = IndexRecord {
80//!     schema_version: INDEX_SCHEMA_VERSION,
81//!     canonical_hash: "doc-1".to_string(),
82//!     perceptual: Some(vec![1, 2, 3]),
83//!     embedding: Some(vec![10, 20, 30]),
84//!     metadata: json!({ "title": "My Document" }),
85//! };
86//! index.upsert(&record).unwrap();
87//!
88//! // Search for similar records
89//! let query_record = IndexRecord {
90//!     schema_version: INDEX_SCHEMA_VERSION,
91//!     canonical_hash: "query-1".to_string(),
92//!     perceptual: Some(vec![1, 2, 4]),
93//!     embedding: Some(vec![11, 22, 33]),
94//!     metadata: json!({}),
95//! };
96//!
97//! let results = index.search(&query_record, QueryMode::Perceptual, 10).unwrap();
98//! // assert_eq!(results.len(), 1);
99//! // assert_eq!(results[0].canonical_hash, "doc-1");
100//! ```
101
102pub mod ann;
103mod backend;
104mod query;
105
106use crate::ann::AnnConfig;
107use dashmap::DashMap;
108
109mod metadata_serde {
110    use serde::de::Error as DeError;
111    use serde::ser::Error as SerError;
112    use serde::{Deserialize, Deserializer, Serializer};
113    use serde_json::Value;
114
115    pub(super) fn serialize<S>(value: &Value, serializer: S) -> Result<S::Ok, S::Error>
116    where
117        S: Serializer,
118    {
119        let bytes = serde_json::to_vec(value).map_err(SerError::custom)?;
120        serializer.serialize_bytes(&bytes)
121    }
122
123    pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Value, D::Error>
124    where
125        D: Deserializer<'de>,
126    {
127        let bytes = Vec::<u8>::deserialize(deserializer)?;
128        serde_json::from_slice(&bytes).map_err(DeError::custom)
129    }
130}
131
132#[cfg(feature = "backend-redb")]
133pub use backend::RedbBackend;
134pub use backend::{BackendConfig, InMemoryBackend, IndexBackend};
135pub use query::{QueryMode, QueryResult};
136
137use bincode::config::standard;
138use bincode::error::{DecodeError, EncodeError};
139use bincode::serde::{decode_from_slice, encode_to_vec};
140use ndarray::Array1;
141use serde::{Deserialize, Serialize};
142use thiserror::Error;
143use zstd::{decode_all, encode_all};
144
/// Bump this value whenever the on-disk `IndexRecord` layout changes.
///
/// Stored in every serialized record (see `IndexRecord::schema_version`) so
/// older payloads can be recognized when deserializing.
pub const INDEX_SCHEMA_VERSION: u16 = 1;

/// Quantized embedding type (compact float representation): one `i8` per
/// original `f32` component, produced by the `quantize*` helpers on `UfpIndex`.
pub type QuantizedVec = Vec<i8>;
150
/// Unified index record for any modality.
///
/// This struct represents a document in the index with its canonical hash,
/// perceptual fingerprints (MinHash), semantic embedding (quantized), and metadata.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IndexRecord {
    /// Schema version for backward compatibility when deserializing.
    /// Absent on pre-versioning payloads, so it defaults via `default_schema_version`.
    #[serde(default = "default_schema_version")]
    pub schema_version: u16,
    /// Canonical hash (SHA-256 hex) that uniquely identifies the document.
    pub canonical_hash: String,
    /// Perceptual fingerprint (MinHash signature) for similarity search.
    pub perceptual: Option<Vec<u64>>,
    /// Quantized semantic embedding for vector similarity search.
    pub embedding: Option<QuantizedVec>,
    /// Arbitrary metadata associated with the document (JSON).
    // Stored as a JSON byte blob (see `metadata_serde`) because bincode cannot
    // represent `serde_json::Value` directly.
    #[serde(with = "metadata_serde")]
    pub metadata: serde_json::Value,
}

/// Default used when deserializing records that predate the version field.
const fn default_schema_version() -> u16 {
    INDEX_SCHEMA_VERSION
}
175
/// Compression codec options for index storage.
///
/// Applied to the serialized record bytes before they reach the backend.
#[derive(Clone, Debug, Default)]
pub enum CompressionCodec {
    /// No compression (useful for debugging or when storage is not a concern).
    None,
    /// Zstd compression (default, good balance of speed and ratio).
    #[default]
    Zstd,
}
185
/// Compression behavior configuration.
///
/// Combines a codec choice with a codec-specific level; `level` is ignored
/// when the codec is [`CompressionCodec::None`].
#[derive(Clone, Debug)]
pub struct CompressionConfig {
    /// The compression codec to use (None or Zstd).
    pub codec: CompressionCodec,
    /// Compression level (1-22 for Zstd, where higher = better compression but slower).
    pub level: i32,
}
194
195impl Default for CompressionConfig {
196    fn default() -> Self {
197        Self {
198            codec: CompressionCodec::default(),
199            level: 3,
200        }
201    }
202}
203
204impl CompressionConfig {
205    pub fn new(codec: CompressionCodec, level: i32) -> Self {
206        Self { codec, level }
207    }
208
209    pub fn with_codec(mut self, codec: CompressionCodec) -> Self {
210        self.codec = codec;
211        self
212    }
213
214    pub fn with_level(mut self, level: i32) -> Self {
215        self.level = level;
216        self
217    }
218
219    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, IndexError> {
220        match self.codec {
221            CompressionCodec::None => Ok(data.to_vec()),
222            CompressionCodec::Zstd => Ok(encode_all(data, self.level)?),
223        }
224    }
225
226    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, IndexError> {
227        match self.codec {
228            CompressionCodec::None => Ok(data.to_vec()),
229            CompressionCodec::Zstd => Ok(decode_all(data)?),
230        }
231    }
232}
233
/// Quantization strategies for embeddings.
///
/// Quantization reduces memory usage by converting f32 embeddings to smaller types.
#[derive(Clone, Debug)]
pub enum QuantizationConfig {
    /// 8-bit integer quantization using a linear scale factor.
    ///
    /// Values are computed as: `quantized = (value * scale).clamp(-128.0, 127.0) as i8`
    Int8 {
        /// The scaling factor for quantization (default: 100.0).
        scale: f32,
    },
}
247
248impl Default for QuantizationConfig {
249    fn default() -> Self {
250        QuantizationConfig::Int8 { scale: 100.0 }
251    }
252}
253
254impl QuantizationConfig {
255    pub fn scale(&self) -> f32 {
256        match self {
257            QuantizationConfig::Int8 { scale } => *scale,
258        }
259    }
260
261    pub fn with_scale(mut self, scale: f32) -> Self {
262        match &mut self {
263            QuantizationConfig::Int8 { scale: existing } => *existing = scale,
264        }
265        self
266    }
267}
268
/// Config for initializing the index.
///
/// All fields have sensible defaults; use the `with_*` builder methods to
/// override individual settings.
#[derive(Clone, Debug, Default)]
pub struct IndexConfig {
    /// Backend storage configuration (in-memory or Redb).
    pub backend: BackendConfig,
    /// Compression settings for stored records.
    pub compression: CompressionConfig,
    /// Quantization settings for embeddings.
    pub quantization: QuantizationConfig,
    /// ANN (Approximate Nearest Neighbor) configuration for semantic search.
    /// When enabled, uses HNSW algorithm for sublinear search on large datasets.
    pub ann: AnnConfig,
}
282
283impl IndexConfig {
284    pub fn new() -> Self {
285        Self::default()
286    }
287
288    pub fn with_backend(mut self, backend: BackendConfig) -> Self {
289        self.backend = backend;
290        self
291    }
292
293    pub fn with_compression(mut self, compression: CompressionConfig) -> Self {
294        self.compression = compression;
295        self
296    }
297
298    pub fn with_quantization(mut self, quantization: QuantizationConfig) -> Self {
299        self.quantization = quantization;
300        self
301    }
302
303    pub fn with_ann(mut self, ann: AnnConfig) -> Self {
304        self.ann = ann;
305        self
306    }
307}
308
/// Custom error type covering all failure modes of the index.
#[derive(Error, Debug, Clone)]
pub enum IndexError {
    /// Failure reported by the storage backend.
    #[error("Backend error: {0}")]
    Backend(String),
    /// bincode failed to encode a record.
    #[error("Serialization encode error: {0}")]
    Encode(String),
    /// bincode failed to decode a record.
    #[error("Serialization decode error: {0}")]
    Decode(String),
    /// Compression/decompression failure (zstd reports these as `io::Error`).
    #[error("Compression error: {0}")]
    Zstd(String),
}
321
322impl From<EncodeError> for IndexError {
323    fn from(e: EncodeError) -> Self {
324        IndexError::Encode(e.to_string())
325    }
326}
327
328impl From<DecodeError> for IndexError {
329    fn from(e: DecodeError) -> Self {
330        IndexError::Decode(e.to_string())
331    }
332}
333
334impl From<std::io::Error> for IndexError {
335    fn from(e: std::io::Error) -> Self {
336        IndexError::Zstd(e.to_string())
337    }
338}
339
340impl IndexError {
341    pub fn backend<E: std::fmt::Display>(err: E) -> Self {
342        Self::Backend(err.to_string())
343    }
344}
345
/// Index structure with lock-free concurrent access via DashMap and ANN support.
///
/// Auxiliary in-memory indexes (`perceptual_index`, `semantic_index`) are
/// rebuilt from upserts at runtime; they are not persisted by the backend.
pub struct UfpIndex {
    /// The backend used for storage, abstracted behind a trait.
    backend: Box<dyn IndexBackend>,
    /// The configuration for index.
    cfg: IndexConfig,
    /// Lock-free inverted index for MinHash signatures to enable concurrent O(1) lookup.
    /// Maps a MinHash value to the canonical hashes of records containing it.
    perceptual_index: DashMap<u64, Vec<String>>,
    /// Lock-free index for semantic embeddings enabling concurrent access.
    /// Maps canonical hash to its quantized embedding.
    semantic_index: DashMap<String, QuantizedVec>,
    /// ANN index for fast approximate semantic search (HNSW).
    /// `None` when ANN is disabled in the config.
    ann_index: std::sync::Mutex<Option<ann::AnnIndex>>,
    /// Tracks if ANN index needs rebuilding (set after batch inserts).
    ann_needs_rebuild: std::sync::atomic::AtomicBool,
}
361
362impl UfpIndex {
363    /// Initialize or open an index using the configured backend.
364    /// This will build the backend from the config.
365    pub fn new(cfg: IndexConfig) -> Result<Self, IndexError> {
366        let backend = cfg.backend.build()?;
367        Ok(Self::with_backend(cfg, backend))
368    }
369
370    /// Build an index with a custom backend (e.g., in-memory for tests).
371    /// This is useful for dependency injection and testing.
372    pub fn with_backend(cfg: IndexConfig, backend: Box<dyn IndexBackend>) -> Self {
373        let ann_index = if cfg.ann.enabled {
374            // Initialize with placeholder dimension - will be set on first insert
375            Some(crate::ann::AnnIndex::new(0, cfg.ann))
376        } else {
377            None
378        };
379
380        Self {
381            backend,
382            cfg,
383            perceptual_index: DashMap::new(),
384            semantic_index: DashMap::new(),
385            ann_index: std::sync::Mutex::new(ann_index),
386            ann_needs_rebuild: std::sync::atomic::AtomicBool::new(false),
387        }
388    }
389
390    /// Get the number of vectors in the semantic index.
391    pub fn semantic_vector_count(&self) -> usize {
392        self.semantic_index.len()
393    }
394
395    /// Check if ANN search should be used for the current dataset size.
396    pub fn should_use_ann(&self) -> bool {
397        self.cfg.ann.enabled && self.semantic_vector_count() >= self.cfg.ann.min_vectors_for_ann
398    }
399
400    /// Rebuild the ANN index if needed.
401    /// This should be called periodically after batch insertions.
402    pub fn rebuild_ann_if_needed(&self) {
403        if self
404            .ann_needs_rebuild
405            .load(std::sync::atomic::Ordering::Relaxed)
406            && self.should_use_ann()
407        {
408            if let Ok(mut ann_lock) = self.ann_index.try_lock() {
409                if let Some(ref mut ann) = *ann_lock {
410                    // Collect all embeddings from DashMap
411                    let mut vectors_to_insert: Vec<(String, Vec<f32>)> = Vec::new();
412                    let mut dimension = 0;
413
414                    for entry in self.semantic_index.iter() {
415                        let hash = entry.key().clone();
416                        let quantized = entry.value();
417
418                        // Dequantize from i8 to f32 for ANN
419                        let float_vec: Vec<f32> =
420                            quantized.iter().map(|&v| v as f32 / 100.0).collect();
421                        dimension = float_vec.len();
422                        vectors_to_insert.push((hash, float_vec));
423                    }
424
425                    // Rebuild ANN index with correct dimension
426                    if dimension > 0 && !vectors_to_insert.is_empty() {
427                        *ann = ann::AnnIndex::new(dimension, self.cfg.ann);
428                        for (hash, vec) in vectors_to_insert {
429                            let _ = ann.insert(hash, vec);
430                        }
431                        ann.build();
432                    }
433
434                    self.ann_needs_rebuild
435                        .store(false, std::sync::atomic::Ordering::Relaxed);
436                }
437            }
438        }
439    }
440
441    /// Quantize float embeddings -> i8 using a raw scale.
442    /// This is a simple linear quantization with clamping.
443    pub fn quantize(vec: &Array1<f32>, scale: f32) -> QuantizedVec {
444        let mut out = Vec::with_capacity(vec.len());
445        out.extend(vec.iter().map(|&v| (v * scale).clamp(-128.0, 127.0) as i8));
446        out
447    }
448
449    /// Quantize using a configured strategy.
450    /// This allows for different quantization strategies to be used in the future.
451    pub fn quantize_with_strategy(vec: &Array1<f32>, cfg: &QuantizationConfig) -> QuantizedVec {
452        Self::quantize(vec, cfg.scale())
453    }
454
455    /// Insert or update a record.
456    /// The record is encoded and compressed before being sent to the backend.
457    pub fn upsert(&self, rec: &IndexRecord) -> Result<(), IndexError> {
458        let payload = self.encode_record(rec)?;
459
460        // Update auxiliary indexes for faster queries using lock-free DashMap
461        if let Some(ref perceptual) = rec.perceptual {
462            for &hash_val in perceptual {
463                self.perceptual_index
464                    .entry(hash_val)
465                    .and_modify(|v| v.push(rec.canonical_hash.clone()))
466                    .or_insert_with(|| vec![rec.canonical_hash.clone()]);
467            }
468        }
469
470        if let Some(ref embedding) = rec.embedding {
471            self.semantic_index
472                .insert(rec.canonical_hash.clone(), embedding.clone());
473
474            // Also insert into ANN index if enabled
475            if self.cfg.ann.enabled {
476                if let Ok(mut ann_lock) = self.ann_index.try_lock() {
477                    if let Some(ref mut ann) = *ann_lock {
478                        // Dequantize from i8 to f32 for ANN
479                        let float_vec: Vec<f32> =
480                            embedding.iter().map(|&v| v as f32 / 100.0).collect();
481                        let _ = ann.insert(rec.canonical_hash.clone(), float_vec);
482                    }
483                }
484            }
485        }
486
487        self.backend.put(&rec.canonical_hash, &payload)
488    }
489
490    /// Remove a record from the index.
491    pub fn delete(&self, hash: &str) -> Result<(), IndexError> {
492        self.backend.delete(hash)
493    }
494
495    /// Flush backend buffers if supported.
496    /// This is useful for ensuring data is written to disk.
497    pub fn flush(&self) -> Result<(), IndexError> {
498        self.backend.flush()
499    }
500
501    /// Retrieve a record by hash.
502    /// The record is decompressed and decoded after being retrieved from the backend.
503    pub fn get(&self, hash: &str) -> Result<Option<IndexRecord>, IndexError> {
504        if let Some(data) = self.backend.get(hash)? {
505            let record = self.decode_record(&data)?;
506            Ok(Some(record))
507        } else {
508            Ok(None)
509        }
510    }
511
512    /// Scan all records in the index.
513    /// Iterates over all records in the backend, decodes them, and passes them to the visitor.
514    pub fn scan(
515        &self,
516        visitor: &mut dyn FnMut(&IndexRecord) -> Result<(), IndexError>,
517    ) -> Result<(), IndexError> {
518        self.backend.scan(&mut |data: &[u8]| {
519            let record = self.decode_record(data)?;
520            visitor(&record)
521        })
522    }
523
524    /// Batch insert multiple records (efficient for large datasets).
525    /// This can be much faster than calling `upsert` in a loop.
526    pub fn batch_insert(&self, records: &[IndexRecord]) -> Result<(), IndexError> {
527        let mut entries = Vec::with_capacity(records.len());
528        let mut perceptual_updates: Vec<(u64, &str)> = Vec::new();
529        let mut semantic_updates: Vec<(&str, &QuantizedVec)> = Vec::new();
530
531        for rec in records {
532            entries.push((rec.canonical_hash.clone(), self.encode_record(rec)?));
533            if let Some(ref perceptual) = rec.perceptual {
534                for &hash_val in perceptual {
535                    perceptual_updates.push((hash_val, rec.canonical_hash.as_str()));
536                }
537            }
538            if let Some(ref embedding) = rec.embedding {
539                semantic_updates.push((rec.canonical_hash.as_str(), embedding));
540            }
541        }
542
543        // Update perceptual index using lock-free DashMap
544        for (hash_val, canonical_hash) in perceptual_updates {
545            self.perceptual_index
546                .entry(hash_val)
547                .and_modify(|v| v.push(canonical_hash.to_string()))
548                .or_insert_with(|| vec![canonical_hash.to_string()]);
549        }
550
551        // Update semantic index using lock-free DashMap
552        let mut ann_needs_rebuild = false;
553        for (canonical_hash, embedding) in semantic_updates {
554            self.semantic_index
555                .insert(canonical_hash.to_string(), embedding.clone());
556
557            // Also insert into ANN index if enabled
558            if self.cfg.ann.enabled {
559                if let Ok(mut ann_lock) = self.ann_index.try_lock() {
560                    if let Some(ref mut ann) = *ann_lock {
561                        // Dequantize from i8 to f32 for ANN
562                        let float_vec: Vec<f32> =
563                            embedding.iter().map(|&v| v as f32 / 100.0).collect();
564                        let _ = ann.insert(canonical_hash.to_string(), float_vec);
565                        ann_needs_rebuild = true;
566                    }
567                }
568            }
569        }
570
571        // Mark ANN index for rebuild after batch insert
572        if ann_needs_rebuild {
573            self.ann_needs_rebuild
574                .store(true, std::sync::atomic::Ordering::Relaxed);
575        }
576
577        self.backend.batch_put(entries)
578    }
579
580    /// Decodes and decompresses a record from the backend.
581    pub(crate) fn decode_record(&self, data: &[u8]) -> Result<IndexRecord, IndexError> {
582        let decompressed = self.cfg.compression.decompress(data)?;
583        let (record, _) = decode_from_slice(&decompressed, standard())?;
584        Ok(record)
585    }
586
587    /// Encodes and compresses a record for storage in the backend.
588    fn encode_record(&self, rec: &IndexRecord) -> Result<Vec<u8>, IndexError> {
589        let encoded = encode_to_vec(rec, standard())?;
590        self.cfg.compression.compress(&encoded)
591    }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Config pointing at the ephemeral in-memory backend.
    fn test_config() -> IndexConfig {
        IndexConfig::new().with_backend(BackendConfig::InMemory)
    }

    /// Build a record carrying both perceptual and semantic fingerprints.
    fn sample_record(hash: &str, embedding: Vec<i8>, perceptual: Vec<u64>) -> IndexRecord {
        IndexRecord {
            schema_version: INDEX_SCHEMA_VERSION,
            canonical_hash: hash.into(),
            perceptual: Some(perceptual),
            embedding: Some(embedding),
            metadata: json!({ "source": hash }),
        }
    }

    #[test]
    fn in_memory_backend_roundtrip() {
        let index = UfpIndex::with_backend(test_config(), Box::new(InMemoryBackend::new()));

        let original = sample_record("doc-a", vec![1, 2, 3], vec![10, 20, 30]);
        index.upsert(&original).expect("upsert succeeds");

        // The fetched record must round-trip through encode/compress intact.
        let fetched = index.get("doc-a").expect("get ok").expect("record exists");
        assert_eq!(fetched.canonical_hash, "doc-a");
        assert_eq!(fetched.metadata, original.metadata);
    }

    #[test]
    fn search_uses_backend_scan() {
        let index = UfpIndex::with_backend(test_config(), Box::new(InMemoryBackend::new()));

        for rec in [
            sample_record("doc-a", vec![10, 0], vec![1, 2, 3]),
            sample_record("doc-b", vec![9, 0], vec![3, 4, 5]),
        ] {
            index.upsert(&rec).unwrap();
        }

        let query = IndexRecord {
            schema_version: INDEX_SCHEMA_VERSION,
            canonical_hash: String::from("query"),
            perceptual: Some(vec![3, 5]),
            embedding: Some(vec![10, 0]),
            metadata: json!({}),
        };

        // Semantic mode: doc-a's embedding matches the query exactly.
        let semantic = index
            .search(&query, QueryMode::Semantic, 2)
            .expect("semantic search");
        assert_eq!(semantic.len(), 2);
        assert_eq!(semantic[0].canonical_hash, "doc-a");

        // Perceptual mode: doc-b shares more MinHash values with the query.
        let perceptual = index
            .search(&query, QueryMode::Perceptual, 2)
            .expect("perceptual search");
        assert_eq!(perceptual[0].canonical_hash, "doc-b");
    }
}