1use crc32fast::Hasher;
6use serde::{Deserialize, Serialize};
7
8use crate::columnar::encoding::{Column, LogicalType};
9use crate::columnar::encoding_v2::{create_decoder, create_encoder, Bitmap, EncodingV2};
10use crate::columnar::segment_v2::{
11 ColumnSchema, ColumnSegmentV2, InMemorySegmentSource, RecordBatch, Schema, SegmentReaderV2,
12 SegmentWriterV2,
13};
14use crate::columnar::statistics::{ScalarValue, VectorSegmentStatistics};
15use crate::storage::compression::CompressionV2;
16use crate::vector::simd::select_kernel;
17use crate::vector::{CompactionResult, DeleteResult, Metric};
18use crate::{Error, Result};
19use std::collections::HashSet;
20
/// On-disk format version written into every `VectorSegmentEnvelope`.
///
/// `VectorSegment::from_bytes` rejects any envelope whose version differs.
const VECTOR_SEGMENT_VERSION: u8 = 1;

/// Serialized wrapper around a vector segment.
///
/// `VectorSegment::to_bytes` bincode-serializes this struct and appends a
/// little-endian CRC32 of the serialized bytes. Bincode encodes fields
/// positionally, so the field order below is part of the wire format.
#[derive(Serialize, Deserialize)]
struct VectorSegmentEnvelope {
    /// Format version; must equal `VECTOR_SEGMENT_VERSION` when read back.
    version: u8,
    /// Identifier of the segment.
    segment_id: u64,
    /// Number of `f32` components per vector.
    dimension: usize,
    /// Metric recorded for the segment.
    metric: Metric,
    /// Statistics copied verbatim from the in-memory segment.
    statistics: VectorSegmentStatistics,
    /// Columnar payload holding vectors, keys, deleted flags and metadata columns.
    segment: ColumnSegmentV2,
}
32
/// A single column held in its encoded form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncodedColumn {
    /// Logical type the data decodes to.
    pub logical_type: LogicalType,
    /// Encoding that produced `data`.
    pub encoding: crate::columnar::encoding_v2::EncodingV2,
    /// Number of logical values in the column.
    pub num_values: u64,
    /// Encoded bytes, as produced by the encoder for `encoding`.
    pub data: Vec<u8>,
    /// Optional null bitmap; when present, `VectorSegment::validate` requires
    /// one entry per value.
    pub null_bitmap: Option<Bitmap>,
}

