// tinyquant_core/corpus/aggregate.rs
//! [`Corpus`] aggregate root — manages a collection of compressed embedding vectors.
//!
//! The `Corpus` is the primary domain object in `TinyQuant`'s corpus layer.  It
//! owns an insertion-ordered map of [`VectorEntry`] values, enforces the active
//! [`CompressionPolicy`], buffers domain events, and exposes a clean API for
//! insertion, decompression, and event draining.
//!
//! ## Thread safety
//!
//! `Corpus` is not `Sync`.  Callers that need concurrent access must wrap it in
//! an external lock.
//!
//! ## Passthrough and Fp16 policies
//!
//! For `Passthrough`, each f32 dimension is stored as 4 little-endian bytes
//! in the `indices` field of a [`CompressedVector`] with `bit_width = 8`.
//! For `Fp16`, each f32 is cast to f16 and stored as 2 little-endian bytes
//! (so the stored `dimension` is the byte count rather than the f32 count,
//! with `bit_width = 8` to keep the `CompressedVector` invariant satisfied).
use alloc::{
    boxed::Box,
    collections::{BTreeMap, BTreeSet},
    string::{String, ToString},
    sync::Arc,
    vec,
    vec::Vec,
};

use half::f16;

use crate::codec::{Codebook, Codec, CodecConfig, CompressedVector};
use crate::corpus::compression_policy::CompressionPolicy;
use crate::corpus::entry_meta_value::EntryMetaValue;
use crate::corpus::events::CorpusEvent;
use crate::corpus::vector_entry::VectorEntry;
use crate::errors::CorpusError;
use crate::types::{CorpusId, Timestamp, VectorId};

// Use VectorIdMap for both std and no_std builds in Phase 18.
// IndexMap integration is deferred to Phase 21+ per the plan recommendation.
use crate::corpus::vector_id_map::VectorIdMap;
/// Accounting summary returned by [`Corpus::insert_batch`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BatchReport {
    /// Number of vectors successfully inserted.
    ///
    /// In strict (all-or-nothing) mode this is either `0` (on `Err`) or
    /// `vectors.len()` (on `Ok`).
    pub inserted: usize,
    /// Number of vectors skipped due to soft failure.
    ///
    /// Always `0` in strict mode; non-zero only in lenient mode (Phase 22+).
    pub skipped: usize,
    /// First error encountered, with its zero-based batch index.
    ///
    /// `None` on strict-mode success; `Some` in lenient mode when any vector
    /// was rejected.  Strict mode reports failures via `Err` instead.
    pub first_error: Option<(usize, CorpusError)>,
}
62
/// Aggregate root that manages a collection of compressed embedding vectors.
///
/// # Domain contracts
///
/// - `CompressionPolicy` is immutable after the first successful `insert`.
/// - Domain events are buffered and drained atomically via [`drain_events`](Self::drain_events).
/// - `insert_batch` is all-or-nothing: on error the corpus is not modified.
///
/// # Usage
///
/// ```ignore
/// let mut corpus = Corpus::new(
///     Arc::from("my-corpus"),
///     config,
///     codebook,
///     CompressionPolicy::Compress,
///     BTreeMap::new(),
/// );
/// // `insert` also takes the event timestamp (nanoseconds since Unix epoch).
/// corpus.insert(Arc::from("v1"), &vector, None, 0)?;
/// let events = corpus.drain_events();
/// ```
#[allow(clippy::struct_field_names)]
pub struct Corpus {
    // Stable string identifier; shared via `Arc::clone` with emitted events.
    corpus_id: CorpusId,
    // Codec configuration fixed at construction time.
    config: CodecConfig,
    // Codebook consumed by the `Compress` policy's codec pipeline.
    codebook: Codebook,
    // Active policy; set at construction.
    compression_policy: CompressionPolicy,
    // Insertion-ordered id → entry map.
    vectors: VectorIdMap<VectorEntry>,
    // Buffered domain events, emptied by `drain_events`.
    pending_events: Vec<CorpusEvent>,
    // Corpus-level (not per-entry) metadata.
    metadata: BTreeMap<String, EntryMetaValue>,
}
94
impl Corpus {
    /// Construct a new `Corpus` and emit a [`CorpusEvent::Created`] event.
    ///
    /// The `Created` event is stamped with timestamp `0`; callers that have a
    /// real time source should use [`Self::new_at`] instead.  (This crate is
    /// `no_std`, so there is no ambient clock such as `std::time` to consult.)
    #[must_use]
    pub fn new(
        corpus_id: CorpusId,
        config: CodecConfig,
        codebook: Codebook,
        compression_policy: CompressionPolicy,
        metadata: BTreeMap<String, EntryMetaValue>,
    ) -> Self {
        Self::new_at(corpus_id, config, codebook, compression_policy, metadata, 0)
    }
110
111    /// Construct a new `Corpus` with an explicit creation timestamp.
112    ///
113    /// Use this variant in tests or callers that supply their own time source.
114    #[must_use]
115    pub fn new_at(
116        corpus_id: CorpusId,
117        config: CodecConfig,
118        codebook: Codebook,
119        compression_policy: CompressionPolicy,
120        metadata: BTreeMap<String, EntryMetaValue>,
121        timestamp: Timestamp,
122    ) -> Self {
123        let event = CorpusEvent::Created {
124            corpus_id: Arc::clone(&corpus_id),
125            codec_config: config.clone(),
126            compression_policy,
127            timestamp,
128        };
129        Self {
130            corpus_id,
131            config,
132            codebook,
133            compression_policy,
134            vectors: VectorIdMap::new(),
135            pending_events: vec![event],
136            metadata,
137        }
138    }
139
    // ── Accessors ─────────────────────────────────────────────────────────────

