Skip to main content

storage/
delta.rs

//! Delta Encoding for Vector Updates
//!
//! This module stores vector updates compactly by encoding only the
//! differences (deltas) between consecutive versions of a vector. This is
//! useful for frequently updated vectors where each change is typically small.
//!
//! Key features:
//! - Sparse delta encoding (only changed components are stored)
//! - Run-length encoding for contiguous changes
//! - Configurable compression thresholds
//! - Version tracking and reconstruction
//! - Automatic compaction of delta chains
13
14use common::{NamespaceId, Vector, VectorId};
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, VecDeque};
18use std::sync::atomic::{AtomicU64, Ordering};
19
20// ============================================================================
21// Delta Encoding Configuration
22// ============================================================================
23
/// Tuning knobs for delta encoding and delta-chain maintenance.
#[derive(Debug, Clone)]
pub struct DeltaConfig {
    /// Maximum number of deltas to chain before forcing a full snapshot.
    pub max_delta_chain: usize,
    /// Ratio of changed components above which the full vector is stored
    /// instead of a sparse/RLE delta. Kept within `[0.0, 1.0]` by the builder.
    pub sparse_threshold: f32,
    /// A component counts as changed only if it moves by more than this amount.
    pub epsilon: f32,
    /// Allow run-length encoding of contiguous changed components.
    pub enable_rle: bool,
    /// Intended per-namespace memory budget for delta storage, in bytes.
    /// NOTE(review): not read by any code in this module — confirm whether it
    /// is enforced elsewhere.
    pub max_memory_per_namespace: usize,
    /// Compact delta chains automatically once they reach `compact_threshold`.
    pub auto_compact: bool,
    /// Chain length at which automatic compaction triggers.
    pub compact_threshold: usize,
}

impl Default for DeltaConfig {
    fn default() -> Self {
        Self {
            max_delta_chain: 10,
            sparse_threshold: 0.5, // If >50% components changed, store full vector
            epsilon: 1e-7,
            enable_rle: true,
            max_memory_per_namespace: 100 * 1024 * 1024, // 100MB
            auto_compact: true,
            compact_threshold: 5,
        }
    }
}

impl DeltaConfig {
    /// Creates a config with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the maximum delta chain length.
    pub fn with_max_chain(mut self, max: usize) -> Self {
        self.max_delta_chain = max;
        self
    }

    /// Sets the sparse threshold, clamped to `[0.0, 1.0]`.
    ///
    /// A NaN argument is ignored and the previous value is kept: NaN would make
    /// every `ratio > threshold` comparison false.
    pub fn with_sparse_threshold(mut self, threshold: f32) -> Self {
        if !threshold.is_nan() {
            self.sparse_threshold = threshold.clamp(0.0, 1.0);
        }
        self
    }

    /// Sets the change-detection epsilon (its absolute value is used).
    ///
    /// A NaN argument is ignored and the previous value is kept. With a NaN
    /// epsilon, `diff > epsilon` is always false, so no change would ever be
    /// recorded.
    pub fn with_epsilon(mut self, epsilon: f32) -> Self {
        if !epsilon.is_nan() {
            self.epsilon = epsilon.abs();
        }
        self
    }

    /// Disables run-length encoding.
    pub fn without_rle(mut self) -> Self {
        self.enable_rle = false;
        self
    }

    /// Disables automatic compaction.
    pub fn without_auto_compact(mut self) -> Self {
        self.auto_compact = false;
        self
    }