/// A batch of vectors together with their keys, deletion flags, optional
/// per-row metadata and summary statistics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VectorSegment {
    /// Identifier of this segment.
    pub segment_id: u64,
    /// Number of `f32` components per vector; `validate` requires it to be > 0.
    pub dimension: usize,
    /// Metric recorded for the segment.
    pub metric: Metric,
    /// Number of rows (vectors), deleted rows included.
    pub num_vectors: u64,
    /// All vectors flattened row-major into one `Float32` column of
    /// `num_vectors * dimension` values.
    pub vectors: EncodedColumn,
    /// One `Int64` key per vector.
    pub keys: EncodedColumn,
    /// Deletion flags: bit `i` is set once row `i` has been deleted.
    pub deleted: Bitmap,
    /// Optional metadata columns, each holding one value per vector.
    pub metadata: Option<Vec<EncodedColumn>>,
    /// Row/deletion statistics; `validate` checks them against `num_vectors`
    /// and `deleted`.
    pub statistics: VectorSegmentStatistics,
}
70
71impl VectorSegment {
72 pub fn to_bytes(&self) -> Result<Vec<u8>> {
74 self.validate()?;
75 let envelope = VectorSegmentEnvelope {
76 version: VECTOR_SEGMENT_VERSION,
77 segment_id: self.segment_id,
78 dimension: self.dimension,
79 metric: self.metric,
80 statistics: self.statistics.clone(),
81 segment: self.build_column_segment()?,
82 };
83
84 let mut payload =
85 bincode::serialize(&envelope).map_err(|e| Error::InvalidFormat(e.to_string()))?;
86 let mut hasher = Hasher::new();
87 hasher.update(&payload);
88 let checksum = hasher.finalize();
89 payload.extend_from_slice(&checksum.to_le_bytes());
90 Ok(payload)
91 }
92
93 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
95 if bytes.len() < 4 {
96 return Err(Error::InvalidFormat("VectorSegment bytes too short".into()));
97 }
98 let (payload, checksum_bytes) = bytes.split_at(bytes.len() - 4);
99 let expected =
100 u32::from_le_bytes(checksum_bytes.try_into().expect("split gives 4-byte slice"));
101
102 let mut hasher = Hasher::new();
103 hasher.update(payload);
104 let computed = hasher.finalize();
105 if computed != expected {
106 return Err(Error::ChecksumMismatch);
107 }
108
109 let envelope: VectorSegmentEnvelope =
110 bincode::deserialize(payload).map_err(|e| Error::InvalidFormat(e.to_string()))?;
111 if envelope.version != VECTOR_SEGMENT_VERSION {
112 return Err(Error::InvalidFormat(
113 "unsupported VectorSegment version".into(),
114 ));
115 }
116
117 let segment = Self::from_column_segment(envelope)?;
118 segment.validate()?;
119 Ok(segment)
120 }
121
122 fn validate(&self) -> Result<()> {
124 if self.dimension == 0 {
125 return Err(Error::InvalidFormat("dimension must be > 0".into()));
126 }
127 let n = self.num_vectors as usize;
128
129 if self.vectors.logical_type != LogicalType::Float32 {
131 return Err(Error::InvalidFormat(
132 "vectors.logical_type must be Float32".into(),
133 ));
134 }
135 let expected_values = n
136 .checked_mul(self.dimension)
137 .ok_or_else(|| Error::InvalidFormat("num_vectors * dimension overflow".into()))?;
138 if self.vectors.num_values as usize != expected_values {
139 return Err(Error::InvalidFormat(
140 "vectors.num_values mismatch num_vectors * dimension".into(),
141 ));
142 }
143 if let Some(bm) = &self.vectors.null_bitmap {
144 if bm.len() != expected_values {
145 return Err(Error::InvalidFormat(
146 "vectors.null_bitmap length mismatch".into(),
147 ));
148 }
149 }
150
151 if self.keys.logical_type != LogicalType::Int64 {
153 return Err(Error::InvalidFormat(
154 "keys.logical_type must be Int64".into(),
155 ));
156 }
157 if self.keys.num_values as usize != n {
158 return Err(Error::InvalidFormat(
159 "keys.num_values mismatch num_vectors".into(),
160 ));
161 }
162 if let Some(bm) = &self.keys.null_bitmap {
163 if bm.len() != n {
164 return Err(Error::InvalidFormat(
165 "keys.null_bitmap length mismatch".into(),
166 ));
167 }
168 }
169 if self.deleted.len() != n {
171 return Err(Error::InvalidFormat(
172 "deleted bitmap length mismatch num_vectors".into(),
173 ));
174 }
175 let mut deleted_count = 0u64;
176 let mut active_count = 0u64;
177 for idx in 0..n {
178 if self.deleted.get(idx) {
179 deleted_count += 1;
180 } else {
181 active_count += 1;
182 }
183 }
184
185 if let Some(meta_cols) = &self.metadata {
187 for (idx, col) in meta_cols.iter().enumerate() {
188 if col.num_values as usize != n {
189 return Err(Error::InvalidFormat(format!(
190 "metadata column {} num_values mismatch num_vectors",
191 idx
192 )));
193 }
194 if let Some(bm) = &col.null_bitmap {
195 if bm.len() != n {
196 return Err(Error::InvalidFormat(format!(
197 "metadata column {} null_bitmap length mismatch",
198 idx
199 )));
200 }
201 }
202 }
203 }
204
205 if self.statistics.row_count != self.num_vectors {
207 return Err(Error::InvalidFormat(
208 "statistics.row_count mismatch num_vectors".into(),
209 ));
210 }
211 let active_deleted = self
212 .statistics
213 .active_count
214 .saturating_add(self.statistics.deleted_count);
215 if active_deleted != self.num_vectors {
216 return Err(Error::InvalidFormat(
217 "statistics.active_count + deleted_count mismatch num_vectors".into(),
218 ));
219 }
220 if self.statistics.deleted_count != deleted_count {
221 return Err(Error::InvalidFormat(
222 "statistics.deleted_count mismatch deleted bitmap".into(),
223 ));
224 }
225 if self.statistics.active_count != active_count {
226 return Err(Error::InvalidFormat(
227 "statistics.active_count mismatch deleted bitmap".into(),
228 ));
229 }
230 if self.statistics.row_count > 0 {
231 let expected_ratio =
232 (self.statistics.deleted_count as f32) / (self.statistics.row_count as f32);
233 if (self.statistics.deletion_ratio - expected_ratio).abs() > 1e-6 {
234 return Err(Error::InvalidFormat(
235 "statistics.deletion_ratio mismatch deleted_count/row_count".into(),
236 ));
237 }
238 } else if self.statistics.deletion_ratio != 0.0 {
239 return Err(Error::InvalidFormat(
240 "statistics.deletion_ratio must be 0 when row_count is 0".into(),
241 ));
242 }
243
244 Ok(())
245 }
246
247 pub fn decode_vectors(&self) -> Result<Vec<f32>> {
249 let decoder = create_decoder(self.vectors.encoding);
250 let (col, _) = decoder
251 .decode(
252 &self.vectors.data,
253 self.vectors.num_values as usize,
254 self.vectors.logical_type,
255 )
256 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
257 match col {
258 Column::Float32(v) => Ok(v),
259 other => Err(Error::InvalidFormat(format!(
260 "vectors column decoded to unexpected type {:?}",
261 other
262 ))),
263 }
264 }
265
266 pub fn decode_keys(&self) -> Result<Vec<i64>> {
268 let decoder = create_decoder(self.keys.encoding);
269 let (col, _) = decoder
270 .decode(
271 &self.keys.data,
272 self.keys.num_values as usize,
273 self.keys.logical_type,
274 )
275 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
276 match col {
277 Column::Int64(v) => Ok(v),
278 other => Err(Error::InvalidFormat(format!(
279 "keys column decoded to unexpected type {:?}",
280 other
281 ))),
282 }
283 }
284
285 fn recompute_deletion_stats(&mut self) {
287 let row_count = self.num_vectors;
288 let deleted_count = (0..row_count as usize)
289 .filter(|&i| self.deleted.get(i))
290 .count() as u64;
291 let active_count = row_count.saturating_sub(deleted_count);
292 self.statistics.row_count = row_count;
293 self.statistics.deleted_count = deleted_count;
294 self.statistics.active_count = active_count;
295 self.statistics.deletion_ratio = if row_count > 0 {
296 deleted_count as f32 / row_count as f32
297 } else {
298 0.0
299 };
300 }
301
302 fn build_column_segment(&self) -> Result<ColumnSegmentV2> {
304 use crate::columnar::segment_v2::SegmentConfigV2;
305
306 let n = self.num_vectors as usize;
307 let dim = self.dimension;
308 let compression = CompressionV2::None;
309
310 let (vec_col_decoded, vec_bm) = self.decode_column(&self.vectors)?;
312 let floats = match vec_col_decoded {
313 Column::Float32(v) => v,
314 other => {
315 return Err(Error::InvalidFormat(format!(
316 "vectors column must decode to Float32, got {:?}",
317 other
318 )))
319 }
320 };
321 if floats.len() != n * dim {
322 return Err(Error::InvalidFormat(
323 "decoded vectors length mismatch dimension".into(),
324 ));
325 }
326 let fixed_len = dim
327 .checked_mul(4)
328 .ok_or_else(|| Error::InvalidFormat("dimension overflow".into()))?;
329 if fixed_len > u16::MAX as usize {
330 return Err(Error::InvalidFormat("dimension too large for Fixed".into()));
331 }
332 let mut fixed_values = Vec::with_capacity(n);
333 for chunk in floats.chunks(dim) {
334 let mut buf = Vec::with_capacity(fixed_len);
335 for v in chunk {
336 buf.extend_from_slice(&v.to_le_bytes());
337 }
338 fixed_values.push(buf);
339 }
340 let vectors_column = Column::Binary(fixed_values);
341
342 let (keys_col_decoded, keys_bm) = self.decode_column(&self.keys)?;
344 let keys_column = match keys_col_decoded {
345 Column::Int64(v) => {
346 if v.len() != n {
347 return Err(Error::InvalidFormat(
348 "keys length mismatch num_vectors".into(),
349 ));
350 }
351 Column::Int64(v)
352 }
353 other => {
354 return Err(Error::InvalidFormat(format!(
355 "keys column must decode to Int64, got {:?}",
356 other
357 )))
358 }
359 };
360
361 let deleted_column = Column::Bool((0..n).map(|i| self.deleted.get(i)).collect());
363
364 let mut metadata_columns = Vec::new();
366 let mut metadata_bitmaps = Vec::new();
367 if let Some(meta_cols) = &self.metadata {
368 for col in meta_cols {
369 let (decoded, bm) = self.decode_column(col)?;
370 if column_length(&decoded) != n {
371 return Err(Error::InvalidFormat(
372 "metadata length mismatch num_vectors".into(),
373 ));
374 }
375 let normalized = if let LogicalType::Fixed(len) = col.logical_type {
376 ensure_fixed_column(decoded, len as usize)?
377 } else {
378 decoded
379 };
380 metadata_columns.push(normalized);
381 metadata_bitmaps.push(bm);
382 }
383 }
384
385 let mut schema_columns = Vec::new();
386 let mut columns = Vec::new();
387 let mut bitmaps = Vec::new();
388
389 schema_columns.push(ColumnSchema {
390 name: "vectors".into(),
391 logical_type: LogicalType::Binary,
392 nullable: vec_bm.is_some(),
393 fixed_len: Some(fixed_len as u32),
394 });
395 columns.push(vectors_column);
396 bitmaps.push(vec_bm);
397
398 schema_columns.push(ColumnSchema {
399 name: "keys".into(),
400 logical_type: LogicalType::Int64,
401 nullable: keys_bm.is_some(),
402 fixed_len: None,
403 });
404 columns.push(keys_column);
405 bitmaps.push(keys_bm);
406
407 schema_columns.push(ColumnSchema {
408 name: "deleted".into(),
409 logical_type: LogicalType::Bool,
410 nullable: false,
411 fixed_len: None,
412 });
413 columns.push(deleted_column);
414 bitmaps.push(None);
415
416 for (idx, col) in metadata_columns.into_iter().enumerate() {
417 let bm = metadata_bitmaps.get(idx).cloned().unwrap_or(None);
418 schema_columns.push(ColumnSchema {
419 name: format!("meta_{idx}"),
420 logical_type: column_logical_type(&col)?,
421 nullable: bm.is_some(),
422 fixed_len: match &col {
423 Column::Fixed { len, .. } => Some(*len as u32),
424 _ => None,
425 },
426 });
427 columns.push(col);
428 bitmaps.push(bm);
429 }
430
431 let schema = Schema {
432 columns: schema_columns,
433 };
434 let batch = RecordBatch::new(schema, columns, bitmaps);
435
436 let mut writer = SegmentWriterV2::new(SegmentConfigV2 {
437 compression,
438 ..Default::default()
439 });
440 writer
441 .write_batch(batch)
442 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
443 writer
444 .finish()
445 .map_err(|e| Error::InvalidFormat(e.to_string()))
446 }
447
448 fn from_column_segment(envelope: VectorSegmentEnvelope) -> Result<Self> {
450 let num_vectors = envelope.segment.meta.num_rows;
451
452 let reader = SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(
454 envelope.segment.data.clone(),
455 )))
456 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
457
458 let column_count = envelope.segment.meta.schema.column_count();
459 let mut combined_columns: Vec<Option<Column>> = vec![None; column_count];
460 let mut combined_bitmaps: Vec<Option<Bitmap>> = vec![None; column_count];
461
462 for batch in reader
463 .iter_row_groups()
464 .collect::<std::result::Result<Vec<_>, _>>()
465 .map_err(|e| Error::InvalidFormat(e.to_string()))?
466 {
467 for (idx, col) in batch.columns.iter().enumerate() {
468 if idx >= combined_columns.len() {
469 return Err(Error::InvalidFormat("column index out of bounds".into()));
470 }
471 combined_columns[idx] =
472 Some(append_column(combined_columns[idx].take(), col.clone())?);
473 }
474 for (idx, bm) in batch.null_bitmaps.iter().enumerate() {
475 if idx >= combined_bitmaps.len() {
476 return Err(Error::InvalidFormat("bitmap index out of bounds".into()));
477 }
478 combined_bitmaps[idx] = append_bitmap(combined_bitmaps[idx].take(), bm.clone());
479 }
480 }
481
482 let vectors_col = combined_columns
484 .first()
485 .and_then(|c| c.clone())
486 .ok_or_else(|| Error::InvalidFormat("missing vectors column".into()))?;
487 let vec_bitmap = combined_bitmaps.first().cloned().unwrap_or(None);
488 let vectors =
489 encode_vectors_from_fixed(vectors_col, vec_bitmap.clone(), envelope.dimension)?;
490
491 let keys_col = combined_columns
493 .get(1)
494 .and_then(|c| c.clone())
495 .ok_or_else(|| Error::InvalidFormat("missing keys column".into()))?;
496 let keys_bitmap = combined_bitmaps.get(1).cloned().unwrap_or(None);
497 let keys =
498 encode_generic_column(keys_col, keys_bitmap, LogicalType::Int64, EncodingV2::Plain)?;
499
500 let deleted_col = combined_columns
502 .get(2)
503 .and_then(|c| c.clone())
504 .ok_or_else(|| Error::InvalidFormat("missing deleted column".into()))?;
505 let deleted = column_to_bitmap(deleted_col, num_vectors as usize)?;
506
507 let mut metadata_cols = Vec::new();
509 for (idx, col_opt) in combined_columns.iter().enumerate().skip(3) {
510 let col = col_opt
511 .clone()
512 .ok_or_else(|| Error::InvalidFormat("missing metadata column".into()))?;
513 let bm = combined_bitmaps.get(idx).cloned().unwrap_or(None);
514 let logical_type = column_logical_type(&col)?;
515 let encoded = encode_generic_column(col, bm, logical_type, EncodingV2::Plain)?;
516 metadata_cols.push(encoded);
517 }
518
519 let segment = VectorSegment {
520 segment_id: envelope.segment_id,
521 dimension: envelope.dimension,
522 metric: envelope.metric,
523 num_vectors,
524 vectors,
525 keys,
526 deleted,
527 metadata: if metadata_cols.is_empty() {
528 None
529 } else {
530 Some(metadata_cols)
531 },
532 statistics: envelope.statistics,
533 };
534 segment.validate()?;
535 Ok(segment)
536 }
537
538 fn decode_column(&self, col: &EncodedColumn) -> Result<(Column, Option<Bitmap>)> {
539 let decoder = create_decoder(col.encoding);
540 let encoded_bytes = col.data.clone();
541
542 decoder
543 .decode(&encoded_bytes, col.num_values as usize, col.logical_type)
544 .map_err(|e| Error::InvalidFormat(e.to_string()))
545 }
546}
547
548fn column_logical_type(col: &Column) -> Result<LogicalType> {
549 match col {
550 Column::Int64(_) => Ok(LogicalType::Int64),
551 Column::Float32(_) => Ok(LogicalType::Float32),
552 Column::Float64(_) => Ok(LogicalType::Float64),
553 Column::Bool(_) => Ok(LogicalType::Bool),
554 Column::Binary(_) => Ok(LogicalType::Binary),
555 Column::Fixed { len, .. } => {
556 Ok(LogicalType::Fixed((*len).try_into().map_err(|_| {
557 Error::InvalidFormat("fixed length too large".into())
558 })?))
559 }
560 }
561}
562
563fn column_length(col: &Column) -> usize {
564 match col {
565 Column::Int64(v) => v.len(),
566 Column::Float32(v) => v.len(),
567 Column::Float64(v) => v.len(),
568 Column::Bool(v) => v.len(),
569 Column::Binary(v) => v.len(),
570 Column::Fixed { values, .. } => values.len(),
571 }
572}
573
574fn append_column(current: Option<Column>, next: Column) -> Result<Column> {
575 match (current, next) {
576 (None, n) => Ok(n),
577 (Some(Column::Int64(mut a)), Column::Int64(b)) => {
578 a.extend_from_slice(&b);
579 Ok(Column::Int64(a))
580 }
581 (Some(Column::Float32(mut a)), Column::Float32(b)) => {
582 a.extend_from_slice(&b);
583 Ok(Column::Float32(a))
584 }
585 (Some(Column::Float64(mut a)), Column::Float64(b)) => {
586 a.extend_from_slice(&b);
587 Ok(Column::Float64(a))
588 }
589 (Some(Column::Bool(mut a)), Column::Bool(b)) => {
590 a.extend_from_slice(&b);
591 Ok(Column::Bool(a))
592 }
593 (Some(Column::Binary(mut a)), Column::Binary(b)) => {
594 a.extend_from_slice(&b);
595 Ok(Column::Binary(a))
596 }
597 (
598 Some(Column::Fixed { len, mut values }),
599 Column::Fixed {
600 len: len2,
601 values: v,
602 },
603 ) => {
604 if len != len2 {
605 return Err(Error::InvalidFormat("fixed length mismatch".into()));
606 }
607 values.extend_from_slice(&v);
608 Ok(Column::Fixed { len, values })
609 }
610 _ => Err(Error::InvalidFormat(
611 "column type mismatch when merging row groups".into(),
612 )),
613 }
614}
615
616fn append_bitmap(current: Option<Bitmap>, next: Option<Bitmap>) -> Option<Bitmap> {
617 match (current, next) {
618 (None, None) => None,
619 (Some(b), None) => Some(b),
620 (None, Some(b)) => Some(b),
621 (Some(a), Some(b)) => {
622 let mut merged: Vec<bool> = Vec::with_capacity(a.len() + b.len());
623 for i in 0..a.len() {
624 merged.push(a.get(i));
625 }
626 for i in 0..b.len() {
627 merged.push(b.get(i));
628 }
629 Some(Bitmap::from_bools(&merged))
630 }
631 }
632}
633
634fn encode_vectors_from_fixed(
635 col: Column,
636 bitmap: Option<Bitmap>,
637 dimension: usize,
638) -> Result<EncodedColumn> {
639 let values = match col {
640 Column::Binary(values) => values,
641 Column::Fixed { values, len } => {
642 if len != dimension * 4 {
643 return Err(Error::InvalidFormat(
644 "vectors fixed length mismatch dimension".into(),
645 ));
646 }
647 values
648 }
649 other => {
650 return Err(Error::InvalidFormat(format!(
651 "vectors column must be Binary/Fixed, got {:?}",
652 other
653 )))
654 }
655 };
656 let expected_len = dimension
657 .checked_mul(4)
658 .ok_or_else(|| Error::InvalidFormat("dimension overflow".into()))?;
659
660 let mut floats = Vec::with_capacity(values.len() * dimension);
661 for chunk in values {
662 if chunk.len() != expected_len {
663 return Err(Error::InvalidFormat(
664 "vector payload length mismatch".into(),
665 ));
666 }
667 for bytes in chunk.chunks_exact(4) {
668 floats.push(f32::from_le_bytes(
669 bytes
670 .try_into()
671 .map_err(|_| Error::InvalidFormat("vector chunk".into()))?,
672 ));
673 }
674 }
675
676 let encoder = create_encoder(EncodingV2::ByteStreamSplit);
677 let encoded = encoder
678 .encode(&Column::Float32(floats.clone()), bitmap.as_ref())
679 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
680
681 Ok(EncodedColumn {
682 logical_type: LogicalType::Float32,
683 encoding: EncodingV2::ByteStreamSplit,
684 num_values: floats.len() as u64,
685 data: encoded,
686 null_bitmap: bitmap,
687 })
688}
689
690fn encode_generic_column(
691 col: Column,
692 bitmap: Option<Bitmap>,
693 logical_type: LogicalType,
694 encoding: EncodingV2,
695) -> Result<EncodedColumn> {
696 let col = match logical_type {
697 LogicalType::Fixed(len) => ensure_fixed_column(col, len as usize)?,
698 _ => col,
699 };
700 let encoder = create_encoder(encoding);
701 let encoded = encoder
702 .encode(&col, bitmap.as_ref())
703 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
704
705 Ok(EncodedColumn {
706 logical_type,
707 encoding,
708 num_values: column_length(&col) as u64,
709 data: encoded,
710 null_bitmap: bitmap,
711 })
712}
713
714fn slice_column(col: &Column, indices: &[usize]) -> Result<Column> {
715 Ok(match col {
716 Column::Int64(v) => Column::Int64(take_indices(v, indices)?),
717 Column::Float32(v) => Column::Float32(take_indices(v, indices)?),
718 Column::Float64(v) => Column::Float64(take_indices(v, indices)?),
719 Column::Bool(v) => Column::Bool(take_indices(v, indices)?),
720 Column::Binary(v) => Column::Binary(take_indices(v, indices)?),
721 Column::Fixed { len, values } => Column::Fixed {
722 len: *len,
723 values: take_indices(values, indices)?,
724 },
725 })
726}
727
728fn slice_bitmap(bm: Option<Bitmap>, indices: &[usize]) -> Option<Bitmap> {
729 bm.map(|source| {
730 let mut sliced = Bitmap::new_zeroed(indices.len());
731 for (dst_idx, src_idx) in indices.iter().enumerate() {
732 if source.get(*src_idx) {
733 sliced.set(dst_idx, true);
734 }
735 }
736 sliced
737 })
738}
739
740fn take_indices<T: Clone>(values: &[T], indices: &[usize]) -> Result<Vec<T>> {
741 let mut out = Vec::with_capacity(indices.len());
742 for &idx in indices {
743 out.push(
744 values
745 .get(idx)
746 .cloned()
747 .ok_or_else(|| Error::InvalidFormat("index out of bounds".into()))?,
748 );
749 }
750 Ok(out)
751}
752
753fn column_to_bitmap(col: Column, expected_len: usize) -> Result<Bitmap> {
754 match col {
755 Column::Bool(values) => {
756 if values.len() != expected_len {
757 return Err(Error::InvalidFormat(
758 "deleted length mismatch num_vectors".into(),
759 ));
760 }
761 Ok(if values.iter().all(|v| !*v) {
762 Bitmap::new(expected_len)
763 } else if values.iter().all(|v| *v) {
764 Bitmap::all_valid(expected_len)
765 } else {
766 Bitmap::from_bools(&values)
767 })
768 }
769 other => Err(Error::InvalidFormat(format!(
770 "deleted column must be Bool, got {:?}",
771 other
772 ))),
773 }
774}
775
776fn ensure_fixed_column(col: Column, len: usize) -> Result<Column> {
777 match col {
778 Column::Fixed { len: l, values } => {
779 if l != len {
780 return Err(Error::InvalidFormat(
781 "fixed column length mismatch expected length".into(),
782 ));
783 }
784 Ok(Column::Fixed { len, values })
785 }
786 Column::Binary(values) => {
787 if values.iter().any(|v| v.len() != len) {
788 return Err(Error::InvalidFormat(
789 "binary column has variable-length values for Fixed type".into(),
790 ));
791 }
792 Ok(Column::Fixed { len, values })
793 }
794 other => Err(Error::InvalidFormat(format!(
795 "column must be Fixed/Binary for Fixed logical type, got {:?}",
796 other
797 ))),
798 }
799}
800
/// Storage-key helpers for vector segments.
pub mod key_layout {
    /// Prefix shared by every vector-segment key.
    const VECTOR_SEGMENT_PREFIX: &str = "vector_segment:";