    /// The stable string identifier for this corpus.
    #[must_use]
    pub const fn corpus_id(&self) -> &CorpusId {
        &self.corpus_id
    }

    /// The codec configuration used at construction time.
    #[must_use]
    pub const fn config(&self) -> &CodecConfig {
        &self.config
    }

    /// The active compression policy.
    #[must_use]
    pub const fn compression_policy(&self) -> CompressionPolicy {
        self.compression_policy
    }

    /// Number of vectors currently stored in the corpus.
    #[must_use]
    pub fn vector_count(&self) -> usize {
        self.vectors.len()
    }

    /// Returns `true` when the corpus contains no vectors.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.vectors.is_empty()
    }

    /// Returns `true` if a vector with the given id exists.
    #[must_use]
    pub fn contains(&self, id: &VectorId) -> bool {
        self.vectors.contains_key(id.as_ref())
    }

    /// Iterate over `(VectorId, VectorEntry)` pairs in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&VectorId, &VectorEntry)> {
        self.vectors.iter()
    }

    /// Read-only access to the corpus-level (not per-entry) metadata.
    #[must_use]
    pub const fn metadata(&self) -> &BTreeMap<String, EntryMetaValue> {
        &self.metadata
    }
188
189    // ── Mutation ──────────────────────────────────────────────────────────────
190
191    /// Drain and return all pending domain events.
192    ///
193    /// Uses `core::mem::take` — O(1), no allocation.  After this call
194    /// `pending_events` is empty.
195    pub fn drain_events(&mut self) -> Vec<CorpusEvent> {
196        core::mem::take(&mut self.pending_events)
197    }
198
199    /// Insert a single vector into the corpus.
200    ///
201    /// # Errors
202    ///
203    /// - [`CorpusError::DimensionMismatch`] if `vector.len() != config.dimension()`.
204    /// - [`CorpusError::DuplicateVectorId`] if `id` is already present.
205    /// - [`CorpusError::Codec`] if the codec pipeline fails (only for `Compress` policy).
206    pub fn insert(
207        &mut self,
208        id: VectorId,
209        vector: &[f32],
210        entry_metadata: Option<EntryMetaValue>,
211        timestamp: Timestamp,
212    ) -> Result<(), CorpusError> {
213        self.validate_dimension(vector)?;
214        if self.vectors.contains_key(id.as_ref()) {
215            return Err(CorpusError::DuplicateVectorId { id });
216        }
217
218        let compressed = self.compress_vector(vector)?;
219        let meta = Self::unwrap_meta(entry_metadata);
220        #[allow(clippy::cast_possible_truncation)]
221        let declared_dim = vector.len() as u32;
222        let entry = VectorEntry::new(Arc::clone(&id), compressed, declared_dim, timestamp, meta);
223
224        self.vectors.insert(Arc::clone(&id), entry);
225
226        self.pending_events.push(CorpusEvent::VectorsInserted {
227            corpus_id: Arc::clone(&self.corpus_id),
228            vector_ids: Arc::from([id]),
229            count: 1,
230            timestamp,
231        });
232
233        Ok(())
234    }
235
236    /// Insert a batch of vectors atomically (all-or-nothing).
237    ///
238    /// On failure the corpus is unchanged and no events are emitted.
239    /// On success, exactly one [`CorpusEvent::VectorsInserted`] is emitted
240    /// covering all inserted ids.
241    ///
242    /// # Errors
243    ///
244    /// - [`CorpusError::BatchAtomicityFailure`] if any vector fails validation
245    ///   or compression. The `index` field identifies the first failing vector.
246    pub fn insert_batch(
247        &mut self,
248        vectors: &[(VectorId, &[f32], Option<EntryMetaValue>)],
249        timestamp: Timestamp,
250    ) -> Result<BatchReport, CorpusError> {
251        // Stage 1: validate + compress all vectors without touching the map.
252        let mut staged: Vec<(VectorId, VectorEntry)> = Vec::with_capacity(vectors.len());
253
254        for (index, (id, vector, meta)) in vectors.iter().enumerate() {
255            // Validate dimension.
256            if let Err(e) = self.validate_dimension(vector) {
257                return Err(CorpusError::BatchAtomicityFailure {
258                    index,
259                    source: Box::new(e),
260                });
261            }
262            // Check for duplicates already in the corpus.
263            if self.vectors.contains_key(id.as_ref()) {
264                return Err(CorpusError::BatchAtomicityFailure {
265                    index,
266                    source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
267                });
268            }
269            // Check for duplicates within this batch itself.
270            let already_staged = staged
271                .iter()
272                .any(|(staged_id, _)| staged_id.as_ref() == id.as_ref());
273            if already_staged {
274                return Err(CorpusError::BatchAtomicityFailure {
275                    index,
276                    source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
277                });
278            }
279
280            // Compress.
281            let compressed = match self.compress_vector(vector) {
282                Ok(cv) => cv,
283                Err(e) => {
284                    return Err(CorpusError::BatchAtomicityFailure {
285                        index,
286                        source: Box::new(CorpusError::Codec(e)),
287                    });
288                }
289            };
290
291            let entry_meta = Self::unwrap_meta(meta.clone());
292            #[allow(clippy::cast_possible_truncation)]
293            let declared_dim = vector.len() as u32;
294            staged.push((
295                Arc::clone(id),
296                VectorEntry::new(
297                    Arc::clone(id),
298                    compressed,
299                    declared_dim,
300                    timestamp,
301                    entry_meta,
302                ),
303            ));
304        }
305
306        // Stage 2: commit — all validations passed, flip into the map.
307        let mut ids: Vec<VectorId> = Vec::with_capacity(staged.len());
308        let inserted_count = staged.len();
309        for (id, entry) in staged {
310            ids.push(Arc::clone(&id));
311            self.vectors.insert(id, entry);
312        }
313
314        if !ids.is_empty() {
315            #[allow(clippy::cast_possible_truncation)]
316            let count = ids.len() as u32;
317            self.pending_events.push(CorpusEvent::VectorsInserted {
318                corpus_id: Arc::clone(&self.corpus_id),
319                vector_ids: Arc::from(ids.into_boxed_slice()),
320                count,
321                timestamp,
322            });
323        }
324
325        Ok(BatchReport {
326            inserted: inserted_count,
327            skipped: 0,
328            first_error: None,
329        })
330    }
331
332    /// Decompress a single vector by id.
333    ///
334    /// Does **not** emit a domain event (Python parity).
335    ///
336    /// # Errors
337    ///
338    /// - [`CorpusError::UnknownVectorId`] if `id` is not present.
339    /// - [`CorpusError::Codec`] if the codec pipeline fails.
340    pub fn decompress(&self, id: &VectorId) -> Result<Vec<f32>, CorpusError> {
341        let entry = self
342            .vectors
343            .get(id.as_ref())
344            .ok_or_else(|| CorpusError::UnknownVectorId { id: Arc::clone(id) })?;
345
346        self.decompress_entry(entry)
347    }
348
349    /// Decompress all vectors and return them with an explicit timestamp.
350    ///
351    /// Emits exactly one [`CorpusEvent::Decompressed`] on success.
352    /// No event is emitted when the corpus is empty — matches Python parity.
353    ///
354    /// # Errors
355    ///
356    /// Propagates errors from the codec pipeline for any individual vector.
357    pub fn decompress_all_at(
358        &mut self,
359        timestamp: Timestamp,
360    ) -> Result<BTreeMap<VectorId, Vec<f32>>, CorpusError> {
361        // No event emitted for empty corpus — matches Python parity.
362        if self.vectors.is_empty() {
363            return Ok(BTreeMap::new());
364        }
365
366        let mut out = BTreeMap::new();
367        for (id, entry) in self.vectors.iter() {
368            let decompressed = self.decompress_entry(entry)?;
369            out.insert(Arc::clone(id), decompressed);
370        }
371
372        #[allow(clippy::cast_possible_truncation)]
373        let vector_count = out.len() as u32;
374
375        self.pending_events.push(CorpusEvent::Decompressed {
376            corpus_id: Arc::clone(&self.corpus_id),
377            vector_count,
378            timestamp,
379        });
380
381        Ok(out)
382    }
383
    /// Remove a vector by id, returning the entry if present.
    ///
    /// Returns `None` when the id is unknown.
    /// Silent: no domain event is emitted (Python parity — `del corpus[id]`).
    pub fn remove(&mut self, id: &VectorId) -> Option<VectorEntry> {
        self.vectors.remove(id.as_ref())
    }