    /// Sets the chain length at which automatic compaction triggers.
    pub fn with_compact_threshold(mut self, threshold: usize) -> Self {
        self.compact_threshold = threshold;
        self
    }
}
94
95// ============================================================================
96// Delta Types
97// ============================================================================
98
99/// A single component change
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct ComponentChange {
102    /// Index of the changed component
103    pub index: u32,
104    /// New value
105    pub value: f32,
106}
107
108/// Run-length encoded change (contiguous range)
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct RleChange {
111    /// Starting index
112    pub start: u32,
113    /// Values for contiguous indices
114    pub values: Vec<f32>,
115}
116
117/// Encoding type for a delta
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub enum DeltaEncoding {
120    /// Sparse encoding: list of individual component changes
121    Sparse(Vec<ComponentChange>),
122    /// Run-length encoded: contiguous ranges
123    Rle(Vec<RleChange>),
124    /// Full vector (when too many changes)
125    Full(Vec<f32>),
126}
127
128/// A delta representing changes between vector versions
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct VectorDelta {
131    /// Version number this delta leads to
132    pub version: u64,
133    /// Previous version (base for this delta)
134    pub base_version: u64,
135    /// Timestamp when delta was created
136    pub timestamp: u64,
137    /// The encoded changes
138    pub encoding: DeltaEncoding,
139    /// Size in bytes (estimated)
140    pub size_bytes: usize,
141}
142
143impl VectorDelta {
144    /// Calculate the number of changed components
145    pub fn change_count(&self) -> usize {
146        match &self.encoding {
147            DeltaEncoding::Sparse(changes) => changes.len(),
148            DeltaEncoding::Rle(runs) => runs.iter().map(|r| r.values.len()).sum(),
149            DeltaEncoding::Full(values) => values.len(),
150        }
151    }
152
153    /// Check if this is a full snapshot (not a delta)
154    pub fn is_full(&self) -> bool {
155        matches!(self.encoding, DeltaEncoding::Full(_))
156    }
157}
158
159/// A versioned vector with its delta history
160#[derive(Debug, Clone)]
161pub struct VersionedVector {
162    /// Vector ID
163    pub id: VectorId,
164    /// Current version number
165    pub current_version: u64,
166    /// Base snapshot (full vector at some version)
167    pub base_snapshot: Vec<f32>,
168    /// Version of the base snapshot
169    pub base_version: u64,
170    /// Chain of deltas from base to current
171    pub deltas: VecDeque<VectorDelta>,
172    /// Metadata (preserved across versions)
173    pub metadata: Option<serde_json::Value>,
174    /// TTL fields
175    pub ttl_seconds: Option<u64>,
176    pub expires_at: Option<u64>,
177}
178
179impl VersionedVector {
180    /// Create a new versioned vector
181    pub fn new(vector: &Vector) -> Self {
182        Self {
183            id: vector.id.clone(),
184            current_version: 1,
185            base_snapshot: vector.values.clone(),
186            base_version: 1,
187            deltas: VecDeque::new(),
188            metadata: vector.metadata.clone(),
189            ttl_seconds: vector.ttl_seconds,
190            expires_at: vector.expires_at,
191        }
192    }
193
194    /// Reconstruct the current vector values
195    pub fn reconstruct(&self) -> Vec<f32> {
196        let mut values = self.base_snapshot.clone();
197
198        for delta in &self.deltas {
199            apply_delta(&mut values, delta);
200        }
201
202        values
203    }
204
205    /// Reconstruct vector at a specific version
206    pub fn reconstruct_at_version(&self, version: u64) -> Option<Vec<f32>> {
207        if version < self.base_version {
208            return None; // Version not available
209        }
210
211        if version == self.base_version {
212            return Some(self.base_snapshot.clone());
213        }
214
215        let mut values = self.base_snapshot.clone();
216
217        for delta in &self.deltas {
218            if delta.version > version {
219                break;
220            }
221            apply_delta(&mut values, delta);
222        }
223
224        Some(values)
225    }
226
227    /// Get the total size in bytes (estimated)
228    pub fn size_bytes(&self) -> usize {
229        let base_size = self.base_snapshot.len() * 4;
230        let delta_size: usize = self.deltas.iter().map(|d| d.size_bytes).sum();
231        let metadata_size = self
232            .metadata
233            .as_ref()
234            .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0))
235            .unwrap_or(0);
236
237        base_size + delta_size + metadata_size + 64 // overhead
238    }
239
240    /// Convert to a Vector
241    pub fn to_vector(&self) -> Vector {
242        Vector {
243            id: self.id.clone(),
244            values: self.reconstruct(),
245            metadata: self.metadata.clone(),
246            ttl_seconds: self.ttl_seconds,
247            expires_at: self.expires_at,
248        }
249    }
250
251    /// Get delta chain length
252    pub fn delta_chain_length(&self) -> usize {
253        self.deltas.len()
254    }
255}
256
257// ============================================================================
258// Delta Operations
259// ============================================================================
260
261/// Apply a delta to a vector, returning the new values
262/// This handles dimension changes by resizing when needed
263fn apply_delta(values: &mut Vec<f32>, delta: &VectorDelta) {
264    match &delta.encoding {
265        DeltaEncoding::Sparse(changes) => {
266            for change in changes {
267                let idx = change.index as usize;
268                // Extend if index is beyond current length
269                if idx >= values.len() {
270                    values.resize(idx + 1, 0.0);
271                }
272                values[idx] = change.value;
273            }
274        }
275        DeltaEncoding::Rle(runs) => {
276            for run in runs {
277                let start = run.start as usize;
278                for (i, &value) in run.values.iter().enumerate() {
279                    let idx = start + i;
280                    // Extend if index is beyond current length
281                    if idx >= values.len() {
282                        values.resize(idx + 1, 0.0);
283                    }
284                    values[idx] = value;
285                }
286            }
287        }
288        DeltaEncoding::Full(new_values) => {
289            // Full replacement - just clone the new values
290            values.clear();
291            values.extend_from_slice(new_values);
292        }
293    }
294}
295
296/// Compute delta between old and new vectors
297pub fn compute_delta(
298    old_values: &[f32],
299    new_values: &[f32],
300    config: &DeltaConfig,
301    base_version: u64,
302    new_version: u64,
303) -> VectorDelta {
304    let now = std::time::SystemTime::now()
305        .duration_since(std::time::UNIX_EPOCH)
306        .unwrap_or_default()
307        .as_secs();
308
309    // If dimensions changed, always use full encoding to avoid data corruption
310    if old_values.len() != new_values.len() {
311        let size_bytes = estimate_encoding_size(&DeltaEncoding::Full(new_values.to_vec()));
312        return VectorDelta {
313            version: new_version,
314            base_version,
315            timestamp: now,
316            encoding: DeltaEncoding::Full(new_values.to_vec()),
317            size_bytes,
318        };
319    }
320
321    // Find changed components
322    let mut changes: Vec<(usize, f32)> = Vec::new();
323    let len = old_values.len();
324
325    for i in 0..len {
326        if (old_values[i] - new_values[i]).abs() > config.epsilon {
327            changes.push((i, new_values[i]));
328        }
329    }
330
331    let change_ratio = changes.len() as f32 / new_values.len().max(1) as f32;
332
333    // Decide encoding
334    let encoding = if change_ratio > config.sparse_threshold {
335        // Too many changes, store full vector
336        DeltaEncoding::Full(new_values.to_vec())
337    } else if config.enable_rle && changes.len() > 2 {
338        // Try RLE encoding
339        encode_rle(&changes)
340    } else {
341        // Sparse encoding
342        DeltaEncoding::Sparse(
343            changes
344                .into_iter()
345                .map(|(idx, val)| ComponentChange {
346                    index: idx as u32,
347                    value: val,
348                })
349                .collect(),
350        )
351    };
352
353    let size_bytes = estimate_encoding_size(&encoding);
354
355    VectorDelta {
356        version: new_version,
357        base_version,
358        timestamp: now,
359        encoding,
360        size_bytes,
361    }
362}
363
364/// Encode changes using run-length encoding
365fn encode_rle(changes: &[(usize, f32)]) -> DeltaEncoding {
366    if changes.is_empty() {
367        return DeltaEncoding::Sparse(vec![]);
368    }
369
370    let mut runs: Vec<RleChange> = Vec::new();
371    let mut current_run: Option<RleChange> = None;
372
373    for &(idx, value) in changes {
374        match &mut current_run {
375            Some(run) => {
376                let expected_idx = run.start as usize + run.values.len();
377                if idx == expected_idx {
378                    // Extend current run
379                    run.values.push(value);
380                } else {
381                    // End current run, start new one
382                    runs.push(current_run.take().unwrap());
383                    current_run = Some(RleChange {
384                        start: idx as u32,
385                        values: vec![value],
386                    });
387                }
388            }
389            None => {
390                current_run = Some(RleChange {
391                    start: idx as u32,
392                    values: vec![value],
393                });
394            }
395        }
396    }
397
398    if let Some(run) = current_run {
399        runs.push(run);
400    }
401
402    // Decide if RLE is worth it
403    let rle_size: usize = runs.iter().map(|r| 4 + r.values.len() * 4).sum();
404    let sparse_size = changes.len() * 8; // index + value
405
406    if rle_size < sparse_size {
407        DeltaEncoding::Rle(runs)
408    } else {
409        DeltaEncoding::Sparse(
410            changes
411                .iter()
412                .map(|&(idx, val)| ComponentChange {
413                    index: idx as u32,
414                    value: val,
415                })
416                .collect(),
417        )
418    }
419}
420
421/// Estimate size of encoding in bytes
422fn estimate_encoding_size(encoding: &DeltaEncoding) -> usize {
423    match encoding {
424        DeltaEncoding::Sparse(changes) => changes.len() * 8 + 16,
425        DeltaEncoding::Rle(runs) => runs.iter().map(|r| 8 + r.values.len() * 4).sum::<usize>() + 16,
426        DeltaEncoding::Full(values) => values.len() * 4 + 16,
427    }
428}
429
430// ============================================================================
431// Delta Store
432// ============================================================================
433
/// Point-in-time statistics for delta storage.
#[derive(Clone, Debug, Default)]
pub struct DeltaStats {
    /// Number of versioned vectors.
    pub total_vectors: u64,
    /// Number of deltas currently chained across all vectors.
    pub total_deltas: u64,
    /// Number of base snapshots.
    pub total_snapshots: u64,
    /// Estimated memory footprint in bytes.
    pub memory_bytes: u64,
    /// Mean number of deltas per vector.
    pub avg_chain_length: f64,
    /// Estimated uncompressed size divided by estimated stored size.
    pub compression_ratio: f64,
    /// Compactions performed so far.
    pub compactions: u64,
}
452
453/// Delta store for a namespace
454pub struct NamespaceDeltaStore {
455    /// Versioned vectors
456    vectors: RwLock<HashMap<VectorId, VersionedVector>>,
457    /// Configuration
458    config: DeltaConfig,
459    /// Statistics
460    stats: AtomicDeltaStats,
461}
462
/// Atomic counters that can be updated through a shared reference.
#[derive(Default)]
struct AtomicDeltaStats {
    /// Count of stored vectors.
    total_vectors: AtomicU64,
    /// Count of deltas appended.
    total_deltas: AtomicU64,
    /// Count of compactions performed.
    compactions: AtomicU64,
}

