Skip to main content

tinyquant_core/corpus/
aggregate.rs

1//! [`Corpus`] aggregate root — manages a collection of compressed embedding vectors.
2//!
3//! The `Corpus` is the primary domain object in `TinyQuant`'s corpus layer.  It
4//! owns an insertion-ordered map of [`VectorEntry`] values, enforces the active
5//! [`CompressionPolicy`], buffers domain events, and exposes a clean API for
6//! insertion, decompression, and event draining.
7//!
8//! ## Thread safety
9//!
10//! `Corpus` is not `Sync`.  Callers that need concurrent access must wrap it in
11//! an external lock.
12//!
13//! ## Passthrough and Fp16 policies
14//!
15//! For `Passthrough`, each f32 dimension is stored as 4 little-endian bytes
16//! in the `indices` field of a [`CompressedVector`] with `bit_width = 8`.
//! For `Fp16`, each f32 is cast to f16 and stored as 2 little-endian bytes
//! (so `indices` holds `2 * dimension` bytes, with `bit_width = 8` to keep
//! the `CompressedVector` invariant satisfied).
20
use alloc::{
    boxed::Box,
    collections::{BTreeMap, BTreeSet},
    string::{String, ToString},
    sync::Arc,
    vec,
    vec::Vec,
};
29
30use half::f16;
31
32use crate::codec::{Codebook, Codec, CodecConfig, CompressedVector};
33use crate::corpus::compression_policy::CompressionPolicy;
34use crate::corpus::entry_meta_value::EntryMetaValue;
35use crate::corpus::events::CorpusEvent;
36use crate::corpus::vector_entry::VectorEntry;
37use crate::errors::CorpusError;
38use crate::types::{CorpusId, Timestamp, VectorId};
39
40// Use VectorIdMap for both std and no_std builds in Phase 18.
41// IndexMap integration is deferred to Phase 21+ per the plan recommendation.
42use crate::corpus::vector_id_map::VectorIdMap;
43
/// Accounting summary returned by [`Corpus::insert_batch`].
///
/// Plain value type: all fields are public and the struct derives value
/// equality, so callers can pattern-match or compare reports directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BatchReport {
    /// Number of vectors successfully inserted.
    ///
    /// In strict (all-or-nothing) mode this is either `0` (on `Err`) or
    /// `vectors.len()` (on `Ok`).
    pub inserted: usize,
    /// Number of vectors skipped due to soft failure.
    ///
    /// Always `0` in strict mode; non-zero only in lenient mode (Phase 22+).
    pub skipped: usize,
    /// First error encountered, with its zero-based batch index.
    ///
    /// `None` on strict-mode success; `Some` in lenient mode when any vector
    /// was rejected.
    pub first_error: Option<(usize, CorpusError)>,
}
62
/// Aggregate root that manages a collection of compressed embedding vectors.
///
/// # Domain contracts
///
/// - `CompressionPolicy` is immutable after the first successful `insert`.
/// - Domain events are buffered and drained atomically via [`drain_events`](Self::drain_events).
/// - `insert_batch` is all-or-nothing: on error the corpus is not modified.
///
/// # Usage
///
/// ```ignore
/// let mut corpus = Corpus::new(
///     Arc::from("my-corpus"),
///     config,
///     codebook,
///     CompressionPolicy::Compress,
///     BTreeMap::new(),
/// );
/// // `insert` takes a caller-supplied timestamp (nanoseconds since Unix epoch).
/// corpus.insert(Arc::from("v1"), &vector, None, timestamp)?;
/// let events = corpus.drain_events();
/// ```
pub struct Corpus {
    // Stable identifier; cheaply Arc-cloned into every emitted event.
    corpus_id: CorpusId,
    // Codec configuration fixed at construction; used for dimension checks
    // and by the codec pipeline.
    config: CodecConfig,
    // Codebook consulted only under the `Compress` policy.
    codebook: Codebook,
    // Immutable after construction (see domain contracts above).
    compression_policy: CompressionPolicy,
    // Insertion-ordered id -> entry map.
    vectors: VectorIdMap<VectorEntry>,
    // Buffered domain events; drained atomically via `drain_events`.
    pending_events: Vec<CorpusEvent>,
    // Corpus-level metadata (distinct from per-entry metadata).
    metadata: BTreeMap<String, EntryMetaValue>,
}
93
94impl Corpus {
    /// Construct a new `Corpus` and emit a [`CorpusEvent::Created`] event.
    ///
    /// The creation event is stamped with timestamp `0`.  Callers that have a
    /// real time source should use [`Self::new_at`] and supply nanoseconds
    /// since the Unix epoch — timestamps are always caller-provided to keep
    /// this crate `no_std` (no `std::time`).
    #[must_use]
    pub fn new(
        corpus_id: CorpusId,
        config: CodecConfig,
        codebook: Codebook,
        compression_policy: CompressionPolicy,
        metadata: BTreeMap<String, EntryMetaValue>,
    ) -> Self {
        Self::new_at(corpus_id, config, codebook, compression_policy, metadata, 0)
    }
109
110    /// Construct a new `Corpus` with an explicit creation timestamp.
111    ///
112    /// Use this variant in tests or callers that supply their own time source.
113    #[must_use]
114    pub fn new_at(
115        corpus_id: CorpusId,
116        config: CodecConfig,
117        codebook: Codebook,
118        compression_policy: CompressionPolicy,
119        metadata: BTreeMap<String, EntryMetaValue>,
120        timestamp: Timestamp,
121    ) -> Self {
122        let event = CorpusEvent::Created {
123            corpus_id: Arc::clone(&corpus_id),
124            codec_config: config.clone(),
125            compression_policy,
126            timestamp,
127        };
128        Self {
129            corpus_id,
130            config,
131            codebook,
132            compression_policy,
133            vectors: VectorIdMap::new(),
134            pending_events: vec![event],
135            metadata,
136        }
137    }
138
139    // ── Accessors ─────────────────────────────────────────────────────────────
140
    /// The stable string identifier for this corpus.
    #[must_use]
    pub const fn corpus_id(&self) -> &CorpusId {
        &self.corpus_id
    }

    /// The codec configuration used at construction time.
    #[must_use]
    pub const fn config(&self) -> &CodecConfig {
        &self.config
    }

    /// The active compression policy.
    ///
    /// Returned by value (the policy is a small copyable enum).
    #[must_use]
    pub const fn compression_policy(&self) -> CompressionPolicy {
        self.compression_policy
    }

    /// Number of vectors currently stored in the corpus.
    #[must_use]
    pub fn vector_count(&self) -> usize {
        self.vectors.len()
    }

    /// Returns `true` when the corpus contains no vectors.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.vectors.is_empty()
    }

    /// Returns `true` if a vector with the given id exists.
    ///
    /// Lookup is by string key (`id.as_ref()`), so any `VectorId` with the
    /// same underlying text matches.
    #[must_use]
    pub fn contains(&self, id: &VectorId) -> bool {
        self.vectors.contains_key(id.as_ref())
    }

    /// Iterate over `(VectorId, VectorEntry)` pairs in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&VectorId, &VectorEntry)> {
        self.vectors.iter()
    }

    /// Read-only access to the corpus-level metadata.
    ///
    /// This is corpus-wide metadata, distinct from the per-entry metadata
    /// stored on each [`VectorEntry`].
    #[must_use]
    pub const fn metadata(&self) -> &BTreeMap<String, EntryMetaValue> {
        &self.metadata
    }