390
391    // ── Private helpers ───────────────────────────────────────────────────────
392
393    /// Validate that `vector.len()` matches the configured dimension.
394    #[allow(clippy::missing_const_for_fn)] // calls non-const CodecConfig::dimension()
395    fn validate_dimension(&self, vector: &[f32]) -> Result<(), CorpusError> {
396        let expected = self.config.dimension();
397        #[allow(clippy::cast_possible_truncation)]
398        let got = vector.len() as u32;
399        if got != expected {
400            return Err(CorpusError::DimensionMismatch { expected, got });
401        }
402        Ok(())
403    }
404
405    /// Compress a vector slice according to the active policy.
406    fn compress_vector(
407        &self,
408        vector: &[f32],
409    ) -> Result<CompressedVector, crate::errors::CodecError> {
410        match self.compression_policy {
411            CompressionPolicy::Compress => {
412                Codec::new().compress(vector, &self.config, &self.codebook)
413            }
414            CompressionPolicy::Passthrough => {
415                // Store raw f32 bytes: 4 bytes per dimension in the `indices` field.
416                // Each f32 is stored as 4 sequential index bytes (little-endian).
417                let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 4);
418                for &v in vector {
419                    indices.extend_from_slice(&v.to_le_bytes());
420                }
421                let dim = vector.len();
422                #[allow(clippy::cast_possible_truncation)]
423                let dim_u32 = dim as u32;
424                // We store 4 bytes per dimension in `indices` but the CompressedVector
425                // invariant expects `indices.len() == dimension`. We set dimension to
426                // byte_count (= dim * 4) to satisfy the invariant.
427                let byte_len = indices.len();
428                #[allow(clippy::cast_possible_truncation)]
429                let byte_len_u32 = byte_len as u32;
430                // Store byte_len_u32 as the CompressedVector dimension.
431                // Decompression reads back 4 bytes per original f32 dimension.
432                let _ = dim_u32; // original dim tracked via byte_len / 4 at decompress time
433                CompressedVector::new(
434                    indices.into_boxed_slice(),
435                    None,
436                    self.config.config_hash().clone(),
437                    byte_len_u32,
438                    8,
439                )
440            }
441            CompressionPolicy::Fp16 => {
442                // Store f16 LE: 2 bytes per dimension.
443                let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 2);
444                for &v in vector {
445                    let h = f16::from_f32(v);
446                    indices.extend_from_slice(&h.to_le_bytes());
447                }
448                let byte_len = indices.len();
449                #[allow(clippy::cast_possible_truncation)]
450                let byte_len_u32 = byte_len as u32;
451                CompressedVector::new(
452                    indices.into_boxed_slice(),
453                    None,
454                    self.config.config_hash().clone(),
455                    byte_len_u32,
456                    8,
457                )
458            }
459        }
460    }
461
    /// Decompress a [`VectorEntry`] according to the active policy.
    ///
    /// Inverse of `compress_vector`: `Compress` runs the codec pipeline,
    /// while `Passthrough`/`Fp16` reassemble f32 values from the raw bytes
    /// in the entry's `indices` field (4 or 2 little-endian bytes per
    /// dimension respectively).
    ///
    /// # Errors
    ///
    /// [`CorpusError::Codec`] if the codec pipeline fails, or — defensively —
    /// if the stored byte length is not a multiple of the per-dimension width
    /// (`compress_vector` always writes whole chunks, so this indicates
    /// corrupted storage).
    fn decompress_entry(&self, entry: &VectorEntry) -> Result<Vec<f32>, CorpusError> {
        match self.compression_policy {
            CompressionPolicy::Compress => Codec::new()
                .decompress(entry.compressed(), &self.config, &self.codebook)
                .map_err(CorpusError::Codec),
            CompressionPolicy::Passthrough => {
                // Reconstruct f32 from 4-byte LE chunks in `indices`.
                let bytes = entry.compressed().indices();
                if bytes.len() % 4 != 0 {
                    // Report actual length vs. the nearest whole-chunk length.
                    return Err(CorpusError::Codec(
                        crate::errors::CodecError::LengthMismatch {
                            left: bytes.len(),
                            right: (bytes.len() / 4) * 4,
                        },
                    ));
                }
                let floats: Vec<f32> = bytes
                    .chunks_exact(4)
                    .map(|chunk| {
                        // chunks_exact guarantees exactly 4 bytes; array indexing is safe.
                        #[allow(clippy::indexing_slicing)]
                        f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])
                    })
                    .collect();
                Ok(floats)
            }
            CompressionPolicy::Fp16 => {
                // Reconstruct f32 from 2-byte f16 LE chunks.
                let bytes = entry.compressed().indices();
                if bytes.len() % 2 != 0 {
                    return Err(CorpusError::Codec(
                        crate::errors::CodecError::LengthMismatch {
                            left: bytes.len(),
                            right: (bytes.len() / 2) * 2,
                        },
                    ));
                }
                let floats: Vec<f32> = bytes
                    .chunks_exact(2)
                    .map(|chunk| {
                        // chunks_exact guarantees exactly 2 bytes; indexing is safe.
                        #[allow(clippy::indexing_slicing)]
                        let h = f16::from_le_bytes([chunk[0], chunk[1]]);
                        f32::from(h)
                    })
                    .collect();
                Ok(floats)
            }
        }
    }
512
513    /// Convert an `Option<EntryMetaValue>` to a metadata map.
514    fn unwrap_meta(meta: Option<EntryMetaValue>) -> BTreeMap<String, EntryMetaValue> {
515        match meta {
516            None => BTreeMap::new(),
517            Some(EntryMetaValue::Object(arc_map)) => {
518                // Flatten an Object into the map — keys become string keys.
519                arc_map
520                    .iter()
521                    .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
522                    .collect()
523            }
524            Some(other) => {
525                // Non-object metadata stored under a sentinel key.
526                let mut m = BTreeMap::new();
527                m.insert("_value".to_string(), other);
528                m
529            }
530        }
531    }
532}