// tinyquant_core/corpus/aggregate.rs
1//! [`Corpus`] aggregate root — manages a collection of compressed embedding vectors.
2//!
3//! The `Corpus` is the primary domain object in `TinyQuant`'s corpus layer. It
4//! owns an insertion-ordered map of [`VectorEntry`] values, enforces the active
5//! [`CompressionPolicy`], buffers domain events, and exposes a clean API for
6//! insertion, decompression, and event draining.
7//!
8//! ## Thread safety
9//!
10//! `Corpus` is not `Sync`. Callers that need concurrent access must wrap it in
11//! an external lock.
12//!
13//! ## Passthrough and Fp16 policies
14//!
15//! For `Passthrough`, each f32 dimension is stored as 4 little-endian bytes
16//! in the `indices` field of a [`CompressedVector`] with `bit_width = 8`.
//! For `Fp16`, each f32 is cast to f16 and stored as 2 little-endian bytes
//! (one dimension per 2-byte pair, with `bit_width = 8` to keep the
//! `CompressedVector` invariant satisfied).
20
use alloc::{
    boxed::Box,
    collections::{BTreeMap, BTreeSet},
    string::{String, ToString},
    sync::Arc,
    vec,
    vec::Vec,
};
29
30use half::f16;
31
32use crate::codec::{Codebook, Codec, CodecConfig, CompressedVector};
33use crate::corpus::compression_policy::CompressionPolicy;
34use crate::corpus::entry_meta_value::EntryMetaValue;
35use crate::corpus::events::CorpusEvent;
36use crate::corpus::vector_entry::VectorEntry;
37use crate::errors::CorpusError;
38use crate::types::{CorpusId, Timestamp, VectorId};
39
40// Use VectorIdMap for both std and no_std builds in Phase 18.
41// IndexMap integration is deferred to Phase 21+ per the plan recommendation.
42use crate::corpus::vector_id_map::VectorIdMap;
43
44/// Accounting summary returned by [`Corpus::insert_batch`].
/// Accounting summary returned by [`Corpus::insert_batch`].
///
/// In strict (all-or-nothing) mode — the only mode implemented here — a
/// returned report always has `skipped == 0` and `first_error == None`;
/// failures surface as an `Err` instead.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BatchReport {
    /// Number of vectors successfully inserted.
    ///
    /// In strict (all-or-nothing) mode this is either `0` (on `Err`) or
    /// `vectors.len()` (on `Ok`).
    pub inserted: usize,
    /// Number of vectors skipped due to soft failure.
    ///
    /// Always `0` in strict mode; non-zero only in lenient mode (Phase 22+).
    pub skipped: usize,
    /// First error encountered, with its zero-based batch index.
    ///
    /// `None` on strict-mode success; `Some` in lenient mode when any vector
    /// was rejected.
    pub first_error: Option<(usize, CorpusError)>,
}
62
/// Aggregate root that manages a collection of compressed embedding vectors.
///
/// # Domain contracts
///
/// - `CompressionPolicy` is immutable after the first successful `insert`.
/// - Domain events are buffered and drained atomically via [`drain_events`](Self::drain_events).
/// - `insert_batch` is all-or-nothing: on error the corpus is not modified.
///
/// # Usage
///
/// ```ignore
/// let mut corpus = Corpus::new(
///     Arc::from("my-corpus"),
///     config,
///     codebook,
///     CompressionPolicy::Compress,
///     BTreeMap::new(),
/// );
/// corpus.insert(Arc::from("v1"), &vector, None, 0)?;
/// let events = corpus.drain_events();
/// ```
#[allow(clippy::struct_field_names)]
pub struct Corpus {
    // Stable string identifier; cloned cheaply into events via `Arc::clone`.
    corpus_id: CorpusId,
    // Codec configuration captured at construction; immutable afterwards.
    config: CodecConfig,
    // Codebook used by the `Compress` policy's codec pipeline.
    codebook: Codebook,
    // Active policy; fixed for the lifetime of the corpus.
    compression_policy: CompressionPolicy,
    // Insertion-ordered id → entry map.
    vectors: VectorIdMap<VectorEntry>,
    // Buffered domain events, drained atomically by `drain_events`.
    pending_events: Vec<CorpusEvent>,
    // Corpus-level (not per-entry) metadata.
    metadata: BTreeMap<String, EntryMetaValue>,
}
94
95impl Corpus {
96 /// Construct a new `Corpus` and emit a [`CorpusEvent::Created`] event.
97 ///
98 /// `timestamp` must be supplied by the caller (nanoseconds since Unix
99 /// epoch) to keep this crate `no_std` (no `std::time`).
100 #[must_use]
101 pub fn new(
102 corpus_id: CorpusId,
103 config: CodecConfig,
104 codebook: Codebook,
105 compression_policy: CompressionPolicy,
106 metadata: BTreeMap<String, EntryMetaValue>,
107 ) -> Self {
108 Self::new_at(corpus_id, config, codebook, compression_policy, metadata, 0)
109 }
110
111 /// Construct a new `Corpus` with an explicit creation timestamp.
112 ///
113 /// Use this variant in tests or callers that supply their own time source.
114 #[must_use]
115 pub fn new_at(
116 corpus_id: CorpusId,
117 config: CodecConfig,
118 codebook: Codebook,
119 compression_policy: CompressionPolicy,
120 metadata: BTreeMap<String, EntryMetaValue>,
121 timestamp: Timestamp,
122 ) -> Self {
123 let event = CorpusEvent::Created {
124 corpus_id: Arc::clone(&corpus_id),
125 codec_config: config.clone(),
126 compression_policy,
127 timestamp,
128 };
129 Self {
130 corpus_id,
131 config,
132 codebook,
133 compression_policy,
134 vectors: VectorIdMap::new(),
135 pending_events: vec![event],
136 metadata,
137 }
138 }
139
140 // ── Accessors ─────────────────────────────────────────────────────────────
141
142 /// The stable string identifier for this corpus.
143 #[must_use]
144 pub const fn corpus_id(&self) -> &CorpusId {
145 &self.corpus_id
146 }
147
148 /// The codec configuration used at construction time.
149 #[must_use]
150 pub const fn config(&self) -> &CodecConfig {
151 &self.config
152 }
153
154 /// The active compression policy.
155 #[must_use]
156 pub const fn compression_policy(&self) -> CompressionPolicy {
157 self.compression_policy
158 }
159
160 /// Number of vectors currently stored in the corpus.
161 #[must_use]
162 pub fn vector_count(&self) -> usize {
163 self.vectors.len()
164 }
165
166 /// Returns `true` when the corpus contains no vectors.
167 #[must_use]
168 pub fn is_empty(&self) -> bool {
169 self.vectors.is_empty()
170 }
171
172 /// Returns `true` if a vector with the given id exists.
173 #[must_use]
174 pub fn contains(&self, id: &VectorId) -> bool {
175 self.vectors.contains_key(id.as_ref())
176 }
177
178 /// Iterate over `(VectorId, VectorEntry)` pairs in insertion order.
179 pub fn iter(&self) -> impl Iterator<Item = (&VectorId, &VectorEntry)> {
180 self.vectors.iter()
181 }
182
183 /// Read-only access to the corpus-level metadata.
184 #[must_use]
185 pub const fn metadata(&self) -> &BTreeMap<String, EntryMetaValue> {
186 &self.metadata
187 }
188
189 // ── Mutation ──────────────────────────────────────────────────────────────
190
191 /// Drain and return all pending domain events.
192 ///
193 /// Uses `core::mem::take` — O(1), no allocation. After this call
194 /// `pending_events` is empty.
195 pub fn drain_events(&mut self) -> Vec<CorpusEvent> {
196 core::mem::take(&mut self.pending_events)
197 }
198
199 /// Insert a single vector into the corpus.
200 ///
201 /// # Errors
202 ///
203 /// - [`CorpusError::DimensionMismatch`] if `vector.len() != config.dimension()`.
204 /// - [`CorpusError::DuplicateVectorId`] if `id` is already present.
205 /// - [`CorpusError::Codec`] if the codec pipeline fails (only for `Compress` policy).
206 pub fn insert(
207 &mut self,
208 id: VectorId,
209 vector: &[f32],
210 entry_metadata: Option<EntryMetaValue>,
211 timestamp: Timestamp,
212 ) -> Result<(), CorpusError> {
213 self.validate_dimension(vector)?;
214 if self.vectors.contains_key(id.as_ref()) {
215 return Err(CorpusError::DuplicateVectorId { id });
216 }
217
218 let compressed = self.compress_vector(vector)?;
219 let meta = Self::unwrap_meta(entry_metadata);
220 #[allow(clippy::cast_possible_truncation)]
221 let declared_dim = vector.len() as u32;
222 let entry = VectorEntry::new(Arc::clone(&id), compressed, declared_dim, timestamp, meta);
223
224 self.vectors.insert(Arc::clone(&id), entry);
225
226 self.pending_events.push(CorpusEvent::VectorsInserted {
227 corpus_id: Arc::clone(&self.corpus_id),
228 vector_ids: Arc::from([id]),
229 count: 1,
230 timestamp,
231 });
232
233 Ok(())
234 }
235
236 /// Insert a batch of vectors atomically (all-or-nothing).
237 ///
238 /// On failure the corpus is unchanged and no events are emitted.
239 /// On success, exactly one [`CorpusEvent::VectorsInserted`] is emitted
240 /// covering all inserted ids.
241 ///
242 /// # Errors
243 ///
244 /// - [`CorpusError::BatchAtomicityFailure`] if any vector fails validation
245 /// or compression. The `index` field identifies the first failing vector.
246 pub fn insert_batch(
247 &mut self,
248 vectors: &[(VectorId, &[f32], Option<EntryMetaValue>)],
249 timestamp: Timestamp,
250 ) -> Result<BatchReport, CorpusError> {
251 // Stage 1: validate + compress all vectors without touching the map.
252 let mut staged: Vec<(VectorId, VectorEntry)> = Vec::with_capacity(vectors.len());
253
254 for (index, (id, vector, meta)) in vectors.iter().enumerate() {
255 // Validate dimension.
256 if let Err(e) = self.validate_dimension(vector) {
257 return Err(CorpusError::BatchAtomicityFailure {
258 index,
259 source: Box::new(e),
260 });
261 }
262 // Check for duplicates already in the corpus.
263 if self.vectors.contains_key(id.as_ref()) {
264 return Err(CorpusError::BatchAtomicityFailure {
265 index,
266 source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
267 });
268 }
269 // Check for duplicates within this batch itself.
270 let already_staged = staged
271 .iter()
272 .any(|(staged_id, _)| staged_id.as_ref() == id.as_ref());
273 if already_staged {
274 return Err(CorpusError::BatchAtomicityFailure {
275 index,
276 source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
277 });
278 }
279
280 // Compress.
281 let compressed = match self.compress_vector(vector) {
282 Ok(cv) => cv,
283 Err(e) => {
284 return Err(CorpusError::BatchAtomicityFailure {
285 index,
286 source: Box::new(CorpusError::Codec(e)),
287 });
288 }
289 };
290
291 let entry_meta = Self::unwrap_meta(meta.clone());
292 #[allow(clippy::cast_possible_truncation)]
293 let declared_dim = vector.len() as u32;
294 staged.push((
295 Arc::clone(id),
296 VectorEntry::new(
297 Arc::clone(id),
298 compressed,
299 declared_dim,
300 timestamp,
301 entry_meta,
302 ),
303 ));
304 }
305
306 // Stage 2: commit — all validations passed, flip into the map.
307 let mut ids: Vec<VectorId> = Vec::with_capacity(staged.len());
308 let inserted_count = staged.len();
309 for (id, entry) in staged {
310 ids.push(Arc::clone(&id));
311 self.vectors.insert(id, entry);
312 }
313
314 if !ids.is_empty() {
315 #[allow(clippy::cast_possible_truncation)]
316 let count = ids.len() as u32;
317 self.pending_events.push(CorpusEvent::VectorsInserted {
318 corpus_id: Arc::clone(&self.corpus_id),
319 vector_ids: Arc::from(ids.into_boxed_slice()),
320 count,
321 timestamp,
322 });
323 }
324
325 Ok(BatchReport {
326 inserted: inserted_count,
327 skipped: 0,
328 first_error: None,
329 })
330 }
331
332 /// Decompress a single vector by id.
333 ///
334 /// Does **not** emit a domain event (Python parity).
335 ///
336 /// # Errors
337 ///
338 /// - [`CorpusError::UnknownVectorId`] if `id` is not present.
339 /// - [`CorpusError::Codec`] if the codec pipeline fails.
340 pub fn decompress(&self, id: &VectorId) -> Result<Vec<f32>, CorpusError> {
341 let entry = self
342 .vectors
343 .get(id.as_ref())
344 .ok_or_else(|| CorpusError::UnknownVectorId { id: Arc::clone(id) })?;
345
346 self.decompress_entry(entry)
347 }
348
349 /// Decompress all vectors and return them with an explicit timestamp.
350 ///
351 /// Emits exactly one [`CorpusEvent::Decompressed`] on success.
352 /// No event is emitted when the corpus is empty — matches Python parity.
353 ///
354 /// # Errors
355 ///
356 /// Propagates errors from the codec pipeline for any individual vector.
357 pub fn decompress_all_at(
358 &mut self,
359 timestamp: Timestamp,
360 ) -> Result<BTreeMap<VectorId, Vec<f32>>, CorpusError> {
361 // No event emitted for empty corpus — matches Python parity.
362 if self.vectors.is_empty() {
363 return Ok(BTreeMap::new());
364 }
365
366 let mut out = BTreeMap::new();
367 for (id, entry) in self.vectors.iter() {
368 let decompressed = self.decompress_entry(entry)?;
369 out.insert(Arc::clone(id), decompressed);
370 }
371
372 #[allow(clippy::cast_possible_truncation)]
373 let vector_count = out.len() as u32;
374
375 self.pending_events.push(CorpusEvent::Decompressed {
376 corpus_id: Arc::clone(&self.corpus_id),
377 vector_count,
378 timestamp,
379 });
380
381 Ok(out)
382 }
383
384 /// Remove a vector by id, returning the entry if present.
385 ///
386 /// Silent: no domain event is emitted (Python parity — `del corpus[id]`).
387 pub fn remove(&mut self, id: &VectorId) -> Option<VectorEntry> {
388 self.vectors.remove(id.as_ref())
389 }
390
391 // ── Private helpers ───────────────────────────────────────────────────────
392
393 /// Validate that `vector.len()` matches the configured dimension.
394 #[allow(clippy::missing_const_for_fn)] // calls non-const CodecConfig::dimension()
395 fn validate_dimension(&self, vector: &[f32]) -> Result<(), CorpusError> {
396 let expected = self.config.dimension();
397 #[allow(clippy::cast_possible_truncation)]
398 let got = vector.len() as u32;
399 if got != expected {
400 return Err(CorpusError::DimensionMismatch { expected, got });
401 }
402 Ok(())
403 }
404
405 /// Compress a vector slice according to the active policy.
406 fn compress_vector(
407 &self,
408 vector: &[f32],
409 ) -> Result<CompressedVector, crate::errors::CodecError> {
410 match self.compression_policy {
411 CompressionPolicy::Compress => {
412 Codec::new().compress(vector, &self.config, &self.codebook)
413 }
414 CompressionPolicy::Passthrough => {
415 // Store raw f32 bytes: 4 bytes per dimension in the `indices` field.
416 // Each f32 is stored as 4 sequential index bytes (little-endian).
417 let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 4);
418 for &v in vector {
419 indices.extend_from_slice(&v.to_le_bytes());
420 }
421 let dim = vector.len();
422 #[allow(clippy::cast_possible_truncation)]
423 let dim_u32 = dim as u32;
424 // We store 4 bytes per dimension in `indices` but the CompressedVector
425 // invariant expects `indices.len() == dimension`. We set dimension to
426 // byte_count (= dim * 4) to satisfy the invariant.
427 let byte_len = indices.len();
428 #[allow(clippy::cast_possible_truncation)]
429 let byte_len_u32 = byte_len as u32;
430 // Store byte_len_u32 as the CompressedVector dimension.
431 // Decompression reads back 4 bytes per original f32 dimension.
432 let _ = dim_u32; // original dim tracked via byte_len / 4 at decompress time
433 CompressedVector::new(
434 indices.into_boxed_slice(),
435 None,
436 self.config.config_hash().clone(),
437 byte_len_u32,
438 8,
439 )
440 }
441 CompressionPolicy::Fp16 => {
442 // Store f16 LE: 2 bytes per dimension.
443 let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 2);
444 for &v in vector {
445 let h = f16::from_f32(v);
446 indices.extend_from_slice(&h.to_le_bytes());
447 }
448 let byte_len = indices.len();
449 #[allow(clippy::cast_possible_truncation)]
450 let byte_len_u32 = byte_len as u32;
451 CompressedVector::new(
452 indices.into_boxed_slice(),
453 None,
454 self.config.config_hash().clone(),
455 byte_len_u32,
456 8,
457 )
458 }
459 }
460 }
461
462 /// Decompress a [`VectorEntry`] according to the active policy.
463 fn decompress_entry(&self, entry: &VectorEntry) -> Result<Vec<f32>, CorpusError> {
464 match self.compression_policy {
465 CompressionPolicy::Compress => Codec::new()
466 .decompress(entry.compressed(), &self.config, &self.codebook)
467 .map_err(CorpusError::Codec),
468 CompressionPolicy::Passthrough => {
469 // Reconstruct f32 from 4-byte chunks in `indices`.
470 let bytes = entry.compressed().indices();
471 if bytes.len() % 4 != 0 {
472 return Err(CorpusError::Codec(
473 crate::errors::CodecError::LengthMismatch {
474 left: bytes.len(),
475 right: (bytes.len() / 4) * 4,
476 },
477 ));
478 }
479 let floats: Vec<f32> = bytes
480 .chunks_exact(4)
481 .map(|chunk| {
482 // chunks_exact guarantees exactly 4 bytes; array indexing is safe.
483 #[allow(clippy::indexing_slicing)]
484 f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])
485 })
486 .collect();
487 Ok(floats)
488 }
489 CompressionPolicy::Fp16 => {
490 // Reconstruct f32 from 2-byte f16 LE chunks.
491 let bytes = entry.compressed().indices();
492 if bytes.len() % 2 != 0 {
493 return Err(CorpusError::Codec(
494 crate::errors::CodecError::LengthMismatch {
495 left: bytes.len(),
496 right: (bytes.len() / 2) * 2,
497 },
498 ));
499 }
500 let floats: Vec<f32> = bytes
501 .chunks_exact(2)
502 .map(|chunk| {
503 #[allow(clippy::indexing_slicing)]
504 let h = f16::from_le_bytes([chunk[0], chunk[1]]);
505 f32::from(h)
506 })
507 .collect();
508 Ok(floats)
509 }
510 }
511 }
512
513 /// Convert an `Option<EntryMetaValue>` to a metadata map.
514 fn unwrap_meta(meta: Option<EntryMetaValue>) -> BTreeMap<String, EntryMetaValue> {
515 match meta {
516 None => BTreeMap::new(),
517 Some(EntryMetaValue::Object(arc_map)) => {
518 // Flatten an Object into the map — keys become string keys.
519 arc_map
520 .iter()
521 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
522 .collect()
523 }
524 Some(other) => {
525 // Non-object metadata stored under a sentinel key.
526 let mut m = BTreeMap::new();
527 m.insert("_value".to_string(), other);
528 m
529 }
530 }
531 }
532}