187
188    // ── Mutation ──────────────────────────────────────────────────────────────
189
190    /// Drain and return all pending domain events.
191    ///
192    /// Uses `core::mem::take` — O(1), no allocation.  After this call
193    /// `pending_events` is empty.
194    pub fn drain_events(&mut self) -> Vec<CorpusEvent> {
195        core::mem::take(&mut self.pending_events)
196    }
197
198    /// Insert a single vector into the corpus.
199    ///
200    /// # Errors
201    ///
202    /// - [`CorpusError::DimensionMismatch`] if `vector.len() != config.dimension()`.
203    /// - [`CorpusError::DuplicateVectorId`] if `id` is already present.
204    /// - [`CorpusError::Codec`] if the codec pipeline fails (only for `Compress` policy).
205    pub fn insert(
206        &mut self,
207        id: VectorId,
208        vector: &[f32],
209        entry_metadata: Option<EntryMetaValue>,
210        timestamp: Timestamp,
211    ) -> Result<(), CorpusError> {
212        self.validate_dimension(vector)?;
213        if self.vectors.contains_key(id.as_ref()) {
214            return Err(CorpusError::DuplicateVectorId { id });
215        }
216
217        let compressed = self.compress_vector(vector)?;
218        let meta = Self::unwrap_meta(entry_metadata);
219        #[allow(clippy::cast_possible_truncation)]
220        let declared_dim = vector.len() as u32;
221        let entry = VectorEntry::new(Arc::clone(&id), compressed, declared_dim, timestamp, meta);
222
223        self.vectors.insert(Arc::clone(&id), entry);
224
225        self.pending_events.push(CorpusEvent::VectorsInserted {
226            corpus_id: Arc::clone(&self.corpus_id),
227            vector_ids: Arc::from([id]),
228            count: 1,
229            timestamp,
230        });
231
232        Ok(())
233    }
234
235    /// Insert a batch of vectors atomically (all-or-nothing).
236    ///
237    /// On failure the corpus is unchanged and no events are emitted.
238    /// On success, exactly one [`CorpusEvent::VectorsInserted`] is emitted
239    /// covering all inserted ids.
240    ///
241    /// # Errors
242    ///
243    /// - [`CorpusError::BatchAtomicityFailure`] if any vector fails validation
244    ///   or compression. The `index` field identifies the first failing vector.
245    pub fn insert_batch(
246        &mut self,
247        vectors: &[(VectorId, &[f32], Option<EntryMetaValue>)],
248        timestamp: Timestamp,
249    ) -> Result<BatchReport, CorpusError> {
250        // Stage 1: validate + compress all vectors without touching the map.
251        let mut staged: Vec<(VectorId, VectorEntry)> = Vec::with_capacity(vectors.len());
252
253        for (index, (id, vector, meta)) in vectors.iter().enumerate() {
254            // Validate dimension.
255            if let Err(e) = self.validate_dimension(vector) {
256                return Err(CorpusError::BatchAtomicityFailure {
257                    index,
258                    source: Box::new(e),
259                });
260            }
261            // Check for duplicates already in the corpus.
262            if self.vectors.contains_key(id.as_ref()) {
263                return Err(CorpusError::BatchAtomicityFailure {
264                    index,
265                    source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
266                });
267            }
268            // Check for duplicates within this batch itself.
269            let already_staged = staged
270                .iter()
271                .any(|(staged_id, _)| staged_id.as_ref() == id.as_ref());
272            if already_staged {
273                return Err(CorpusError::BatchAtomicityFailure {
274                    index,
275                    source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
276                });
277            }
278
279            // Compress.
280            let compressed = match self.compress_vector(vector) {
281                Ok(cv) => cv,
282                Err(e) => {
283                    return Err(CorpusError::BatchAtomicityFailure {
284                        index,
285                        source: Box::new(CorpusError::Codec(e)),
286                    });
287                }
288            };
289
290            let entry_meta = Self::unwrap_meta(meta.clone());
291            #[allow(clippy::cast_possible_truncation)]
292            let declared_dim = vector.len() as u32;
293            staged.push((
294                Arc::clone(id),
295                VectorEntry::new(
296                    Arc::clone(id),
297                    compressed,
298                    declared_dim,
299                    timestamp,
300                    entry_meta,
301                ),
302            ));
303        }
304
305        // Stage 2: commit — all validations passed, flip into the map.
306        let mut ids: Vec<VectorId> = Vec::with_capacity(staged.len());
307        let inserted_count = staged.len();
308        for (id, entry) in staged {
309            ids.push(Arc::clone(&id));
310            self.vectors.insert(id, entry);
311        }
312
313        if !ids.is_empty() {
314            #[allow(clippy::cast_possible_truncation)]
315            let count = ids.len() as u32;
316            self.pending_events.push(CorpusEvent::VectorsInserted {
317                corpus_id: Arc::clone(&self.corpus_id),
318                vector_ids: Arc::from(ids.into_boxed_slice()),
319                count,
320                timestamp,
321            });
322        }
323
324        Ok(BatchReport {
325            inserted: inserted_count,
326            skipped: 0,
327            first_error: None,
328        })
329    }
330
331    /// Decompress a single vector by id.
332    ///
333    /// Does **not** emit a domain event (Python parity).
334    ///
335    /// # Errors
336    ///
337    /// - [`CorpusError::UnknownVectorId`] if `id` is not present.
338    /// - [`CorpusError::Codec`] if the codec pipeline fails.
339    pub fn decompress(&self, id: &VectorId) -> Result<Vec<f32>, CorpusError> {
340        let entry = self
341            .vectors
342            .get(id.as_ref())
343            .ok_or_else(|| CorpusError::UnknownVectorId { id: Arc::clone(id) })?;
344
345        self.decompress_entry(entry)
346    }
347
348    /// Decompress all vectors and return them with an explicit timestamp.
349    ///
350    /// Emits exactly one [`CorpusEvent::Decompressed`] on success.
351    /// No event is emitted when the corpus is empty — matches Python parity.
352    ///
353    /// # Errors
354    ///
355    /// Propagates errors from the codec pipeline for any individual vector.
356    pub fn decompress_all_at(
357        &mut self,
358        timestamp: Timestamp,
359    ) -> Result<BTreeMap<VectorId, Vec<f32>>, CorpusError> {
360        // No event emitted for empty corpus — matches Python parity.
361        if self.vectors.is_empty() {
362            return Ok(BTreeMap::new());
363        }
364
365        let mut out = BTreeMap::new();
366        for (id, entry) in self.vectors.iter() {
367            let decompressed = self.decompress_entry(entry)?;
368            out.insert(Arc::clone(id), decompressed);
369        }
370
371        #[allow(clippy::cast_possible_truncation)]
372        let vector_count = out.len() as u32;
373
374        self.pending_events.push(CorpusEvent::Decompressed {
375            corpus_id: Arc::clone(&self.corpus_id),
376            vector_count,
377            timestamp,
378        });
379
380        Ok(out)
381    }
382
    /// Remove a vector by id, returning the entry if present.
    ///
    /// Silent: no domain event is emitted (Python parity — `del corpus[id]`).
    /// Returns `None` when `id` was not present; removal of a missing id is
    /// not an error.
    pub fn remove(&mut self, id: &VectorId) -> Option<VectorEntry> {
        self.vectors.remove(id.as_ref())
    }