    /// Storage key for segment `segment_id`: the UTF-8 bytes of
    /// `vector_segment:` followed by the id in decimal.
    pub fn vector_segment_key(segment_id: u64) -> Vec<u8> {
        let mut key = String::from(VECTOR_SEGMENT_PREFIX);
        key.push_str(&segment_id.to_string());
        key.into_bytes()
    }
}
808
/// Configuration for a `VectorStoreManager`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VectorStoreConfig {
    /// Number of `f32` components every appended vector and every query must have.
    pub dimension: usize,
    /// Metric configured for the store. Search itself scores with
    /// `VectorSearchParams::metric`.
    pub metric: Metric,
    /// Maximum number of vectors `append_batch` places in one segment; should be > 0.
    pub segment_max_vectors: usize,
    /// Deletion ratio at or above which `segments_needing_compaction` reports a
    /// segment; a value >= 1.0 disables reporting.
    pub compaction_threshold: f32,
    /// Encoding applied to the flattened vectors column when `build_segment`
    /// creates a segment (keys are always encoded `Plain`).
    pub encoding: EncodingV2,
}
830
831impl Default for VectorStoreConfig {
832 fn default() -> Self {
833 Self {
834 dimension: 128,
835 metric: Metric::Cosine,
836 segment_max_vectors: 65_536,
837 compaction_threshold: 0.3,
838 encoding: EncodingV2::ByteStreamSplit,
839 }
840 }
841}
842
/// Outcome of `VectorStoreManager::append_batch`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct AppendResult {
    /// Number of vectors written.
    pub vectors_added: usize,
    /// Number of new segments those vectors were split into.
    pub segments_created: usize,
}