impl AtomicDeltaStats {
    /// Creates the counters with every value at zero.
    fn new() -> Self {
        Self::default()
    }
}
479
480impl NamespaceDeltaStore {
481    /// Create a new delta store
482    pub fn new(config: DeltaConfig) -> Self {
483        Self {
484            vectors: RwLock::new(HashMap::new()),
485            config,
486            stats: AtomicDeltaStats::new(),
487        }
488    }
489
490    /// Insert or update a vector
491    pub fn upsert(&self, vector: &Vector) -> UpsertResult {
492        let mut vectors = self.vectors.write();
493
494        if let Some(existing) = vectors.get_mut(&vector.id) {
495            // Compute delta
496            let current_values = existing.reconstruct();
497            let new_version = existing.current_version + 1;
498
499            let delta = compute_delta(
500                &current_values,
501                &vector.values,
502                &self.config,
503                existing.current_version,
504                new_version,
505            );
506
507            let delta_size = delta.size_bytes;
508            let is_full = delta.is_full();
509
510            // Add delta
511            existing.deltas.push_back(delta);
512            existing.current_version = new_version;
513            existing.metadata = vector.metadata.clone();
514            existing.ttl_seconds = vector.ttl_seconds;
515            existing.expires_at = vector.expires_at;
516
517            self.stats.total_deltas.fetch_add(1, Ordering::SeqCst);
518
519            // Check if compaction needed
520            let should_compact = self.config.auto_compact
521                && existing.delta_chain_length() >= self.config.compact_threshold;
522
523            if should_compact {
524                self.compact_vector(existing);
525            }
526
527            UpsertResult {
528                is_new: false,
529                version: new_version,
530                delta_size,
531                used_full_encoding: is_full,
532                compacted: should_compact,
533            }
534        } else {
535            // New vector
536            let versioned = VersionedVector::new(vector);
537            vectors.insert(vector.id.clone(), versioned);
538
539            self.stats.total_vectors.fetch_add(1, Ordering::SeqCst);
540
541            UpsertResult {
542                is_new: true,
543                version: 1,
544                delta_size: 0,
545                used_full_encoding: false,
546                compacted: false,
547            }
548        }
549    }
550
551    /// Get a vector by ID
552    pub fn get(&self, id: &VectorId) -> Option<Vector> {
553        self.vectors.read().get(id).map(|v| v.to_vector())
554    }
555
556    /// Get a vector at a specific version
557    pub fn get_at_version(&self, id: &VectorId, version: u64) -> Option<Vector> {
558        let vectors = self.vectors.read();
559        let versioned = vectors.get(id)?;
560
561        let values = versioned.reconstruct_at_version(version)?;
562
563        Some(Vector {
564            id: versioned.id.clone(),
565            values,
566            metadata: versioned.metadata.clone(),
567            ttl_seconds: versioned.ttl_seconds,
568            expires_at: versioned.expires_at,
569        })
570    }
571
572    /// Get version history for a vector
573    pub fn get_version_info(&self, id: &VectorId) -> Option<VersionInfo> {
574        let vectors = self.vectors.read();
575        let versioned = vectors.get(id)?;
576
577        Some(VersionInfo {
578            id: versioned.id.clone(),
579            current_version: versioned.current_version,
580            base_version: versioned.base_version,
581            delta_count: versioned.deltas.len(),
582            size_bytes: versioned.size_bytes(),
583        })
584    }
585
586    /// Delete a vector
587    pub fn delete(&self, id: &VectorId) -> bool {
588        let removed = self.vectors.write().remove(id).is_some();
589        if removed {
590            self.stats.total_vectors.fetch_sub(1, Ordering::SeqCst);
591        }
592        removed
593    }
594
595    /// Get all vectors
596    pub fn get_all(&self) -> Vec<Vector> {
597        self.vectors
598            .read()
599            .values()
600            .map(|v| v.to_vector())
601            .collect()
602    }
603
604    /// Compact a vector's delta chain (collapse to new base snapshot)
605    fn compact_vector(&self, versioned: &mut VersionedVector) {
606        // Reconstruct current state
607        let current_values = versioned.reconstruct();
608
609        // Replace with new base
610        versioned.base_snapshot = current_values;
611        versioned.base_version = versioned.current_version;
612        versioned.deltas.clear();
613
614        self.stats.compactions.fetch_add(1, Ordering::SeqCst);
615    }
616
617    /// Manually compact a specific vector
618    pub fn compact(&self, id: &VectorId) -> bool {
619        let mut vectors = self.vectors.write();
620        if let Some(versioned) = vectors.get_mut(id) {
621            self.compact_vector(versioned);
622            true
623        } else {
624            false
625        }
626    }
627
628    /// Compact all vectors in the store
629    pub fn compact_all(&self) -> usize {
630        let mut vectors = self.vectors.write();
631        let mut count = 0;
632
633        for versioned in vectors.values_mut() {
634            if !versioned.deltas.is_empty() {
635                self.compact_vector(versioned);
636                count += 1;
637            }
638        }
639
640        count
641    }
642
643    /// Get statistics
644    pub fn stats(&self) -> DeltaStats {
645        let vectors = self.vectors.read();
646
647        let total_vectors = vectors.len() as u64;
648        let total_deltas: usize = vectors.values().map(|v| v.deltas.len()).sum();
649        let memory_bytes: usize = vectors.values().map(|v| v.size_bytes()).sum();
650
651        let avg_chain = if total_vectors > 0 {
652            total_deltas as f64 / total_vectors as f64
653        } else {
654            0.0
655        };
656
657        // Calculate compression ratio
658        let original_size: usize = vectors
659            .values()
660            .map(|v| v.reconstruct().len() * 4 * (v.deltas.len() + 1))
661            .sum();
662
663        let compression_ratio = if memory_bytes > 0 {
664            original_size as f64 / memory_bytes as f64
665        } else {
666            1.0
667        };
668
669        DeltaStats {
670            total_vectors,
671            total_deltas: total_deltas as u64,
672            total_snapshots: total_vectors,
673            memory_bytes: memory_bytes as u64,
674            avg_chain_length: avg_chain,
675            compression_ratio,
676            compactions: self.stats.compactions.load(Ordering::SeqCst),
677        }
678    }
679
680    /// Get vector count
681    pub fn count(&self) -> usize {
682        self.vectors.read().len()
683    }
684
685    /// Check if empty
686    pub fn is_empty(&self) -> bool {
687        self.vectors.read().is_empty()
688    }
689
690    /// Clear all vectors
691    pub fn clear(&self) {
692        self.vectors.write().clear();
693        self.stats.total_vectors.store(0, Ordering::SeqCst);
694        self.stats.total_deltas.store(0, Ordering::SeqCst);
695    }
696}
697
/// Outcome of `NamespaceDeltaStore::upsert`.
#[derive(Clone, Debug)]
pub struct UpsertResult {
    /// `true` if the ID was not stored before this call.
    pub is_new: bool,
    /// Version number after the upsert.
    pub version: u64,
    /// Estimated size of the appended delta; 0 for a new vector.
    pub delta_size: usize,
    /// `true` if the delta stored the full vector instead of sparse/RLE changes.
    pub used_full_encoding: bool,
    /// `true` if the upsert collapsed the delta chain into a new snapshot.
    pub compacted: bool,
}
712
713/// Version information for a vector
714#[derive(Debug, Clone)]
715pub struct VersionInfo {
716    /// Vector ID
717    pub id: VectorId,
718    /// Current version number
719    pub current_version: u64,
720    /// Base snapshot version
721    pub base_version: u64,
722    /// Number of deltas in chain
723    pub delta_count: usize,
724    /// Total size in bytes
725    pub size_bytes: usize,
726}
727
728// ============================================================================
729// Delta Store Manager (Multi-Namespace)
730// ============================================================================
731
732/// Manager for delta stores across namespaces
733pub struct DeltaStoreManager {
734    stores: RwLock<HashMap<NamespaceId, NamespaceDeltaStore>>,
735    config: DeltaConfig,
736}
737
738impl DeltaStoreManager {
739    /// Create a new manager
740    pub fn new(config: DeltaConfig) -> Self {
741        Self {
742            stores: RwLock::new(HashMap::new()),
743            config,
744        }
745    }
746
747    /// Create with default config
748    pub fn with_defaults() -> Self {
749        Self::new(DeltaConfig::default())
750    }
751
752    /// Get or create a namespace store
753    pub fn get_or_create(&self, namespace: &NamespaceId) -> bool {
754        let mut stores = self.stores.write();
755        if !stores.contains_key(namespace) {
756            stores.insert(
757                namespace.clone(),
758                NamespaceDeltaStore::new(self.config.clone()),
759            );
760            true
761        } else {
762            false
763        }
764    }
765
766    /// Upsert vectors into a namespace
767    pub fn upsert(&self, namespace: &NamespaceId, vectors: &[Vector]) -> Vec<UpsertResult> {
768        self.get_or_create(namespace);
769
770        let stores = self.stores.read();
771        if let Some(store) = stores.get(namespace) {
772            vectors.iter().map(|v| store.upsert(v)).collect()
773        } else {
774            vec![]
775        }
776    }
777
778    /// Get a vector
779    pub fn get(&self, namespace: &NamespaceId, id: &VectorId) -> Option<Vector> {
780        self.stores.read().get(namespace)?.get(id)
781    }
782
783    /// Get all vectors in namespace
784    pub fn get_all(&self, namespace: &NamespaceId) -> Vec<Vector> {
785        self.stores
786            .read()
787            .get(namespace)
788            .map(|s| s.get_all())
789            .unwrap_or_default()
790    }
791
792    /// Delete a vector
793    pub fn delete(&self, namespace: &NamespaceId, id: &VectorId) -> bool {
794        self.stores
795            .read()
796            .get(namespace)
797            .map(|s| s.delete(id))
798            .unwrap_or(false)
799    }
800
801    /// Delete a namespace
802    pub fn delete_namespace(&self, namespace: &NamespaceId) -> bool {
803        self.stores.write().remove(namespace).is_some()
804    }
805
806    /// Compact all vectors in a namespace
807    pub fn compact_namespace(&self, namespace: &NamespaceId) -> usize {
808        self.stores
809            .read()
810            .get(namespace)
811            .map(|s| s.compact_all())
812            .unwrap_or(0)
813    }
814
815    /// Get stats for a namespace
816    pub fn namespace_stats(&self, namespace: &NamespaceId) -> Option<DeltaStats> {
817        self.stores.read().get(namespace).map(|s| s.stats())
818    }
819
820    /// Get combined stats
821    pub fn stats(&self) -> DeltaStats {
822        let stores = self.stores.read();
823
824        let mut combined = DeltaStats::default();
825
826        for store in stores.values() {
827            let s = store.stats();
828            combined.total_vectors += s.total_vectors;
829            combined.total_deltas += s.total_deltas;
830            combined.total_snapshots += s.total_snapshots;
831            combined.memory_bytes += s.memory_bytes;
832            combined.compactions += s.compactions;
833        }
834
835        if combined.total_vectors > 0 {
836            combined.avg_chain_length =
837                combined.total_deltas as f64 / combined.total_vectors as f64;
838        }
839
840        combined
841    }
842
843    /// List namespaces
844    pub fn list_namespaces(&self) -> Vec<NamespaceId> {
845        self.stores.read().keys().cloned().collect()
846    }
847}
848
849// ============================================================================
850// Tests
851// ============================================================================
852
853#[cfg(test)]
854mod tests {
855    use super::*;
856
857    fn make_vector(id: &str, values: Vec<f32>) -> Vector {
858        Vector {
859            id: id.to_string(),
860            values,
861            metadata: None,
862            ttl_seconds: None,
863            expires_at: None,
864        }
865    }
866
867    #[test]
868    fn test_delta_config_builder() {
869        let config = DeltaConfig::new()
870            .with_max_chain(20)
871            .with_sparse_threshold(0.3)
872            .with_epsilon(1e-5)
873            .without_rle()
874            .without_auto_compact();
875
876        assert_eq!(config.max_delta_chain, 20);
877        assert!((config.sparse_threshold - 0.3).abs() < 0.001);
878        assert!((config.epsilon - 1e-5).abs() < 1e-10);
879        assert!(!config.enable_rle);
880        assert!(!config.auto_compact);
881    }
882
883    #[test]
884    fn test_compute_delta_sparse() {
885        let old = vec![1.0, 2.0, 3.0, 4.0, 5.0];
886        let new = vec![1.0, 2.5, 3.0, 4.0, 5.5]; // Changed indices 1 and 4
887
888        let config = DeltaConfig::default();
889        let delta = compute_delta(&old, &new, &config, 1, 2);
890
891        assert_eq!(delta.version, 2);
892        assert_eq!(delta.base_version, 1);
893
894        if let DeltaEncoding::Sparse(changes) = delta.encoding {
895            assert_eq!(changes.len(), 2);
896            assert!(changes
897                .iter()
898                .any(|c| c.index == 1 && (c.value - 2.5).abs() < 0.001));
899            assert!(changes
900                .iter()
901                .any(|c| c.index == 4 && (c.value - 5.5).abs() < 0.001));
902        } else {
903            panic!("Expected sparse encoding");
904        }
905    }
906
907    #[test]
908    fn test_compute_delta_rle() {
909        let old = vec![0.0; 10];
910        let new = vec![0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]; // Run of 4 changes
911
912        let config = DeltaConfig::default();
913        let delta = compute_delta(&old, &new, &config, 1, 2);
914
915        // Should use RLE for contiguous changes
916        if let DeltaEncoding::Rle(runs) = delta.encoding {
917            assert_eq!(runs.len(), 1);
918            assert_eq!(runs[0].start, 1);
919            assert_eq!(runs[0].values.len(), 4);
920        } else if let DeltaEncoding::Sparse(changes) = delta.encoding {
921            // RLE wasn't more efficient, that's ok
922            assert_eq!(changes.len(), 4);
923        } else {
924            panic!("Expected RLE or Sparse encoding");
925        }
926    }
927
928    #[test]
929    fn test_compute_delta_full() {
930        let old = vec![0.0; 10];
931        let new = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; // All changed
932
933        let config = DeltaConfig::default();
934        let delta = compute_delta(&old, &new, &config, 1, 2);
935
936        // Should use full encoding
937        assert!(delta.is_full());
938
939        if let DeltaEncoding::Full(values) = delta.encoding {
940            assert_eq!(values, new);
941        } else {
942            panic!("Expected full encoding");
943        }
944    }
945
946    #[test]
947    fn test_apply_delta_sparse() {
948        let mut values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
949        let delta = VectorDelta {
950            version: 2,
951            base_version: 1,
952            timestamp: 0,
953            encoding: DeltaEncoding::Sparse(vec![
954                ComponentChange {
955                    index: 1,
956                    value: 10.0,
957                },
958                ComponentChange {
959                    index: 3,
960                    value: 20.0,
961                },
962            ]),
963            size_bytes: 0,
964        };
965
966        apply_delta(&mut values, &delta);
967
968        assert_eq!(values, vec![1.0, 10.0, 3.0, 20.0, 5.0]);
969    }
970
971    #[test]
972    fn test_apply_delta_rle() {
973        let mut values = vec![0.0; 10];
974        let delta = VectorDelta {
975            version: 2,
976            base_version: 1,
977            timestamp: 0,
978            encoding: DeltaEncoding::Rle(vec![RleChange {
979                start: 2,
980                values: vec![1.0, 2.0, 3.0],
981            }]),
982            size_bytes: 0,
983        };
984
985        apply_delta(&mut values, &delta);
986
987        assert_eq!(values[2], 1.0);
988        assert_eq!(values[3], 2.0);
989        assert_eq!(values[4], 3.0);
990    }
991
992    #[test]
993    fn test_versioned_vector() {
994        let v = make_vector("v1", vec![1.0, 2.0, 3.0]);
995        let versioned = VersionedVector::new(&v);
996
997        assert_eq!(versioned.current_version, 1);
998        assert_eq!(versioned.base_version, 1);
999        assert_eq!(versioned.deltas.len(), 0);
1000        assert_eq!(versioned.reconstruct(), vec![1.0, 2.0, 3.0]);
1001    }
1002
1003    #[test]
1004    fn test_namespace_delta_store_upsert_new() {
1005        let store = NamespaceDeltaStore::new(DeltaConfig::default());
1006
1007        let v = make_vector("v1", vec![1.0, 2.0, 3.0]);
1008        let result = store.upsert(&v);
1009
1010        assert!(result.is_new);
1011        assert_eq!(result.version, 1);
1012        assert_eq!(result.delta_size, 0);
1013    }
1014
1015    #[test]
1016    fn test_namespace_delta_store_upsert_update() {
1017        let store = NamespaceDeltaStore::new(DeltaConfig::default());
1018
1019        // Insert
1020        let v1 = make_vector("v1", vec![1.0, 2.0, 3.0]);
1021        store.upsert(&v1);
1022
1023        // Update
1024        let v2 = make_vector("v1", vec![1.0, 5.0, 3.0]);
1025        let result = store.upsert(&v2);
1026
1027        assert!(!result.is_new);
1028        assert_eq!(result.version, 2);
1029        assert!(result.delta_size > 0);
1030
1031        // Verify reconstruction
1032        let retrieved = store.get(&"v1".to_string()).unwrap();
1033        assert_eq!(retrieved.values, vec![1.0, 5.0, 3.0]);
1034    }
1035
1036    #[test]
1037    fn test_namespace_delta_store_version_history() {
1038        let store = NamespaceDeltaStore::new(DeltaConfig::default().without_auto_compact());
1039
1040        // Insert and update
1041        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0])); // v1
1042        store.upsert(&make_vector("v1", vec![1.0, 5.0, 3.0])); // v2
1043        store.upsert(&make_vector("v1", vec![1.0, 5.0, 10.0])); // v3
1044
1045        // Get current
1046        let current = store.get(&"v1".to_string()).unwrap();
1047        assert_eq!(current.values, vec![1.0, 5.0, 10.0]);
1048
1049        // Get at version 2
1050        let v2 = store.get_at_version(&"v1".to_string(), 2).unwrap();
1051        assert_eq!(v2.values, vec![1.0, 5.0, 3.0]);
1052
1053        // Get at version 1
1054        let v1 = store.get_at_version(&"v1".to_string(), 1).unwrap();
1055        assert_eq!(v1.values, vec![1.0, 2.0, 3.0]);
1056    }
1057
1058    #[test]
1059    fn test_namespace_delta_store_compact() {
1060        let store = NamespaceDeltaStore::new(DeltaConfig::default().without_auto_compact());
1061
1062        // Create delta chain
1063        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1064        store.upsert(&make_vector("v1", vec![1.0, 5.0, 3.0]));
1065        store.upsert(&make_vector("v1", vec![1.0, 5.0, 10.0]));
1066
1067        // Verify chain exists
1068        let info_before = store.get_version_info(&"v1".to_string()).unwrap();
1069        assert_eq!(info_before.delta_count, 2);
1070
1071        // Compact
1072        store.compact(&"v1".to_string());
1073
1074        // Verify chain is gone
1075        let info_after = store.get_version_info(&"v1".to_string()).unwrap();
1076        assert_eq!(info_after.delta_count, 0);
1077
1078        // Values should still be correct
1079        let v = store.get(&"v1".to_string()).unwrap();
1080        assert_eq!(v.values, vec![1.0, 5.0, 10.0]);
1081    }
1082
1083    #[test]
1084    fn test_namespace_delta_store_auto_compact() {
1085        let config = DeltaConfig::default()
1086            .with_max_chain(10)
1087            .with_sparse_threshold(0.9); // Ensure sparse encoding
1088
1089        // Low compact threshold for testing
1090        let store = NamespaceDeltaStore::new(DeltaConfig {
1091            compact_threshold: 3,
1092            auto_compact: true,
1093            ..config
1094        });
1095
1096        // Create deltas
1097        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1098        store.upsert(&make_vector("v1", vec![1.1, 2.0, 3.0])); // delta 1
1099        store.upsert(&make_vector("v1", vec![1.2, 2.0, 3.0])); // delta 2
1100        let result = store.upsert(&make_vector("v1", vec![1.3, 2.0, 3.0])); // delta 3 -> compact
1101
1102        // Should have compacted
1103        assert!(result.compacted);
1104
1105        let info = store.get_version_info(&"v1".to_string()).unwrap();
1106        assert_eq!(info.delta_count, 0);
1107    }
1108
1109    #[test]
1110    fn test_namespace_delta_store_delete() {
1111        let store = NamespaceDeltaStore::new(DeltaConfig::default());
1112
1113        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1114        assert!(store.delete(&"v1".to_string()));
1115        assert!(store.get(&"v1".to_string()).is_none());
1116        assert!(!store.delete(&"v1".to_string())); // Already deleted
1117    }
1118
1119    #[test]
1120    fn test_namespace_delta_store_stats() {
1121        let store = NamespaceDeltaStore::new(DeltaConfig::default().without_auto_compact());
1122
1123        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1124        store.upsert(&make_vector("v1", vec![1.0, 5.0, 3.0]));
1125        store.upsert(&make_vector("v2", vec![4.0, 5.0, 6.0]));
1126
1127        let stats = store.stats();
1128        assert_eq!(stats.total_vectors, 2);
1129        assert_eq!(stats.total_deltas, 1);
1130        assert!(stats.memory_bytes > 0);
1131    }
1132
1133    #[test]
1134    fn test_delta_store_manager() {
1135        let manager = DeltaStoreManager::with_defaults();
1136
1137        // Upsert
1138        let results = manager.upsert(
1139            &"ns1".to_string(),
1140            &[make_vector("v1", vec![1.0, 2.0, 3.0])],
1141        );
1142        assert_eq!(results.len(), 1);
1143        assert!(results[0].is_new);
1144
1145        // Get
1146        let v = manager.get(&"ns1".to_string(), &"v1".to_string()).unwrap();
1147        assert_eq!(v.values, vec![1.0, 2.0, 3.0]);
1148
1149        // List namespaces
1150        let namespaces = manager.list_namespaces();
1151        assert!(namespaces.contains(&"ns1".to_string()));
1152
1153        // Delete namespace
1154        assert!(manager.delete_namespace(&"ns1".to_string()));
1155        assert!(manager.get(&"ns1".to_string(), &"v1".to_string()).is_none());
1156    }
1157
1158    #[test]
1159    fn test_encode_rle_efficiency() {
1160        // Test that RLE is used for contiguous changes
1161        let changes: Vec<(usize, f32)> = (0..10).map(|i| (i, i as f32)).collect();
1162
1163        let encoding = encode_rle(&changes);
1164
1165        if let DeltaEncoding::Rle(runs) = encoding {
1166            // Should be one run
1167            assert_eq!(runs.len(), 1);
1168            assert_eq!(runs[0].values.len(), 10);
1169        } else {
1170            // Sparse might be used if RLE isn't more efficient
1171            // This is acceptable
1172        }
1173    }
1174
1175    #[test]
1176    fn test_dimension_change() {
1177        let store = NamespaceDeltaStore::new(DeltaConfig::default());
1178
1179        // Insert with 3 dimensions
1180        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1181
1182        // Update with 5 dimensions
1183        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0, 4.0, 5.0]));
1184
1185        let v = store.get(&"v1".to_string()).unwrap();
1186        assert_eq!(v.values, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
1187    }
1188
1189    #[test]
1190    fn test_epsilon_change_detection() {
1191        let config = DeltaConfig::default().with_epsilon(0.1);
1192        let store = NamespaceDeltaStore::new(config);
1193
1194        store.upsert(&make_vector("v1", vec![1.0, 2.0, 3.0]));
1195
1196        // Small change below epsilon
1197        store.upsert(&make_vector("v1", vec![1.05, 2.0, 3.0]));
1198
1199        // Should not create a delta with changes below epsilon
1200        let info = store.get_version_info(&"v1".to_string()).unwrap();
1201
1202        // Value should still be tracked even if no delta
1203        // The exact behavior depends on implementation details
1204        assert!(info.current_version >= 1);
1205    }
1206}