389
390    // ── Private helpers ───────────────────────────────────────────────────────
391
392    /// Validate that `vector.len()` matches the configured dimension.
393    #[allow(clippy::missing_const_for_fn)] // calls non-const CodecConfig::dimension()
394    fn validate_dimension(&self, vector: &[f32]) -> Result<(), CorpusError> {
395        let expected = self.config.dimension();
396        #[allow(clippy::cast_possible_truncation)]
397        let got = vector.len() as u32;
398        if got != expected {
399            return Err(CorpusError::DimensionMismatch { expected, got });
400        }
401        Ok(())
402    }
403
404    /// Compress a vector slice according to the active policy.
405    fn compress_vector(
406        &self,
407        vector: &[f32],
408    ) -> Result<CompressedVector, crate::errors::CodecError> {
409        match self.compression_policy {
410            CompressionPolicy::Compress => {
411                Codec::new().compress(vector, &self.config, &self.codebook)
412            }
413            CompressionPolicy::Passthrough => {
414                // Store raw f32 bytes: 4 bytes per dimension in the `indices` field.
415                // Each f32 is stored as 4 sequential index bytes (little-endian).
416                let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 4);
417                for &v in vector {
418                    indices.extend_from_slice(&v.to_le_bytes());
419                }
420                let dim = vector.len();
421                #[allow(clippy::cast_possible_truncation)]
422                let dim_u32 = dim as u32;
423                // We store 4 bytes per dimension in `indices` but the CompressedVector
424                // invariant expects `indices.len() == dimension`. We set dimension to
425                // byte_count (= dim * 4) to satisfy the invariant.
426                let byte_len = indices.len();
427                #[allow(clippy::cast_possible_truncation)]
428                let byte_len_u32 = byte_len as u32;
429                // Store byte_len_u32 as the CompressedVector dimension.
430                // Decompression reads back 4 bytes per original f32 dimension.
431                let _ = dim_u32; // original dim tracked via byte_len / 4 at decompress time
432                CompressedVector::new(
433                    indices.into_boxed_slice(),
434                    None,
435                    self.config.config_hash().clone(),
436                    byte_len_u32,
437                    8,
438                )
439            }
440            CompressionPolicy::Fp16 => {
441                // Store f16 LE: 2 bytes per dimension.
442                let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 2);
443                for &v in vector {
444                    let h = f16::from_f32(v);
445                    indices.extend_from_slice(&h.to_le_bytes());
446                }
447                let byte_len = indices.len();
448                #[allow(clippy::cast_possible_truncation)]
449                let byte_len_u32 = byte_len as u32;
450                CompressedVector::new(
451                    indices.into_boxed_slice(),
452                    None,
453                    self.config.config_hash().clone(),
454                    byte_len_u32,
455                    8,
456                )
457            }
458        }
459    }
460
461    /// Decompress a [`VectorEntry`] according to the active policy.
462    fn decompress_entry(&self, entry: &VectorEntry) -> Result<Vec<f32>, CorpusError> {
463        match self.compression_policy {
464            CompressionPolicy::Compress => Codec::new()
465                .decompress(entry.compressed(), &self.config, &self.codebook)
466                .map_err(CorpusError::Codec),
467            CompressionPolicy::Passthrough => {
468                // Reconstruct f32 from 4-byte chunks in `indices`.
469                let bytes = entry.compressed().indices();
470                if bytes.len() % 4 != 0 {
471                    return Err(CorpusError::Codec(
472                        crate::errors::CodecError::LengthMismatch {
473                            left: bytes.len(),
474                            right: (bytes.len() / 4) * 4,
475                        },
476                    ));
477                }
478                let floats: Vec<f32> = bytes
479                    .chunks_exact(4)
480                    .map(|chunk| {
481                        // chunks_exact guarantees exactly 4 bytes; array indexing is safe.
482                        #[allow(clippy::indexing_slicing)]
483                        f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])
484                    })
485                    .collect();
486                Ok(floats)
487            }
488            CompressionPolicy::Fp16 => {
489                // Reconstruct f32 from 2-byte f16 LE chunks.
490                let bytes = entry.compressed().indices();
491                if bytes.len() % 2 != 0 {
492                    return Err(CorpusError::Codec(
493                        crate::errors::CodecError::LengthMismatch {
494                            left: bytes.len(),
495                            right: (bytes.len() / 2) * 2,
496                        },
497                    ));
498                }
499                let floats: Vec<f32> = bytes
500                    .chunks_exact(2)
501                    .map(|chunk| {
502                        #[allow(clippy::indexing_slicing)]
503                        let h = f16::from_le_bytes([chunk[0], chunk[1]]);
504                        f32::from(h)
505                    })
506                    .collect();
507                Ok(floats)
508            }
509        }
510    }
511
512    /// Convert an `Option<EntryMetaValue>` to a metadata map.
513    fn unwrap_meta(meta: Option<EntryMetaValue>) -> BTreeMap<String, EntryMetaValue> {
514        match meta {
515            None => BTreeMap::new(),
516            Some(EntryMetaValue::Object(arc_map)) => {
517                // Flatten an Object into the map — keys become string keys.
518                arc_map
519                    .iter()
520                    .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
521                    .collect()
522            }
523            Some(other) => {
524                // Non-object metadata stored under a sentinel key.
525                let mut m = BTreeMap::new();
526                m.insert("_value".to_string(), other);
527                m
528            }
529        }
530    }
531}