/// Parameters for a brute-force similarity search.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct VectorSearchParams {
    /// Query vector; must have exactly `config.dimension` finite components.
    pub query: Vec<f32>,
    /// Metric used to score every candidate row.
    pub metric: Metric,
    /// Maximum number of results; 0 yields no results.
    pub top_k: usize,
    /// Indices of segment metadata columns whose values for each hit are copied
    /// into `VectorSearchResult::columns`, in this order.
    pub projection: Option<Vec<usize>>,
    /// Optional row filter indexed by global row position, counting rows across
    /// all segments in storage order with deleted rows included. A row is
    /// skipped when its entry is `false` or lies beyond the end of the mask.
    pub filter_mask: Option<Vec<bool>>,
}

/// One search hit.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct VectorSearchResult {
    /// Key of the matching vector, i.e. the value from the segment's keys column
    /// (not its positional row index).
    pub row_id: i64,
    /// Score produced by the metric kernel; hits are sorted by descending score.
    pub score: f32,
    /// Projected metadata values, one per `VectorSearchParams::projection` entry;
    /// empty when no projection was requested.
    pub columns: Vec<ScalarValue>,
}

/// Counters collected while executing a search.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SearchStats {
    /// Segments whose vectors were decoded and scored.
    pub segments_scanned: u64,
    /// Segments skipped without being decoded.
    pub segments_pruned: u64,
    /// Total rows, deleted rows included, in the scanned segments.
    pub rows_scanned: u64,
    /// Rows that were scored and kept as candidates, before truncation to `top_k`.
    pub rows_matched: u64,
}

