tinyquant_core/corpus/aggregate.rs
//! [`Corpus`] aggregate root — manages a collection of compressed embedding vectors.
//!
//! The `Corpus` is the primary domain object in `TinyQuant`'s corpus layer. It
//! owns an insertion-ordered map of [`VectorEntry`] values, enforces the active
//! [`CompressionPolicy`], buffers domain events, and exposes a clean API for
//! insertion, decompression, and event draining.
//!
//! ## Thread safety
//!
//! `Corpus` is not `Sync`. Callers that need concurrent access must wrap it in
//! an external lock.
//!
//! ## Passthrough and Fp16 policies
//!
//! For `Passthrough`, each f32 dimension is stored as 4 little-endian bytes
//! in the `indices` field of a [`CompressedVector`] with `bit_width = 8`.
//! For `Fp16`, each f32 is cast to f16 and stored as 2 little-endian bytes
//! (one dimension per byte pair, with `bit_width = 8` to keep the
//! `CompressedVector` invariant satisfied).

use alloc::{
    boxed::Box,
    collections::{BTreeMap, BTreeSet},
    string::{String, ToString},
    sync::Arc,
    vec,
    vec::Vec,
};
29
30use half::f16;
31
32use crate::codec::{Codebook, Codec, CodecConfig, CompressedVector};
33use crate::corpus::compression_policy::CompressionPolicy;
34use crate::corpus::entry_meta_value::EntryMetaValue;
35use crate::corpus::events::CorpusEvent;
36use crate::corpus::vector_entry::VectorEntry;
37use crate::errors::CorpusError;
38use crate::types::{CorpusId, Timestamp, VectorId};
39
40// Use VectorIdMap for both std and no_std builds in Phase 18.
41// IndexMap integration is deferred to Phase 21+ per the plan recommendation.
42use crate::corpus::vector_id_map::VectorIdMap;
43
/// Accounting summary returned by [`Corpus::insert_batch`].
///
/// As implemented in this file, `insert_batch` is strict-only: on success it
/// always reports `skipped == 0` and `first_error == None`. The lenient-mode
/// fields exist for forward compatibility (Phase 22+).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BatchReport {
    /// Number of vectors successfully inserted.
    ///
    /// In strict (all-or-nothing) mode this is either `0` (on `Err`) or
    /// `vectors.len()` (on `Ok`).
    pub inserted: usize,
    /// Number of vectors skipped due to soft failure.
    ///
    /// Always `0` in strict mode; non-zero only in lenient mode (Phase 22+).
    pub skipped: usize,
    /// First error encountered, with its zero-based batch index.
    ///
    /// `None` on strict-mode success; `Some` in lenient mode when any vector
    /// was rejected.
    pub first_error: Option<(usize, CorpusError)>,
}
62
/// Aggregate root that manages a collection of compressed embedding vectors.
///
/// # Domain contracts
///
/// - `CompressionPolicy` is immutable after the first successful `insert`.
/// - Domain events are buffered and drained atomically via [`drain_events`](Self::drain_events).
/// - `insert_batch` is all-or-nothing: on error the corpus is not modified.
///
/// # Usage
///
/// ```ignore
/// let mut corpus = Corpus::new(
///     Arc::from("my-corpus"),
///     config,
///     codebook,
///     CompressionPolicy::Compress,
///     BTreeMap::new(),
/// );
/// // `insert` takes (id, vector, metadata, timestamp).
/// corpus.insert(Arc::from("v1"), &vector, None, 0)?;
/// let events = corpus.drain_events();
/// ```
pub struct Corpus {
    // Stable identifier; cloned cheaply (Arc) into every emitted event.
    corpus_id: CorpusId,
    // Codec configuration fixed at construction time.
    config: CodecConfig,
    // Codebook used by the `Compress` policy's codec pipeline.
    codebook: Codebook,
    // Active policy; selects compress / raw-f32 / f16 storage.
    compression_policy: CompressionPolicy,
    // Insertion-ordered id -> entry map.
    vectors: VectorIdMap<VectorEntry>,
    // Buffered domain events, drained via `drain_events`.
    pending_events: Vec<CorpusEvent>,
    // Corpus-level (not per-entry) metadata.
    metadata: BTreeMap<String, EntryMetaValue>,
}
93
94impl Corpus {
    /// Construct a new `Corpus` and emit a [`CorpusEvent::Created`] event.
    ///
    /// The creation event carries timestamp `0`. Callers that want a real
    /// creation time must use [`Self::new_at`] and supply it themselves
    /// (nanoseconds since Unix epoch) — this crate is `no_std` and has no
    /// time source of its own.
    #[must_use]
    pub fn new(
        corpus_id: CorpusId,
        config: CodecConfig,
        codebook: Codebook,
        compression_policy: CompressionPolicy,
        metadata: BTreeMap<String, EntryMetaValue>,
    ) -> Self {
        Self::new_at(corpus_id, config, codebook, compression_policy, metadata, 0)
    }
109
110 /// Construct a new `Corpus` with an explicit creation timestamp.
111 ///
112 /// Use this variant in tests or callers that supply their own time source.
113 #[must_use]
114 pub fn new_at(
115 corpus_id: CorpusId,
116 config: CodecConfig,
117 codebook: Codebook,
118 compression_policy: CompressionPolicy,
119 metadata: BTreeMap<String, EntryMetaValue>,
120 timestamp: Timestamp,
121 ) -> Self {
122 let event = CorpusEvent::Created {
123 corpus_id: Arc::clone(&corpus_id),
124 codec_config: config.clone(),
125 compression_policy,
126 timestamp,
127 };
128 Self {
129 corpus_id,
130 config,
131 codebook,
132 compression_policy,
133 vectors: VectorIdMap::new(),
134 pending_events: vec![event],
135 metadata,
136 }
137 }
138
139 // ── Accessors ─────────────────────────────────────────────────────────────
140
141 /// The stable string identifier for this corpus.
142 #[must_use]
143 pub const fn corpus_id(&self) -> &CorpusId {
144 &self.corpus_id
145 }
146
147 /// The codec configuration used at construction time.
148 #[must_use]
149 pub const fn config(&self) -> &CodecConfig {
150 &self.config
151 }
152
153 /// The active compression policy.
154 #[must_use]
155 pub const fn compression_policy(&self) -> CompressionPolicy {
156 self.compression_policy
157 }
158
159 /// Number of vectors currently stored in the corpus.
160 #[must_use]
161 pub fn vector_count(&self) -> usize {
162 self.vectors.len()
163 }
164
165 /// Returns `true` when the corpus contains no vectors.
166 #[must_use]
167 pub fn is_empty(&self) -> bool {
168 self.vectors.is_empty()
169 }
170
171 /// Returns `true` if a vector with the given id exists.
172 #[must_use]
173 pub fn contains(&self, id: &VectorId) -> bool {
174 self.vectors.contains_key(id.as_ref())
175 }
176
177 /// Iterate over `(VectorId, VectorEntry)` pairs in insertion order.
178 pub fn iter(&self) -> impl Iterator<Item = (&VectorId, &VectorEntry)> {
179 self.vectors.iter()
180 }
181
182 /// Read-only access to the corpus-level metadata.
183 #[must_use]
184 pub const fn metadata(&self) -> &BTreeMap<String, EntryMetaValue> {
185 &self.metadata
186 }
187
188 // ── Mutation ──────────────────────────────────────────────────────────────
189
190 /// Drain and return all pending domain events.
191 ///
192 /// Uses `core::mem::take` — O(1), no allocation. After this call
193 /// `pending_events` is empty.
194 pub fn drain_events(&mut self) -> Vec<CorpusEvent> {
195 core::mem::take(&mut self.pending_events)
196 }
197
198 /// Insert a single vector into the corpus.
199 ///
200 /// # Errors
201 ///
202 /// - [`CorpusError::DimensionMismatch`] if `vector.len() != config.dimension()`.
203 /// - [`CorpusError::DuplicateVectorId`] if `id` is already present.
204 /// - [`CorpusError::Codec`] if the codec pipeline fails (only for `Compress` policy).
205 pub fn insert(
206 &mut self,
207 id: VectorId,
208 vector: &[f32],
209 entry_metadata: Option<EntryMetaValue>,
210 timestamp: Timestamp,
211 ) -> Result<(), CorpusError> {
212 self.validate_dimension(vector)?;
213 if self.vectors.contains_key(id.as_ref()) {
214 return Err(CorpusError::DuplicateVectorId { id });
215 }
216
217 let compressed = self.compress_vector(vector)?;
218 let meta = Self::unwrap_meta(entry_metadata);
219 #[allow(clippy::cast_possible_truncation)]
220 let declared_dim = vector.len() as u32;
221 let entry = VectorEntry::new(Arc::clone(&id), compressed, declared_dim, timestamp, meta);
222
223 self.vectors.insert(Arc::clone(&id), entry);
224
225 self.pending_events.push(CorpusEvent::VectorsInserted {
226 corpus_id: Arc::clone(&self.corpus_id),
227 vector_ids: Arc::from([id]),
228 count: 1,
229 timestamp,
230 });
231
232 Ok(())
233 }
234
235 /// Insert a batch of vectors atomically (all-or-nothing).
236 ///
237 /// On failure the corpus is unchanged and no events are emitted.
238 /// On success, exactly one [`CorpusEvent::VectorsInserted`] is emitted
239 /// covering all inserted ids.
240 ///
241 /// # Errors
242 ///
243 /// - [`CorpusError::BatchAtomicityFailure`] if any vector fails validation
244 /// or compression. The `index` field identifies the first failing vector.
245 pub fn insert_batch(
246 &mut self,
247 vectors: &[(VectorId, &[f32], Option<EntryMetaValue>)],
248 timestamp: Timestamp,
249 ) -> Result<BatchReport, CorpusError> {
250 // Stage 1: validate + compress all vectors without touching the map.
251 let mut staged: Vec<(VectorId, VectorEntry)> = Vec::with_capacity(vectors.len());
252
253 for (index, (id, vector, meta)) in vectors.iter().enumerate() {
254 // Validate dimension.
255 if let Err(e) = self.validate_dimension(vector) {
256 return Err(CorpusError::BatchAtomicityFailure {
257 index,
258 source: Box::new(e),
259 });
260 }
261 // Check for duplicates already in the corpus.
262 if self.vectors.contains_key(id.as_ref()) {
263 return Err(CorpusError::BatchAtomicityFailure {
264 index,
265 source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
266 });
267 }
268 // Check for duplicates within this batch itself.
269 let already_staged = staged
270 .iter()
271 .any(|(staged_id, _)| staged_id.as_ref() == id.as_ref());
272 if already_staged {
273 return Err(CorpusError::BatchAtomicityFailure {
274 index,
275 source: Box::new(CorpusError::DuplicateVectorId { id: Arc::clone(id) }),
276 });
277 }
278
279 // Compress.
280 let compressed = match self.compress_vector(vector) {
281 Ok(cv) => cv,
282 Err(e) => {
283 return Err(CorpusError::BatchAtomicityFailure {
284 index,
285 source: Box::new(CorpusError::Codec(e)),
286 });
287 }
288 };
289
290 let entry_meta = Self::unwrap_meta(meta.clone());
291 #[allow(clippy::cast_possible_truncation)]
292 let declared_dim = vector.len() as u32;
293 staged.push((
294 Arc::clone(id),
295 VectorEntry::new(
296 Arc::clone(id),
297 compressed,
298 declared_dim,
299 timestamp,
300 entry_meta,
301 ),
302 ));
303 }
304
305 // Stage 2: commit — all validations passed, flip into the map.
306 let mut ids: Vec<VectorId> = Vec::with_capacity(staged.len());
307 let inserted_count = staged.len();
308 for (id, entry) in staged {
309 ids.push(Arc::clone(&id));
310 self.vectors.insert(id, entry);
311 }
312
313 if !ids.is_empty() {
314 #[allow(clippy::cast_possible_truncation)]
315 let count = ids.len() as u32;
316 self.pending_events.push(CorpusEvent::VectorsInserted {
317 corpus_id: Arc::clone(&self.corpus_id),
318 vector_ids: Arc::from(ids.into_boxed_slice()),
319 count,
320 timestamp,
321 });
322 }
323
324 Ok(BatchReport {
325 inserted: inserted_count,
326 skipped: 0,
327 first_error: None,
328 })
329 }
330
331 /// Decompress a single vector by id.
332 ///
333 /// Does **not** emit a domain event (Python parity).
334 ///
335 /// # Errors
336 ///
337 /// - [`CorpusError::UnknownVectorId`] if `id` is not present.
338 /// - [`CorpusError::Codec`] if the codec pipeline fails.
339 pub fn decompress(&self, id: &VectorId) -> Result<Vec<f32>, CorpusError> {
340 let entry = self
341 .vectors
342 .get(id.as_ref())
343 .ok_or_else(|| CorpusError::UnknownVectorId { id: Arc::clone(id) })?;
344
345 self.decompress_entry(entry)
346 }
347
348 /// Decompress all vectors and return them with an explicit timestamp.
349 ///
350 /// Emits exactly one [`CorpusEvent::Decompressed`] on success.
351 /// No event is emitted when the corpus is empty — matches Python parity.
352 ///
353 /// # Errors
354 ///
355 /// Propagates errors from the codec pipeline for any individual vector.
356 pub fn decompress_all_at(
357 &mut self,
358 timestamp: Timestamp,
359 ) -> Result<BTreeMap<VectorId, Vec<f32>>, CorpusError> {
360 // No event emitted for empty corpus — matches Python parity.
361 if self.vectors.is_empty() {
362 return Ok(BTreeMap::new());
363 }
364
365 let mut out = BTreeMap::new();
366 for (id, entry) in self.vectors.iter() {
367 let decompressed = self.decompress_entry(entry)?;
368 out.insert(Arc::clone(id), decompressed);
369 }
370
371 #[allow(clippy::cast_possible_truncation)]
372 let vector_count = out.len() as u32;
373
374 self.pending_events.push(CorpusEvent::Decompressed {
375 corpus_id: Arc::clone(&self.corpus_id),
376 vector_count,
377 timestamp,
378 });
379
380 Ok(out)
381 }
382
383 /// Remove a vector by id, returning the entry if present.
384 ///
385 /// Silent: no domain event is emitted (Python parity — `del corpus[id]`).
386 pub fn remove(&mut self, id: &VectorId) -> Option<VectorEntry> {
387 self.vectors.remove(id.as_ref())
388 }
389
390 // ── Private helpers ───────────────────────────────────────────────────────
391
392 /// Validate that `vector.len()` matches the configured dimension.
393 #[allow(clippy::missing_const_for_fn)] // calls non-const CodecConfig::dimension()
394 fn validate_dimension(&self, vector: &[f32]) -> Result<(), CorpusError> {
395 let expected = self.config.dimension();
396 #[allow(clippy::cast_possible_truncation)]
397 let got = vector.len() as u32;
398 if got != expected {
399 return Err(CorpusError::DimensionMismatch { expected, got });
400 }
401 Ok(())
402 }
403
404 /// Compress a vector slice according to the active policy.
405 fn compress_vector(
406 &self,
407 vector: &[f32],
408 ) -> Result<CompressedVector, crate::errors::CodecError> {
409 match self.compression_policy {
410 CompressionPolicy::Compress => {
411 Codec::new().compress(vector, &self.config, &self.codebook)
412 }
413 CompressionPolicy::Passthrough => {
414 // Store raw f32 bytes: 4 bytes per dimension in the `indices` field.
415 // Each f32 is stored as 4 sequential index bytes (little-endian).
416 let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 4);
417 for &v in vector {
418 indices.extend_from_slice(&v.to_le_bytes());
419 }
420 let dim = vector.len();
421 #[allow(clippy::cast_possible_truncation)]
422 let dim_u32 = dim as u32;
423 // We store 4 bytes per dimension in `indices` but the CompressedVector
424 // invariant expects `indices.len() == dimension`. We set dimension to
425 // byte_count (= dim * 4) to satisfy the invariant.
426 let byte_len = indices.len();
427 #[allow(clippy::cast_possible_truncation)]
428 let byte_len_u32 = byte_len as u32;
429 // Store byte_len_u32 as the CompressedVector dimension.
430 // Decompression reads back 4 bytes per original f32 dimension.
431 let _ = dim_u32; // original dim tracked via byte_len / 4 at decompress time
432 CompressedVector::new(
433 indices.into_boxed_slice(),
434 None,
435 self.config.config_hash().clone(),
436 byte_len_u32,
437 8,
438 )
439 }
440 CompressionPolicy::Fp16 => {
441 // Store f16 LE: 2 bytes per dimension.
442 let mut indices: Vec<u8> = Vec::with_capacity(vector.len() * 2);
443 for &v in vector {
444 let h = f16::from_f32(v);
445 indices.extend_from_slice(&h.to_le_bytes());
446 }
447 let byte_len = indices.len();
448 #[allow(clippy::cast_possible_truncation)]
449 let byte_len_u32 = byte_len as u32;
450 CompressedVector::new(
451 indices.into_boxed_slice(),
452 None,
453 self.config.config_hash().clone(),
454 byte_len_u32,
455 8,
456 )
457 }
458 }
459 }
460
461 /// Decompress a [`VectorEntry`] according to the active policy.
462 fn decompress_entry(&self, entry: &VectorEntry) -> Result<Vec<f32>, CorpusError> {
463 match self.compression_policy {
464 CompressionPolicy::Compress => Codec::new()
465 .decompress(entry.compressed(), &self.config, &self.codebook)
466 .map_err(CorpusError::Codec),
467 CompressionPolicy::Passthrough => {
468 // Reconstruct f32 from 4-byte chunks in `indices`.
469 let bytes = entry.compressed().indices();
470 if bytes.len() % 4 != 0 {
471 return Err(CorpusError::Codec(
472 crate::errors::CodecError::LengthMismatch {
473 left: bytes.len(),
474 right: (bytes.len() / 4) * 4,
475 },
476 ));
477 }
478 let floats: Vec<f32> = bytes
479 .chunks_exact(4)
480 .map(|chunk| {
481 // chunks_exact guarantees exactly 4 bytes; array indexing is safe.
482 #[allow(clippy::indexing_slicing)]
483 f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])
484 })
485 .collect();
486 Ok(floats)
487 }
488 CompressionPolicy::Fp16 => {
489 // Reconstruct f32 from 2-byte f16 LE chunks.
490 let bytes = entry.compressed().indices();
491 if bytes.len() % 2 != 0 {
492 return Err(CorpusError::Codec(
493 crate::errors::CodecError::LengthMismatch {
494 left: bytes.len(),
495 right: (bytes.len() / 2) * 2,
496 },
497 ));
498 }
499 let floats: Vec<f32> = bytes
500 .chunks_exact(2)
501 .map(|chunk| {
502 #[allow(clippy::indexing_slicing)]
503 let h = f16::from_le_bytes([chunk[0], chunk[1]]);
504 f32::from(h)
505 })
506 .collect();
507 Ok(floats)
508 }
509 }
510 }
511
512 /// Convert an `Option<EntryMetaValue>` to a metadata map.
513 fn unwrap_meta(meta: Option<EntryMetaValue>) -> BTreeMap<String, EntryMetaValue> {
514 match meta {
515 None => BTreeMap::new(),
516 Some(EntryMetaValue::Object(arc_map)) => {
517 // Flatten an Object into the map — keys become string keys.
518 arc_map
519 .iter()
520 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
521 .collect()
522 }
523 Some(other) => {
524 // Non-object metadata stored under a sentinel key.
525 let mut m = BTreeMap::new();
526 m.insert("_value".to_string(), other);
527 m
528 }
529 }
530 }
531}