/// In-memory owner of a collection of `VectorSegment`s.
#[derive(Debug)]
pub struct VectorStoreManager {
    /// Store-wide configuration.
    config: VectorStoreConfig,
    /// Segments in storage order; compaction replaces or removes entries in place.
    segments: Vec<VectorSegment>,
    /// Id to assign to the next segment that is built.
    next_segment_id: u64,
}
898
899impl VectorStoreManager {
900 pub fn new(config: VectorStoreConfig) -> Self {
902 Self {
903 config,
904 segments: Vec::new(),
905 next_segment_id: 0,
906 }
907 }
908
909 pub fn segments(&self) -> &[VectorSegment] {
915 &self.segments
916 }
917
918 pub fn config(&self) -> &VectorStoreConfig {
923 &self.config
924 }
925
926 pub fn next_segment_id(&self) -> u64 {
931 self.next_segment_id
932 }
933
934 pub fn from_segments(
942 config: VectorStoreConfig,
943 segments: Vec<VectorSegment>,
944 next_segment_id: u64,
945 ) -> Self {
946 Self {
947 config,
948 segments,
949 next_segment_id,
950 }
951 }
952
953 pub async fn append_batch(
972 &mut self,
973 keys: &[i64],
974 vectors: &[Vec<f32>],
975 ) -> Result<AppendResult> {
976 if keys.len() != vectors.len() {
977 return Err(Error::InvalidFormat("keys/vectors length mismatch".into()));
978 }
979 if vectors.is_empty() {
980 return Ok(AppendResult::default());
981 }
982 let dim = self.config.dimension;
983 for (idx, v) in vectors.iter().enumerate() {
984 if v.len() != dim {
985 return Err(Error::DimensionMismatch {
986 expected: dim,
987 actual: v.len(),
988 });
989 }
990 if contains_nan_or_inf(v) {
991 return Err(Error::InvalidVector {
992 index: idx,
993 reason: "vector contains NaN or Inf".into(),
994 });
995 }
996 }
997
998 let mut vectors_added = 0usize;
999 let mut segments_created = 0usize;
1000 let mut start = 0usize;
1001 while start < vectors.len() {
1002 let end = usize::min(start + self.config.segment_max_vectors, vectors.len());
1003 let slice = &vectors[start..end];
1004 let key_slice = &keys[start..end];
1005
1006 let segment = self.build_segment(key_slice, slice)?;
1007 self.segments.push(segment);
1008 self.next_segment_id += 1;
1009 vectors_added += slice.len();
1010 segments_created += 1;
1011 start = end;
1012 }
1013
1014 Ok(AppendResult {
1015 vectors_added,
1016 segments_created,
1017 })
1018 }
1019
1020 pub fn segments_needing_compaction(&self) -> Vec<u64> {
1037 let threshold = self.config.compaction_threshold;
1038 if threshold >= 1.0 {
1039 return Vec::new();
1040 }
1041 let mut ids = Vec::new();
1042 for seg in &self.segments {
1043 if seg.statistics.deletion_ratio >= threshold && seg.statistics.deletion_ratio > 0.0 {
1044 ids.push(seg.segment_id);
1045 }
1046 }
1047 ids
1048 }
1049
1050 pub async fn delete_batch(&mut self, keys: &[i64]) -> Result<DeleteResult> {
1064 if keys.is_empty() {
1065 return Ok(DeleteResult::default());
1066 }
1067 let key_set: HashSet<i64> = keys.iter().copied().collect();
1068 let mut result = DeleteResult::default();
1069
1070 for segment in &mut self.segments {
1071 let decoded_keys = segment.decode_keys()?;
1072 let mut modified = false;
1073 for (idx, key) in decoded_keys.iter().enumerate() {
1074 if !key_set.contains(key) {
1075 continue;
1076 }
1077 if !segment.deleted.get(idx) {
1078 segment.deleted.set(idx, true);
1079 result.vectors_deleted = result.vectors_deleted.saturating_add(1);
1080 modified = true;
1081 }
1082 }
1083
1084 if modified {
1085 segment.recompute_deletion_stats();
1086 result.segments_modified.push(segment.segment_id);
1087 }
1088 }
1089
1090 Ok(result)
1091 }
1092
1093 pub async fn compact_segment(&mut self, segment_id: u64) -> Result<CompactionResult> {
1112 let pos = self
1113 .segments
1114 .iter()
1115 .position(|s| s.segment_id == segment_id)
1116 .ok_or(Error::NotFound)?;
1117
1118 let old = self.segments.get(pos).cloned().ok_or(Error::NotFound)?;
1119 let old_size = old.to_bytes().map(|b| b.len() as u64).unwrap_or(0);
1120
1121 let active_indices: Vec<usize> = (0..old.num_vectors as usize)
1122 .filter(|&i| !old.deleted.get(i))
1123 .collect();
1124
1125 if active_indices.is_empty() {
1126 self.segments.remove(pos);
1127 return Ok(CompactionResult {
1128 old_segment_id: segment_id,
1129 new_segment_id: None,
1130 vectors_removed: old.num_vectors,
1131 space_reclaimed: old_size,
1132 });
1133 }
1134
1135 let decoded_vectors = old.decode_vectors()?;
1136 let decoded_keys = old.decode_keys()?;
1137
1138 let mut new_vecs = Vec::with_capacity(active_indices.len());
1139 for &idx in &active_indices {
1140 let start = idx * self.config.dimension;
1141 let end = start + self.config.dimension;
1142 new_vecs.push(decoded_vectors[start..end].to_vec());
1143 }
1144 let new_keys: Vec<i64> = active_indices
1145 .iter()
1146 .map(|&i| {
1147 decoded_keys
1148 .get(i)
1149 .copied()
1150 .ok_or_else(|| Error::InvalidFormat("missing key".into()))
1151 })
1152 .collect::<Result<_>>()?;
1153
1154 let mut new_segment = self.build_segment(&new_keys, &new_vecs)?;
1155
1156 if let Some(meta_cols) = &old.metadata {
1158 let mut new_meta = Vec::with_capacity(meta_cols.len());
1159 for col in meta_cols {
1160 let (decoded_col, bitmap) = old.decode_column(col)?;
1161 let sliced_col = slice_column(&decoded_col, &active_indices)?;
1162 let sliced_bitmap = slice_bitmap(bitmap, &active_indices);
1163 let encoded = encode_generic_column(
1164 sliced_col,
1165 sliced_bitmap,
1166 col.logical_type,
1167 col.encoding,
1168 )?;
1169 new_meta.push(encoded);
1170 }
1171 if !new_meta.is_empty() {
1172 new_segment.metadata = Some(new_meta);
1173 }
1174 }
1175
1176 let new_segment_id = self.next_segment_id;
1178 new_segment.segment_id = new_segment_id; self.next_segment_id += 1;
1180 let new_size = new_segment.to_bytes().map(|b| b.len() as u64).unwrap_or(0);
1181 let space_reclaimed = old_size.saturating_sub(new_size);
1182 let vectors_removed = old.num_vectors.saturating_sub(new_segment.num_vectors);
1183
1184 self.segments[pos] = new_segment;
1185
1186 Ok(CompactionResult {
1187 old_segment_id: segment_id,
1188 new_segment_id: Some(new_segment_id),
1189 vectors_removed,
1190 space_reclaimed,
1191 })
1192 }
1193
1194 pub fn search(&self, params: VectorSearchParams) -> Result<Vec<VectorSearchResult>> {
1200 let mut stats = SearchStats::default();
1201 let (results, _) = self.search_internal(params, &mut stats)?;
1202 Ok(results)
1203 }
1204
1205 pub fn search_with_stats(
1209 &self,
1210 params: VectorSearchParams,
1211 ) -> Result<(Vec<VectorSearchResult>, SearchStats)> {
1212 let mut stats = SearchStats::default();
1213 let (results, stats) = self.search_internal(params, &mut stats)?;
1214 Ok((results, stats))
1215 }
1216
1217 fn search_internal(
1218 &self,
1219 params: VectorSearchParams,
1220 stats: &mut SearchStats,
1221 ) -> Result<(Vec<VectorSearchResult>, SearchStats)> {
1222 if params.top_k == 0 {
1223 return Ok((Vec::new(), stats.clone()));
1224 }
1225 if params.query.len() != self.config.dimension {
1226 return Err(Error::DimensionMismatch {
1227 expected: self.config.dimension,
1228 actual: params.query.len(),
1229 });
1230 }
1231 if contains_nan_or_inf(¶ms.query) {
1232 return Err(Error::InvalidVector {
1233 index: 0,
1234 reason: "query contains NaN or Inf".into(),
1235 });
1236 }
1237
1238 let mut candidates: Vec<VectorSearchResult> = Vec::new();
1239 let query_norm = params.query.iter().map(|v| v * v).sum::<f32>().sqrt();
1240 let mut row_offset = 0u64;
1241 for segment in &self.segments {
1242 if segment.statistics.deletion_ratio >= 1.0 {
1243 stats.segments_pruned += 1;
1244 row_offset += segment.num_vectors;
1245 continue;
1246 }
1247 if query_norm < segment.statistics.norm_min || query_norm > segment.statistics.norm_max
1249 {
1250 stats.segments_pruned += 1;
1251 row_offset += segment.num_vectors;
1252 continue;
1253 }
1254 stats.segments_scanned += 1;
1255 stats.rows_scanned = stats.rows_scanned.saturating_add(segment.num_vectors);
1256 let decoded = segment.decode_vectors()?;
1257 let decoded_keys = segment.decode_keys()?;
1258 let metadata = decode_metadata(&segment.metadata, segment.num_vectors as usize)?;
1259 let kernel = select_kernel();
1260 let mask = params.filter_mask.as_ref();
1261 for (idx, chunk) in decoded.chunks(self.config.dimension).enumerate() {
1262 if segment.deleted.get(idx) {
1264 continue;
1265 }
1266 if let Some(mask_vec) = mask {
1267 let global_idx = row_offset as usize + idx;
1268 if global_idx >= mask_vec.len() || !mask_vec[global_idx] {
1269 continue;
1270 }
1271 }
1272 let score = match params.metric {
1273 Metric::Cosine => kernel.cosine(¶ms.query, chunk),
1274 Metric::L2 => kernel.l2(¶ms.query, chunk),
1275 Metric::InnerProduct => kernel.inner_product(¶ms.query, chunk),
1276 };
1277 let row_id = *decoded_keys
1278 .get(idx)
1279 .ok_or_else(|| Error::InvalidFormat("missing key".into()))?;
1280 let columns = if let Some(proj) = ¶ms.projection {
1281 let mut cols = Vec::with_capacity(proj.len());
1282 for &p in proj {
1283 let col = metadata.get(p).ok_or_else(|| {
1284 Error::InvalidFormat("projection out of bounds".into())
1285 })?;
1286 cols.push(col.get(idx).cloned().ok_or_else(|| {
1287 Error::InvalidFormat("projection row out of bounds".into())
1288 })?);
1289 }
1290 cols
1291 } else {
1292 Vec::new()
1293 };
1294 candidates.push(VectorSearchResult {
1295 row_id,
1296 score,
1297 columns,
1298 });
1299 stats.rows_matched += 1;
1300 }
1301 row_offset += segment.num_vectors;
1302 }
1303
1304 candidates.sort_by(|a, b| {
1305 b.score
1306 .partial_cmp(&a.score)
1307 .unwrap_or(std::cmp::Ordering::Equal)
1308 .then_with(|| a.row_id.cmp(&b.row_id))
1309 });
1310 candidates.truncate(params.top_k);
1311 Ok((candidates, stats.clone()))
1312 }
1313
1314 fn build_segment(&mut self, keys: &[i64], vectors: &[Vec<f32>]) -> Result<VectorSegment> {
1315 let mut flattened = Vec::with_capacity(vectors.len() * self.config.dimension);
1316 for v in vectors {
1317 flattened.extend_from_slice(v);
1318 }
1319 let vec_enc = encode_generic_column(
1320 Column::Float32(flattened),
1321 None,
1322 LogicalType::Float32,
1323 self.config.encoding,
1324 )?;
1325 let key_enc = encode_generic_column(
1326 Column::Int64(keys.to_vec()),
1327 None,
1328 LogicalType::Int64,
1329 EncodingV2::Plain,
1330 )?;
1331 let deleted = Bitmap::new_zeroed(keys.len());
1332 let stats = compute_stats(vectors, Some(&deleted));
1333 let segment = VectorSegment {
1334 segment_id: self.next_segment_id,
1335 dimension: self.config.dimension,
1336 metric: self.config.metric,
1337 num_vectors: keys.len() as u64,
1338 vectors: vec_enc,
1339 keys: key_enc,
1340 deleted,
1341 metadata: None,
1342 statistics: stats,
1343 };
1344 Ok(segment)
1345 }
1346}
1347
1348fn compute_stats(vectors: &[Vec<f32>], deleted: Option<&Bitmap>) -> VectorSegmentStatistics {
1349 let row_count = vectors.len() as u64;
1350 let null_count = 0;
1351 let deleted_count = (0..vectors.len())
1352 .filter(|&i| deleted.is_some_and(|bm| bm.get(i)))
1353 .count() as u64;
1354 let active_count = row_count.saturating_sub(deleted_count);
1355 let mut norm_min = f32::MAX;
1356 let mut norm_max = f32::MIN;
1357 for (idx, v) in vectors.iter().enumerate() {
1358 if deleted.is_some_and(|bm| bm.get(idx)) {
1359 continue;
1360 }
1361 let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
1362 norm_min = norm_min.min(norm);
1363 norm_max = norm_max.max(norm);
1364 }
1365 if active_count == 0 {
1366 norm_min = 0.0;
1367 norm_max = 0.0;
1368 }
1369 let deletion_ratio = if row_count > 0 {
1370 deleted_count as f32 / row_count as f32
1371 } else {
1372 0.0
1373 };
1374
1375 VectorSegmentStatistics {
1376 row_count,
1377 null_count,
1378 active_count,
1379 deleted_count,
1380 deletion_ratio,
1381 norm_min,
1382 norm_max,
1383 min_values: Vec::new(),
1384 max_values: Vec::new(),
1385 created_at: 0,
1386 }
1387}
1388
/// Returns `true` when at least one component of `values` is NaN or ±infinity.
fn contains_nan_or_inf(values: &[f32]) -> bool {
    !values.iter().all(|component| component.is_finite())
}
1392
1393fn decode_metadata(
1394 metadata: &Option<Vec<EncodedColumn>>,
1395 rows: usize,
1396) -> Result<Vec<Vec<ScalarValue>>> {
1397 if let Some(cols) = metadata {
1398 let mut decoded_cols = Vec::with_capacity(cols.len());
1399 for col in cols {
1400 let decoder = create_decoder(col.encoding);
1401 let (column, _) = decoder
1402 .decode(&col.data, col.num_values as usize, col.logical_type)
1403 .map_err(|e| Error::InvalidFormat(e.to_string()))?;
1404 let values = column_to_scalar_values(column)?;
1405 if values.len() != rows {
1406 return Err(Error::InvalidFormat(
1407 "metadata column length mismatch num_vectors".into(),
1408 ));
1409 }
1410 decoded_cols.push(values);
1411 }
1412 Ok(decoded_cols)
1413 } else {
1414 Ok(Vec::new())
1415 }
1416}
1417
1418fn column_to_scalar_values(column: Column) -> Result<Vec<ScalarValue>> {
1419 Ok(match column {
1420 Column::Int64(v) => v.into_iter().map(ScalarValue::Int64).collect(),
1421 Column::Float32(v) => v.into_iter().map(ScalarValue::Float32).collect(),
1422 Column::Float64(v) => v.into_iter().map(ScalarValue::Float64).collect(),
1423 Column::Bool(v) => v.into_iter().map(ScalarValue::Bool).collect(),
1424 Column::Binary(v) => v.into_iter().map(ScalarValue::Binary).collect(),
1425 Column::Fixed { values, .. } => values.into_iter().map(ScalarValue::Binary).collect(),
1426 })
1427}
1428
// Unit tests for vector segment serialization, statistics, deletion, compaction and search.
// The manager's async APIs are driven synchronously through the local `block_on` helper.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::columnar::encoding_v2::EncodingV2;
    use crate::kv::{KVStore, KVTransaction};
    use crate::txn::TxnManager;
    use crate::types::TxnMode;
    use crate::vector::simd::DistanceKernel;
    use crate::MemoryKV;
    use crate::ScalarKernel;
    use std::future::Future;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    /// Encodes `values` as a null-free `Float32` column using `ByteStreamSplit` encoding.
    fn encode_f32(values: &[f32]) -> EncodedColumn {
        let encoder = create_encoder(EncodingV2::ByteStreamSplit);
        let data = encoder
            .encode(&Column::Float32(values.to_vec()), None)
            .unwrap();
        EncodedColumn {
            logical_type: LogicalType::Float32,
            encoding: EncodingV2::ByteStreamSplit,
            num_values: values.len() as u64,
            data,
            null_bitmap: None,
        }
    }

    /// Encodes `values` as a null-free `Int64` column using `Plain` encoding.
    fn encode_i64(values: &[i64]) -> EncodedColumn {
        let encoder = create_encoder(EncodingV2::Plain);
        let data = encoder
            .encode(&Column::Int64(values.to_vec()), None)
            .unwrap();
        EncodedColumn {
            logical_type: LogicalType::Int64,
            encoding: EncodingV2::Plain,
            num_values: values.len() as u64,
            data,
            null_bitmap: None,
        }
    }

    /// Builds segment 42: one 4-dimensional cosine vector with key 0 and nothing deleted.
    ///
    /// The statistics are hand-written fixtures (`norm_min`/`norm_max` are 0.0, not the
    /// vector's actual norm), so this is meant for serialization/validation tests, not search.
    fn sample_segment() -> VectorSegment {
        let vectors = vec![1.0f32, 2.0, 3.0, 4.0];
        VectorSegment {
            segment_id: 42,
            dimension: 4,
            metric: Metric::Cosine,
            num_vectors: 1,
            vectors: encode_f32(&vectors),
            keys: encode_i64(&[0]),
            deleted: Bitmap::new_zeroed(1),
            metadata: None,
            statistics: VectorSegmentStatistics {
                row_count: 1,
                null_count: 0,
                active_count: 1,
                deleted_count: 0,
                deletion_ratio: 0.0,
                norm_min: 0.0,
                norm_max: 0.0,
                min_values: Vec::new(),
                max_values: Vec::new(),
                created_at: 1_735_000_000,
            },
        }
    }

    // to_bytes -> from_bytes must preserve identity, layout, column types, bitmap and stats.
    #[test]
    fn roundtrip_with_checksum_and_segment_v2() {
        let seg = sample_segment();
        let bytes = seg.to_bytes().unwrap();
        let restored = VectorSegment::from_bytes(&bytes).unwrap();
        assert_eq!(restored.segment_id, seg.segment_id);
        assert_eq!(restored.dimension, seg.dimension);
        assert_eq!(restored.metric, seg.metric);
        assert_eq!(restored.num_vectors, seg.num_vectors);
        assert_eq!(restored.vectors.logical_type, LogicalType::Float32);
        assert_eq!(restored.keys.logical_type, LogicalType::Int64);
        assert_eq!(restored.deleted, seg.deleted);
        assert_eq!(restored.statistics.row_count, seg.statistics.row_count);
    }

    // Corrupting the final byte of the serialized form must surface as a checksum mismatch.
    #[test]
    fn checksum_mismatch_detected() {
        let seg = sample_segment();
        let mut bytes = seg.to_bytes().unwrap();
        let last = bytes.len() - 1;
        bytes[last] ^= 0xAA;
        let err = VectorSegment::from_bytes(&bytes).unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch));
    }

    // Pins the KV key format under which segments are persisted.
    #[test]
    fn vector_segment_key_layout() {
        let key = key_layout::vector_segment_key(123);
        assert_eq!(key, b"vector_segment:123");
    }

    // num_vectors (2) disagrees with the single encoded vector/key, so the validation that
    // to_bytes runs must reject the segment.
    #[test]
    fn validate_rejects_mismatched_lengths() {
        let mut seg = sample_segment();
        seg.num_vectors = 2;
        let err = seg.to_bytes().unwrap_err();
        assert!(matches!(err, Error::InvalidFormat(_)));
    }

    // Without a bitmap every row is live; the norms of [3, 4] and [0, 0] are 5 and 0.
    #[test]
    fn compute_stats_updates_norms_and_counts() {
        let vectors = vec![vec![3.0f32, 4.0], vec![0.0f32, 0.0]];
        let stats = compute_stats(&vectors, None);
        assert_eq!(stats.row_count, 2);
        assert_eq!(stats.active_count, 2);
        assert_eq!(stats.deleted_count, 0);
        assert!((stats.norm_min - 0.0).abs() < 1e-6);
        assert!((stats.norm_max - 5.0).abs() < 1e-6);
    }

    // A deleted row counts toward deleted_count/deletion_ratio and is excluded from the
    // norm range (only [1.0] remains, so min == max == 1).
    #[test]
    fn compute_stats_respects_deleted_bitmap() {
        let vectors = vec![vec![1.0f32], vec![2.0f32]];
        let mut deleted = Bitmap::new_zeroed(2);
        deleted.set(1, true);

        let stats = compute_stats(&vectors, Some(&deleted));
        assert_eq!(stats.row_count, 2);
        assert_eq!(stats.active_count, 1);
        assert_eq!(stats.deleted_count, 1);
        assert!((stats.deletion_ratio - 0.5).abs() < 1e-6);
        assert!((stats.norm_min - 1.0).abs() < 1e-6);
        assert!((stats.norm_max - 1.0).abs() < 1e-6);
    }

    // delete_batch removes only keys that exist and are still live, and reports only the
    // segments it actually changed.
    #[test]
    fn delete_batch_marks_keys_and_updates_stats() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![10, 11, 12];
        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let seg0_id = mgr.segments[0].segment_id;
        let seg1_id = mgr.segments[1].segment_id;

        // Pre-delete row 1 of the first segment (expected to hold key 11) via the bitmap.
        if let Some(seg) = mgr.segments.get_mut(0) {
            seg.deleted.set(1, true);
            seg.recompute_deletion_stats();
        }

        // 11 is already deleted and 999 does not exist, so only 12 should be removed.
        let res = block_on(mgr.delete_batch(&[11, 12, 999])).unwrap();
        assert_eq!(res.vectors_deleted, 1);
        assert_eq!(res.segments_modified, vec![seg1_id]);

        // Deletion works in place: segment ids are unchanged.
        assert_eq!(mgr.segments[0].segment_id, seg0_id);
        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[0].statistics.active_count, 1);
        assert_eq!(mgr.segments[1].segment_id, seg1_id);
        assert_eq!(mgr.segments[1].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[1].statistics.active_count, 0);
        assert!((mgr.segments[1].statistics.deletion_ratio - 1.0).abs() < 1e-6);
    }

    // Deleting an empty key list changes nothing and reports no modified segments.
    #[test]
    fn delete_batch_empty_input_noop() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![10, 11];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        let res = block_on(mgr.delete_batch(&[])).unwrap();
        assert_eq!(res.vectors_deleted, 0);
        assert!(res.segments_modified.is_empty());
        assert_eq!(mgr.segments[0].statistics.deleted_count, 0);
        assert_eq!(mgr.segments[0].statistics.active_count, 2);
    }

    // Already-deleted (key 2) and unknown (999) keys are ignored; only key 3 is deleted.
    #[test]
    fn delete_batch_ignores_nonexistent_and_already_deleted() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![1, 2, 3];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.2, 0.8]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        // Pre-delete row 1 of the first segment (expected to hold key 2).
        mgr.segments[0].deleted.set(1, true);
        mgr.segments[0].recompute_deletion_stats();

        let res = block_on(mgr.delete_batch(&[2, 3, 999])).unwrap();
        assert_eq!(res.vectors_deleted, 1);
        assert_eq!(res.segments_modified, vec![mgr.segments[1].segment_id]);

        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[0].statistics.active_count, 1);
        assert_eq!(mgr.segments[1].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[1].statistics.active_count, 0);
    }

    // Segment selection follows compaction_threshold; segments with no deletions are never
    // selected, even at threshold 0.0.
    #[test]
    fn segments_needing_compaction_respects_thresholds() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            compaction_threshold: 0.5,
            ..Default::default()
        });
        let keys = vec![1, 2, 3, 4];
        let vecs = vec![
            vec![1.0, 0.0],
            vec![0.0, 1.0],
            vec![0.5, 0.5],
            vec![0.2, 0.8],
        ];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let seg0 = mgr.segments[0].segment_id;

        // Delete one row of the first segment.
        if let Some(seg) = mgr.segments.get_mut(0) {
            seg.deleted.set(0, true);
            seg.recompute_deletion_stats();
        }

        let mut ids = mgr.segments_needing_compaction();
        assert_eq!(ids, vec![seg0]);

        // A threshold of 1.0 is not reached by the partially deleted segment.
        mgr.config.compaction_threshold = 1.0;
        assert!(mgr.segments_needing_compaction().is_empty());

        // At 0.0 the untouched second segment is still not selected.
        mgr.config.compaction_threshold = 0.0;
        ids = mgr.segments_needing_compaction();
        assert_eq!(ids, vec![seg0]);

        // After compaction no segment carries deletions, so nothing qualifies.
        block_on(mgr.compact_segment(seg0)).unwrap();
        mgr.config.compaction_threshold = 0.5;
        assert!(mgr.segments_needing_compaction().is_empty());
    }

    // Compaction rewrites the segment under a new id without the deleted row and with
    // freshly reset deletion statistics.
    #[test]
    fn compact_segment_removes_deleted_and_resets_stats() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 4,
            ..Default::default()
        });
        let keys = vec![1, 2, 3];
        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let old_id = mgr.segments[0].segment_id;
        mgr.segments[0].deleted.set(1, true);
        mgr.segments[0].recompute_deletion_stats();

        let res = block_on(mgr.compact_segment(old_id)).unwrap();
        let new_id = res.new_segment_id.expect("new segment");
        assert_eq!(res.old_segment_id, old_id);
        assert_eq!(res.vectors_removed, 1);

        let new_seg = mgr
            .segments
            .iter()
            .find(|s| s.segment_id == new_id)
            .expect("segment exists");
        assert_eq!(new_seg.num_vectors, 2);
        assert_eq!(new_seg.statistics.deleted_count, 0);
        assert_eq!(new_seg.statistics.active_count, 2);
        assert_eq!(new_seg.statistics.deletion_ratio, 0.0);
    }

    // Compacting a fully deleted segment drops it without creating a replacement.
    #[test]
    fn compact_segment_handles_all_deleted() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 4,
            ..Default::default()
        });
        let keys = vec![1, 2];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let old_id = mgr.segments[0].segment_id;
        mgr.segments[0].deleted.set(0, true);
        mgr.segments[0].deleted.set(1, true);
        mgr.segments[0].recompute_deletion_stats();

        let res = block_on(mgr.compact_segment(old_id)).unwrap();
        assert_eq!(res.old_segment_id, old_id);
        assert_eq!(res.new_segment_id, None);
        assert_eq!(res.vectors_removed, 2);
        assert!(mgr.segments.iter().all(|s| s.segment_id != old_id));
    }

    // Compacting an unknown segment id is reported as NotFound.
    #[test]
    fn compact_segment_errors_on_missing() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            ..Default::default()
        });
        let err = block_on(mgr.compact_segment(999)).unwrap_err();
        assert!(matches!(err, Error::NotFound));
    }

    // Fully deleted segments are pruned; deleted rows in scanned segments are skipped.
    #[test]
    fn search_skips_deleted_rows_and_prunes_empty_segments() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![1, 2, 3, 4];
        let vecs = vec![
            vec![1.0, 0.0], // segment 0, row 0
            vec![0.0, 1.0], // segment 0, row 1
            vec![0.5, 0.5], // segment 1, row 0
            vec![0.2, 0.8], // segment 1, row 1
        ];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        // Segment 0 becomes fully deleted; segment 1 keeps only key 3.
        mgr.segments[0].deleted.set(0, true);
        mgr.segments[0].deleted.set(1, true);
        mgr.segments[0].recompute_deletion_stats();
        mgr.segments[1].deleted.set(1, true);
        mgr.segments[1].recompute_deletion_stats();

        let params = VectorSearchParams {
            query: vec![0.5, 0.5],
            metric: Metric::InnerProduct,
            top_k: 10,
            projection: None,
            filter_mask: None,
        };
        let (results, stats) = mgr.search_with_stats(params).unwrap();

        assert_eq!(stats.segments_pruned, 1);
        assert_eq!(stats.segments_scanned, 1);
        // rows_scanned counts every row of a scanned segment, deleted or not.
        assert_eq!(stats.rows_scanned, 2);
        assert_eq!(stats.rows_matched, 1);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].row_id, 3);
    }

    // End-to-end: search, delete, search, compact, search, then delete everything and compact.
    #[test]
    fn delete_compact_search_flow() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 10,
            ..Default::default()
        });
        let keys = vec![1, 2, 3];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.5, 0.5]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let seg_id = mgr.segments[0].segment_id;

        // Baseline: all three rows in the single segment match.
        let params = VectorSearchParams {
            query: vec![1.0, 0.0],
            metric: Metric::InnerProduct,
            top_k: 10,
            projection: None,
            filter_mask: None,
        };
        let (results, stats) = mgr.search_with_stats(params.clone()).unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(stats.segments_scanned, 1);
        assert_eq!(stats.rows_matched, 3);

        let del_res = block_on(mgr.delete_batch(&[2])).unwrap();
        assert_eq!(del_res.vectors_deleted, 1);
        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[0].statistics.active_count, 2);

        // Key 2 disappears from results; scores 1.0 (key 1) and 0.5 (key 3) stay ordered.
        let (results_after_del, stats_after_del) = mgr.search_with_stats(params.clone()).unwrap();
        assert_eq!(results_after_del.len(), 2);
        let ids: Vec<_> = results_after_del.iter().map(|r| r.row_id).collect();
        assert_eq!(ids, vec![1, 3]);
        assert_eq!(stats_after_del.rows_matched, 2);

        // Compaction physically drops the deleted row and resets the statistics.
        let comp_res = block_on(mgr.compact_segment(seg_id)).unwrap();
        let new_id = comp_res.new_segment_id.expect("new segment");
        assert_eq!(comp_res.vectors_removed, 1);
        let seg = mgr
            .segments
            .iter()
            .find(|s| s.segment_id == new_id)
            .unwrap();
        assert_eq!(seg.statistics.deleted_count, 0);
        assert_eq!(seg.statistics.active_count, 2);
        assert_eq!(seg.statistics.deletion_ratio, 0.0);

        let (results_after_compact, _) = mgr.search_with_stats(params.clone()).unwrap();
        let ids: Vec<_> = results_after_compact.iter().map(|r| r.row_id).collect();
        assert_eq!(ids, vec![1, 3]);

        // Deleting the remaining rows and compacting leaves no segments at all.
        block_on(mgr.delete_batch(&[1, 3])).unwrap();
        let comp_res2 = block_on(mgr.compact_segment(new_id)).unwrap();
        assert_eq!(comp_res2.new_segment_id, None);
        assert!(mgr.segments.is_empty());

        let (results_final, stats_final) = mgr.search_with_stats(params).unwrap();
        assert!(results_final.is_empty());
        assert_eq!(stats_final.segments_scanned, 0);
    }

    // filter_mask is indexed globally across segments; projection pulls metadata column 0.
    #[test]
    fn vector_store_append_and_search_with_filter_and_projection() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![10, 11, 12];
        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        // Attach one Int64 metadata column per segment, one value per row.
        if let Some(seg) = mgr.segments.get_mut(0) {
            let meta_col = encode_generic_column(
                Column::Int64(vec![100, 200]),
                None,
                LogicalType::Int64,
                EncodingV2::Plain,
            )
            .unwrap();
            seg.metadata = Some(vec![meta_col]);
        }
        if let Some(seg) = mgr.segments.get_mut(1) {
            let meta_col = encode_generic_column(
                Column::Int64(vec![300]),
                None,
                LogicalType::Int64,
                EncodingV2::Plain,
            )
            .unwrap();
            seg.metadata = Some(vec![meta_col]);
        }

        // Global rows 0 and 2 (keys 10 and 12) pass the mask; key 11 is filtered out.
        let params = VectorSearchParams {
            query: vec![1.0, 0.0],
            metric: Metric::InnerProduct,
            top_k: 3,
            projection: Some(vec![0]),
            filter_mask: Some(vec![true, false, true]),
        };
        let (results, stats) = mgr.search_with_stats(params).unwrap();
        assert_eq!(stats.rows_scanned, 3);
        assert_eq!(stats.segments_scanned, 2);
        assert_eq!(stats.rows_matched, 2);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].row_id, 10);
        assert_eq!(results[0].columns, vec![ScalarValue::Int64(100)]);
    }

    // Equal scores must come back ordered by ascending row_id, whatever the insertion order.
    #[test]
    fn vector_store_topk_is_deterministic_on_ties() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 3,
            ..Default::default()
        });
        // Keys are inserted out of order; all vectors are identical.
        let keys = vec![20, 10, 30];
        let vecs = vec![vec![1.0, 0.0], vec![1.0, 0.0], vec![1.0, 0.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        let params = VectorSearchParams {
            query: vec![1.0, 0.0],
            metric: Metric::InnerProduct,
            top_k: 3,
            projection: None,
            filter_mask: None,
        };
        let (results, stats) = mgr.search_with_stats(params).unwrap();
        assert_eq!(stats.rows_scanned, 3);
        assert_eq!(results.len(), 3);
        let row_ids: Vec<_> = results.iter().map(|r| r.row_id).collect();
        assert_eq!(row_ids, vec![10, 20, 30]);
    }

    // Persist segments to a MemoryKV, reload them into a fresh manager, and check the search
    // results against scores computed independently with ScalarKernel.
    #[test]
    fn vector_store_end_to_end_with_kvs_roundtrip() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![1, 2, 3];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.6, 0.8]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        // Attach one Int64 metadata column per segment, one value per row.
        if let Some(seg) = mgr.segments.get_mut(0) {
            let meta_col = encode_generic_column(
                Column::Int64(vec![100, 200]),
                None,
                LogicalType::Int64,
                EncodingV2::Plain,
            )
            .unwrap();
            seg.metadata = Some(vec![meta_col]);
        }
        if let Some(seg) = mgr.segments.get_mut(1) {
            let meta_col = encode_generic_column(
                Column::Int64(vec![300]),
                None,
                LogicalType::Int64,
                EncodingV2::Plain,
            )
            .unwrap();
            seg.metadata = Some(vec![meta_col]);
        }

        let store = MemoryKV::new();
        // Write every serialized segment under its key in one read-write transaction.
        {
            let manager = store.txn_manager();
            let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
            for seg in &mgr.segments {
                let key = key_layout::vector_segment_key(seg.segment_id);
                let bytes = seg.to_bytes().unwrap();
                txn.put(key, bytes).unwrap();
            }
            manager.commit(txn).unwrap();
        }

        // Rebuild a fresh manager purely from the persisted bytes.
        let mut restored = VectorStoreManager::new(mgr.config.clone());
        restored.next_segment_id = mgr.next_segment_id;
        {
            let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
            for seg in &mgr.segments {
                let key = key_layout::vector_segment_key(seg.segment_id);
                let bytes = txn.get(&key).unwrap().unwrap();
                let decoded = VectorSegment::from_bytes(&bytes).unwrap();
                restored.segments.push(decoded);
            }
        }

        let params = VectorSearchParams {
            query: vec![1.0, 0.0],
            metric: Metric::InnerProduct,
            top_k: 3,
            projection: Some(vec![0]),
            filter_mask: Some(vec![true, true, true]),
        };
        let (results, _stats) = restored.search_with_stats(params.clone()).unwrap();
        assert_eq!(results.len(), 3);
        let scalar = ScalarKernel;
        // (key, expected score, expected metadata value) for each appended row.
        let expected = vec![
            (
                keys[0],
                scalar.inner_product(&params.query, &vecs[0]),
                ScalarValue::Int64(100),
            ),
            (
                keys[1],
                scalar.inner_product(&params.query, &vecs[1]),
                ScalarValue::Int64(200),
            ),
            (
                keys[2],
                scalar.inner_product(&params.query, &vecs[2]),
                ScalarValue::Int64(300),
            ),
        ];
        // Same ranking rule as the search: score descending, ties by ascending key.
        let mut expected_sorted = expected.clone();
        expected_sorted.sort_by(|a, b| {
            b.1.partial_cmp(&a.1)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| a.0.cmp(&b.0))
        });

        for ((exp_id, _, exp_col), got) in expected_sorted.iter().zip(results.iter()) {
            assert_eq!(got.row_id, *exp_id);
            assert_same_scalar(exp_col, got.columns.first().unwrap());
        }
    }

    /// Asserts two scalars are the same variant and equal (floats within a small tolerance).
    fn assert_same_scalar(expected: &ScalarValue, actual: &ScalarValue) {
        match (expected, actual) {
            (ScalarValue::Int64(a), ScalarValue::Int64(b)) => assert_eq!(a, b),
            (ScalarValue::Float32(a), ScalarValue::Float32(b)) => assert!((a - b).abs() < 1e-5),
            (ScalarValue::Float64(a), ScalarValue::Float64(b)) => assert!((a - b).abs() < 1e-8),
            (ScalarValue::Bool(a), ScalarValue::Bool(b)) => assert_eq!(a, b),
            (ScalarValue::Binary(a), ScalarValue::Binary(b)) => assert_eq!(a, b),
            other => panic!("scalar mismatch: {:?}", other),
        }
    }

    /// Minimal executor: busy-polls `fut` with a no-op waker, yielding the thread between
    /// polls, until it completes.
    fn block_on<F: Future>(fut: F) -> F::Output {
        struct Noop;
        impl Wake for Noop {
            fn wake(self: Arc<Self>) {}
            fn wake_by_ref(self: &Arc<Self>) {}
        }
        let waker = Waker::from(Arc::new(Noop));
        let mut cx = Context::from_waker(&waker);
        let mut fut = std::pin::pin!(fut);
        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(val) => return val,
                Poll::Pending => std::thread::yield_now(),
            }
        }
    }
}