alopex_core/vector/
columnar.rs

1//! VectorSegment とエンコード済みカラムのシリアライズ補助。
2//!
3//! Phase3 タスク: VectorSegment構造体定義・シリアライズ実装・KVSキー設計。
4
5use crc32fast::Hasher;
6use serde::{Deserialize, Serialize};
7
8use crate::columnar::encoding::{Column, LogicalType};
9use crate::columnar::encoding_v2::{create_decoder, create_encoder, Bitmap, EncodingV2};
10use crate::columnar::segment_v2::{
11    ColumnSchema, ColumnSegmentV2, InMemorySegmentSource, RecordBatch, Schema, SegmentReaderV2,
12    SegmentWriterV2,
13};
14use crate::columnar::statistics::{ScalarValue, VectorSegmentStatistics};
15use crate::storage::compression::CompressionV2;
16use crate::vector::simd::select_kernel;
17use crate::vector::{CompactionResult, DeleteResult, Metric};
18use crate::{Error, Result};
19use std::collections::HashSet;
20
/// Format version written into every serialized envelope by `to_bytes`;
/// `from_bytes` rejects any other value.
const VECTOR_SEGMENT_VERSION: u8 = 1;
22
23#[derive(Serialize, Deserialize)]
24struct VectorSegmentEnvelope {
25    version: u8,
26    segment_id: u64,
27    dimension: usize,
28    metric: Metric,
29    statistics: VectorSegmentStatistics,
30    segment: ColumnSegmentV2,
31}
32
33/// エンコード済みカラムのメタデータとペイロード。
34#[derive(Clone, Debug, Serialize, Deserialize)]
35pub struct EncodedColumn {
36    /// 論理型。
37    pub logical_type: LogicalType,
38    /// エンコーディング種別。
39    pub encoding: crate::columnar::encoding_v2::EncodingV2,
40    /// 値の個数。
41    pub num_values: u64,
42    /// エンコード済みペイロード。
43    pub data: Vec<u8>,
44    /// Null ビットマップ(任意)。
45    pub null_bitmap: Option<Bitmap>,
46}
47
48/// ベクトル専用カラムナセグメント。
49#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct VectorSegment {
51    /// セグメントID。
52    pub segment_id: u64,
53    /// ベクトル次元。
54    pub dimension: usize,
55    /// 採用メトリック。
56    pub metric: Metric,
57    /// ベクトル総数。
58    pub num_vectors: u64,
59    /// ベクトル本体(Float32連続配列をエンコード)。
60    pub vectors: EncodedColumn,
61    /// ベクトル識別子列。
62    pub keys: EncodedColumn,
63    /// 論理削除フラグ。
64    pub deleted: Bitmap,
65    /// メタデータ列(任意)。
66    pub metadata: Option<Vec<EncodedColumn>>,
67    /// 統計情報。
68    pub statistics: VectorSegmentStatistics,
69}
70
71impl VectorSegment {
72    /// ColumnSegmentV2 を埋め込んだエンベロープをチェックサム付きでシリアライズする。
73    pub fn to_bytes(&self) -> Result<Vec<u8>> {
74        self.validate()?;
75        let envelope = VectorSegmentEnvelope {
76            version: VECTOR_SEGMENT_VERSION,
77            segment_id: self.segment_id,
78            dimension: self.dimension,
79            metric: self.metric,
80            statistics: self.statistics.clone(),
81            segment: self.build_column_segment()?,
82        };
83
84        let mut payload =
85            bincode::serialize(&envelope).map_err(|e| Error::InvalidFormat(e.to_string()))?;
86        let mut hasher = Hasher::new();
87        hasher.update(&payload);
88        let checksum = hasher.finalize();
89        payload.extend_from_slice(&checksum.to_le_bytes());
90        Ok(payload)
91    }
92
93    /// チェックサム検証込みでデシリアライズする。
94    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
95        if bytes.len() < 4 {
96            return Err(Error::InvalidFormat("VectorSegment bytes too short".into()));
97        }
98        let (payload, checksum_bytes) = bytes.split_at(bytes.len() - 4);
99        let expected =
100            u32::from_le_bytes(checksum_bytes.try_into().expect("split gives 4-byte slice"));
101
102        let mut hasher = Hasher::new();
103        hasher.update(payload);
104        let computed = hasher.finalize();
105        if computed != expected {
106            return Err(Error::ChecksumMismatch);
107        }
108
109        let envelope: VectorSegmentEnvelope =
110            bincode::deserialize(payload).map_err(|e| Error::InvalidFormat(e.to_string()))?;
111        if envelope.version != VECTOR_SEGMENT_VERSION {
112            return Err(Error::InvalidFormat(
113                "unsupported VectorSegment version".into(),
114            ));
115        }
116
117        let segment = Self::from_column_segment(envelope)?;
118        segment.validate()?;
119        Ok(segment)
120    }
121
122    /// 内部整合性チェック。
123    fn validate(&self) -> Result<()> {
124        if self.dimension == 0 {
125            return Err(Error::InvalidFormat("dimension must be > 0".into()));
126        }
127        let n = self.num_vectors as usize;
128
129        // vectors: Float32 かつ総要素数一致(num_vectors * dimension)
130        if self.vectors.logical_type != LogicalType::Float32 {
131            return Err(Error::InvalidFormat(
132                "vectors.logical_type must be Float32".into(),
133            ));
134        }
135        let expected_values = n
136            .checked_mul(self.dimension)
137            .ok_or_else(|| Error::InvalidFormat("num_vectors * dimension overflow".into()))?;
138        if self.vectors.num_values as usize != expected_values {
139            return Err(Error::InvalidFormat(
140                "vectors.num_values mismatch num_vectors * dimension".into(),
141            ));
142        }
143        if let Some(bm) = &self.vectors.null_bitmap {
144            if bm.len() != expected_values {
145                return Err(Error::InvalidFormat(
146                    "vectors.null_bitmap length mismatch".into(),
147                ));
148            }
149        }
150
151        // keys: Int64 かつ行数一致
152        if self.keys.logical_type != LogicalType::Int64 {
153            return Err(Error::InvalidFormat(
154                "keys.logical_type must be Int64".into(),
155            ));
156        }
157        if self.keys.num_values as usize != n {
158            return Err(Error::InvalidFormat(
159                "keys.num_values mismatch num_vectors".into(),
160            ));
161        }
162        if let Some(bm) = &self.keys.null_bitmap {
163            if bm.len() != n {
164                return Err(Error::InvalidFormat(
165                    "keys.null_bitmap length mismatch".into(),
166                ));
167            }
168        }
169        // deleted bitmap 長さ
170        if self.deleted.len() != n {
171            return Err(Error::InvalidFormat(
172                "deleted bitmap length mismatch num_vectors".into(),
173            ));
174        }
175        let mut deleted_count = 0u64;
176        let mut active_count = 0u64;
177        for idx in 0..n {
178            if self.deleted.get(idx) {
179                deleted_count += 1;
180            } else {
181                active_count += 1;
182            }
183        }
184
185        // metadata 各列の行数整合
186        if let Some(meta_cols) = &self.metadata {
187            for (idx, col) in meta_cols.iter().enumerate() {
188                if col.num_values as usize != n {
189                    return Err(Error::InvalidFormat(format!(
190                        "metadata column {} num_values mismatch num_vectors",
191                        idx
192                    )));
193                }
194                if let Some(bm) = &col.null_bitmap {
195                    if bm.len() != n {
196                        return Err(Error::InvalidFormat(format!(
197                            "metadata column {} null_bitmap length mismatch",
198                            idx
199                        )));
200                    }
201                }
202            }
203        }
204
205        // statistics 整合性
206        if self.statistics.row_count != self.num_vectors {
207            return Err(Error::InvalidFormat(
208                "statistics.row_count mismatch num_vectors".into(),
209            ));
210        }
211        let active_deleted = self
212            .statistics
213            .active_count
214            .saturating_add(self.statistics.deleted_count);
215        if active_deleted != self.num_vectors {
216            return Err(Error::InvalidFormat(
217                "statistics.active_count + deleted_count mismatch num_vectors".into(),
218            ));
219        }
220        if self.statistics.deleted_count != deleted_count {
221            return Err(Error::InvalidFormat(
222                "statistics.deleted_count mismatch deleted bitmap".into(),
223            ));
224        }
225        if self.statistics.active_count != active_count {
226            return Err(Error::InvalidFormat(
227                "statistics.active_count mismatch deleted bitmap".into(),
228            ));
229        }
230        if self.statistics.row_count > 0 {
231            let expected_ratio =
232                (self.statistics.deleted_count as f32) / (self.statistics.row_count as f32);
233            if (self.statistics.deletion_ratio - expected_ratio).abs() > 1e-6 {
234                return Err(Error::InvalidFormat(
235                    "statistics.deletion_ratio mismatch deleted_count/row_count".into(),
236                ));
237            }
238        } else if self.statistics.deletion_ratio != 0.0 {
239            return Err(Error::InvalidFormat(
240                "statistics.deletion_ratio must be 0 when row_count is 0".into(),
241            ));
242        }
243
244        Ok(())
245    }
246
247    /// ベクトルデータをデコード(FlattenされたFloat32配列)。
248    pub fn decode_vectors(&self) -> Result<Vec<f32>> {
249        let decoder = create_decoder(self.vectors.encoding);
250        let (col, _) = decoder
251            .decode(
252                &self.vectors.data,
253                self.vectors.num_values as usize,
254                self.vectors.logical_type,
255            )
256            .map_err(|e| Error::InvalidFormat(e.to_string()))?;
257        match col {
258            Column::Float32(v) => Ok(v),
259            other => Err(Error::InvalidFormat(format!(
260                "vectors column decoded to unexpected type {:?}",
261                other
262            ))),
263        }
264    }
265
266    /// キー列をデコード。
267    pub fn decode_keys(&self) -> Result<Vec<i64>> {
268        let decoder = create_decoder(self.keys.encoding);
269        let (col, _) = decoder
270            .decode(
271                &self.keys.data,
272                self.keys.num_values as usize,
273                self.keys.logical_type,
274            )
275            .map_err(|e| Error::InvalidFormat(e.to_string()))?;
276        match col {
277            Column::Int64(v) => Ok(v),
278            other => Err(Error::InvalidFormat(format!(
279                "keys column decoded to unexpected type {:?}",
280                other
281            ))),
282        }
283    }
284
285    /// 削除統計を deleted ビットマップから再計算する(norm は不変)。
286    fn recompute_deletion_stats(&mut self) {
287        let row_count = self.num_vectors;
288        let deleted_count = (0..row_count as usize)
289            .filter(|&i| self.deleted.get(i))
290            .count() as u64;
291        let active_count = row_count.saturating_sub(deleted_count);
292        self.statistics.row_count = row_count;
293        self.statistics.deleted_count = deleted_count;
294        self.statistics.active_count = active_count;
295        self.statistics.deletion_ratio = if row_count > 0 {
296            deleted_count as f32 / row_count as f32
297        } else {
298            0.0
299        };
300    }
301
302    /// EncodedColumn 群を ColumnSegmentV2 へ書き出す。
303    fn build_column_segment(&self) -> Result<ColumnSegmentV2> {
304        use crate::columnar::segment_v2::SegmentConfigV2;
305
306        let n = self.num_vectors as usize;
307        let dim = self.dimension;
308        let compression = CompressionV2::None;
309
310        // vectors -> Column::Fixed (byte packed per vector)
311        let (vec_col_decoded, vec_bm) = self.decode_column(&self.vectors)?;
312        let floats = match vec_col_decoded {
313            Column::Float32(v) => v,
314            other => {
315                return Err(Error::InvalidFormat(format!(
316                    "vectors column must decode to Float32, got {:?}",
317                    other
318                )))
319            }
320        };
321        if floats.len() != n * dim {
322            return Err(Error::InvalidFormat(
323                "decoded vectors length mismatch dimension".into(),
324            ));
325        }
326        let fixed_len = dim
327            .checked_mul(4)
328            .ok_or_else(|| Error::InvalidFormat("dimension overflow".into()))?;
329        if fixed_len > u16::MAX as usize {
330            return Err(Error::InvalidFormat("dimension too large for Fixed".into()));
331        }
332        let mut fixed_values = Vec::with_capacity(n);
333        for chunk in floats.chunks(dim) {
334            let mut buf = Vec::with_capacity(fixed_len);
335            for v in chunk {
336                buf.extend_from_slice(&v.to_le_bytes());
337            }
338            fixed_values.push(buf);
339        }
340        let vectors_column = Column::Binary(fixed_values);
341
342        // keys
343        let (keys_col_decoded, keys_bm) = self.decode_column(&self.keys)?;
344        let keys_column = match keys_col_decoded {
345            Column::Int64(v) => {
346                if v.len() != n {
347                    return Err(Error::InvalidFormat(
348                        "keys length mismatch num_vectors".into(),
349                    ));
350                }
351                Column::Int64(v)
352            }
353            other => {
354                return Err(Error::InvalidFormat(format!(
355                    "keys column must decode to Int64, got {:?}",
356                    other
357                )))
358            }
359        };
360
361        // deleted bitmap -> Column::Bool
362        let deleted_column = Column::Bool((0..n).map(|i| self.deleted.get(i)).collect());
363
364        // metadata
365        let mut metadata_columns = Vec::new();
366        let mut metadata_bitmaps = Vec::new();
367        if let Some(meta_cols) = &self.metadata {
368            for col in meta_cols {
369                let (decoded, bm) = self.decode_column(col)?;
370                if column_length(&decoded) != n {
371                    return Err(Error::InvalidFormat(
372                        "metadata length mismatch num_vectors".into(),
373                    ));
374                }
375                let normalized = if let LogicalType::Fixed(len) = col.logical_type {
376                    ensure_fixed_column(decoded, len as usize)?
377                } else {
378                    decoded
379                };
380                metadata_columns.push(normalized);
381                metadata_bitmaps.push(bm);
382            }
383        }
384
385        let mut schema_columns = Vec::new();
386        let mut columns = Vec::new();
387        let mut bitmaps = Vec::new();
388
389        schema_columns.push(ColumnSchema {
390            name: "vectors".into(),
391            logical_type: LogicalType::Binary,
392            nullable: vec_bm.is_some(),
393            fixed_len: Some(fixed_len as u32),
394        });
395        columns.push(vectors_column);
396        bitmaps.push(vec_bm);
397
398        schema_columns.push(ColumnSchema {
399            name: "keys".into(),
400            logical_type: LogicalType::Int64,
401            nullable: keys_bm.is_some(),
402            fixed_len: None,
403        });
404        columns.push(keys_column);
405        bitmaps.push(keys_bm);
406
407        schema_columns.push(ColumnSchema {
408            name: "deleted".into(),
409            logical_type: LogicalType::Bool,
410            nullable: false,
411            fixed_len: None,
412        });
413        columns.push(deleted_column);
414        bitmaps.push(None);
415
416        for (idx, col) in metadata_columns.into_iter().enumerate() {
417            let bm = metadata_bitmaps.get(idx).cloned().unwrap_or(None);
418            schema_columns.push(ColumnSchema {
419                name: format!("meta_{idx}"),
420                logical_type: column_logical_type(&col)?,
421                nullable: bm.is_some(),
422                fixed_len: match &col {
423                    Column::Fixed { len, .. } => Some(*len as u32),
424                    _ => None,
425                },
426            });
427            columns.push(col);
428            bitmaps.push(bm);
429        }
430
431        let schema = Schema {
432            columns: schema_columns,
433        };
434        let batch = RecordBatch::new(schema, columns, bitmaps);
435
436        let mut writer = SegmentWriterV2::new(SegmentConfigV2 {
437            compression,
438            ..Default::default()
439        });
440        writer
441            .write_batch(batch)
442            .map_err(|e| Error::InvalidFormat(e.to_string()))?;
443        writer
444            .finish()
445            .map_err(|e| Error::InvalidFormat(e.to_string()))
446    }
447
448    /// ColumnSegmentV2 から VectorSegment を復元する。
449    fn from_column_segment(envelope: VectorSegmentEnvelope) -> Result<Self> {
450        let num_vectors = envelope.segment.meta.num_rows;
451
452        // 読み出し
453        let reader = SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(
454            envelope.segment.data.clone(),
455        )))
456        .map_err(|e| Error::InvalidFormat(e.to_string()))?;
457
458        let column_count = envelope.segment.meta.schema.column_count();
459        let mut combined_columns: Vec<Option<Column>> = vec![None; column_count];
460        let mut combined_bitmaps: Vec<Option<Bitmap>> = vec![None; column_count];
461
462        for batch in reader
463            .iter_row_groups()
464            .collect::<std::result::Result<Vec<_>, _>>()
465            .map_err(|e| Error::InvalidFormat(e.to_string()))?
466        {
467            for (idx, col) in batch.columns.iter().enumerate() {
468                if idx >= combined_columns.len() {
469                    return Err(Error::InvalidFormat("column index out of bounds".into()));
470                }
471                combined_columns[idx] =
472                    Some(append_column(combined_columns[idx].take(), col.clone())?);
473            }
474            for (idx, bm) in batch.null_bitmaps.iter().enumerate() {
475                if idx >= combined_bitmaps.len() {
476                    return Err(Error::InvalidFormat("bitmap index out of bounds".into()));
477                }
478                combined_bitmaps[idx] = append_bitmap(combined_bitmaps[idx].take(), bm.clone());
479            }
480        }
481
482        // vectors (index 0)
483        let vectors_col = combined_columns
484            .first()
485            .and_then(|c| c.clone())
486            .ok_or_else(|| Error::InvalidFormat("missing vectors column".into()))?;
487        let vec_bitmap = combined_bitmaps.first().cloned().unwrap_or(None);
488        let vectors =
489            encode_vectors_from_fixed(vectors_col, vec_bitmap.clone(), envelope.dimension)?;
490
491        // keys (index 1)
492        let keys_col = combined_columns
493            .get(1)
494            .and_then(|c| c.clone())
495            .ok_or_else(|| Error::InvalidFormat("missing keys column".into()))?;
496        let keys_bitmap = combined_bitmaps.get(1).cloned().unwrap_or(None);
497        let keys =
498            encode_generic_column(keys_col, keys_bitmap, LogicalType::Int64, EncodingV2::Plain)?;
499
500        // deleted (index 2)
501        let deleted_col = combined_columns
502            .get(2)
503            .and_then(|c| c.clone())
504            .ok_or_else(|| Error::InvalidFormat("missing deleted column".into()))?;
505        let deleted = column_to_bitmap(deleted_col, num_vectors as usize)?;
506
507        // metadata
508        let mut metadata_cols = Vec::new();
509        for (idx, col_opt) in combined_columns.iter().enumerate().skip(3) {
510            let col = col_opt
511                .clone()
512                .ok_or_else(|| Error::InvalidFormat("missing metadata column".into()))?;
513            let bm = combined_bitmaps.get(idx).cloned().unwrap_or(None);
514            let logical_type = column_logical_type(&col)?;
515            let encoded = encode_generic_column(col, bm, logical_type, EncodingV2::Plain)?;
516            metadata_cols.push(encoded);
517        }
518
519        let segment = VectorSegment {
520            segment_id: envelope.segment_id,
521            dimension: envelope.dimension,
522            metric: envelope.metric,
523            num_vectors,
524            vectors,
525            keys,
526            deleted,
527            metadata: if metadata_cols.is_empty() {
528                None
529            } else {
530                Some(metadata_cols)
531            },
532            statistics: envelope.statistics,
533        };
534        segment.validate()?;
535        Ok(segment)
536    }
537
538    fn decode_column(&self, col: &EncodedColumn) -> Result<(Column, Option<Bitmap>)> {
539        let decoder = create_decoder(col.encoding);
540        let encoded_bytes = col.data.clone();
541
542        decoder
543            .decode(&encoded_bytes, col.num_values as usize, col.logical_type)
544            .map_err(|e| Error::InvalidFormat(e.to_string()))
545    }
546}
547
548fn column_logical_type(col: &Column) -> Result<LogicalType> {
549    match col {
550        Column::Int64(_) => Ok(LogicalType::Int64),
551        Column::Float32(_) => Ok(LogicalType::Float32),
552        Column::Float64(_) => Ok(LogicalType::Float64),
553        Column::Bool(_) => Ok(LogicalType::Bool),
554        Column::Binary(_) => Ok(LogicalType::Binary),
555        Column::Fixed { len, .. } => {
556            Ok(LogicalType::Fixed((*len).try_into().map_err(|_| {
557                Error::InvalidFormat("fixed length too large".into())
558            })?))
559        }
560    }
561}
562
563fn column_length(col: &Column) -> usize {
564    match col {
565        Column::Int64(v) => v.len(),
566        Column::Float32(v) => v.len(),
567        Column::Float64(v) => v.len(),
568        Column::Bool(v) => v.len(),
569        Column::Binary(v) => v.len(),
570        Column::Fixed { values, .. } => values.len(),
571    }
572}
573
574fn append_column(current: Option<Column>, next: Column) -> Result<Column> {
575    match (current, next) {
576        (None, n) => Ok(n),
577        (Some(Column::Int64(mut a)), Column::Int64(b)) => {
578            a.extend_from_slice(&b);
579            Ok(Column::Int64(a))
580        }
581        (Some(Column::Float32(mut a)), Column::Float32(b)) => {
582            a.extend_from_slice(&b);
583            Ok(Column::Float32(a))
584        }
585        (Some(Column::Float64(mut a)), Column::Float64(b)) => {
586            a.extend_from_slice(&b);
587            Ok(Column::Float64(a))
588        }
589        (Some(Column::Bool(mut a)), Column::Bool(b)) => {
590            a.extend_from_slice(&b);
591            Ok(Column::Bool(a))
592        }
593        (Some(Column::Binary(mut a)), Column::Binary(b)) => {
594            a.extend_from_slice(&b);
595            Ok(Column::Binary(a))
596        }
597        (
598            Some(Column::Fixed { len, mut values }),
599            Column::Fixed {
600                len: len2,
601                values: v,
602            },
603        ) => {
604            if len != len2 {
605                return Err(Error::InvalidFormat("fixed length mismatch".into()));
606            }
607            values.extend_from_slice(&v);
608            Ok(Column::Fixed { len, values })
609        }
610        _ => Err(Error::InvalidFormat(
611            "column type mismatch when merging row groups".into(),
612        )),
613    }
614}
615
616fn append_bitmap(current: Option<Bitmap>, next: Option<Bitmap>) -> Option<Bitmap> {
617    match (current, next) {
618        (None, None) => None,
619        (Some(b), None) => Some(b),
620        (None, Some(b)) => Some(b),
621        (Some(a), Some(b)) => {
622            let mut merged: Vec<bool> = Vec::with_capacity(a.len() + b.len());
623            for i in 0..a.len() {
624                merged.push(a.get(i));
625            }
626            for i in 0..b.len() {
627                merged.push(b.get(i));
628            }
629            Some(Bitmap::from_bools(&merged))
630        }
631    }
632}
633
634fn encode_vectors_from_fixed(
635    col: Column,
636    bitmap: Option<Bitmap>,
637    dimension: usize,
638) -> Result<EncodedColumn> {
639    let values = match col {
640        Column::Binary(values) => values,
641        Column::Fixed { values, len } => {
642            if len != dimension * 4 {
643                return Err(Error::InvalidFormat(
644                    "vectors fixed length mismatch dimension".into(),
645                ));
646            }
647            values
648        }
649        other => {
650            return Err(Error::InvalidFormat(format!(
651                "vectors column must be Binary/Fixed, got {:?}",
652                other
653            )))
654        }
655    };
656    let expected_len = dimension
657        .checked_mul(4)
658        .ok_or_else(|| Error::InvalidFormat("dimension overflow".into()))?;
659
660    let mut floats = Vec::with_capacity(values.len() * dimension);
661    for chunk in values {
662        if chunk.len() != expected_len {
663            return Err(Error::InvalidFormat(
664                "vector payload length mismatch".into(),
665            ));
666        }
667        for bytes in chunk.chunks_exact(4) {
668            floats.push(f32::from_le_bytes(
669                bytes
670                    .try_into()
671                    .map_err(|_| Error::InvalidFormat("vector chunk".into()))?,
672            ));
673        }
674    }
675
676    let encoder = create_encoder(EncodingV2::ByteStreamSplit);
677    let encoded = encoder
678        .encode(&Column::Float32(floats.clone()), bitmap.as_ref())
679        .map_err(|e| Error::InvalidFormat(e.to_string()))?;
680
681    Ok(EncodedColumn {
682        logical_type: LogicalType::Float32,
683        encoding: EncodingV2::ByteStreamSplit,
684        num_values: floats.len() as u64,
685        data: encoded,
686        null_bitmap: bitmap,
687    })
688}
689
690fn encode_generic_column(
691    col: Column,
692    bitmap: Option<Bitmap>,
693    logical_type: LogicalType,
694    encoding: EncodingV2,
695) -> Result<EncodedColumn> {
696    let col = match logical_type {
697        LogicalType::Fixed(len) => ensure_fixed_column(col, len as usize)?,
698        _ => col,
699    };
700    let encoder = create_encoder(encoding);
701    let encoded = encoder
702        .encode(&col, bitmap.as_ref())
703        .map_err(|e| Error::InvalidFormat(e.to_string()))?;
704
705    Ok(EncodedColumn {
706        logical_type,
707        encoding,
708        num_values: column_length(&col) as u64,
709        data: encoded,
710        null_bitmap: bitmap,
711    })
712}
713
714fn slice_column(col: &Column, indices: &[usize]) -> Result<Column> {
715    Ok(match col {
716        Column::Int64(v) => Column::Int64(take_indices(v, indices)?),
717        Column::Float32(v) => Column::Float32(take_indices(v, indices)?),
718        Column::Float64(v) => Column::Float64(take_indices(v, indices)?),
719        Column::Bool(v) => Column::Bool(take_indices(v, indices)?),
720        Column::Binary(v) => Column::Binary(take_indices(v, indices)?),
721        Column::Fixed { len, values } => Column::Fixed {
722            len: *len,
723            values: take_indices(values, indices)?,
724        },
725    })
726}
727
728fn slice_bitmap(bm: Option<Bitmap>, indices: &[usize]) -> Option<Bitmap> {
729    bm.map(|source| {
730        let mut sliced = Bitmap::new_zeroed(indices.len());
731        for (dst_idx, src_idx) in indices.iter().enumerate() {
732            if source.get(*src_idx) {
733                sliced.set(dst_idx, true);
734            }
735        }
736        sliced
737    })
738}
739
740fn take_indices<T: Clone>(values: &[T], indices: &[usize]) -> Result<Vec<T>> {
741    let mut out = Vec::with_capacity(indices.len());
742    for &idx in indices {
743        out.push(
744            values
745                .get(idx)
746                .cloned()
747                .ok_or_else(|| Error::InvalidFormat("index out of bounds".into()))?,
748        );
749    }
750    Ok(out)
751}
752
753fn column_to_bitmap(col: Column, expected_len: usize) -> Result<Bitmap> {
754    match col {
755        Column::Bool(values) => {
756            if values.len() != expected_len {
757                return Err(Error::InvalidFormat(
758                    "deleted length mismatch num_vectors".into(),
759                ));
760            }
761            Ok(if values.iter().all(|v| !*v) {
762                Bitmap::new(expected_len)
763            } else if values.iter().all(|v| *v) {
764                Bitmap::all_valid(expected_len)
765            } else {
766                Bitmap::from_bools(&values)
767            })
768        }
769        other => Err(Error::InvalidFormat(format!(
770            "deleted column must be Bool, got {:?}",
771            other
772        ))),
773    }
774}
775
776fn ensure_fixed_column(col: Column, len: usize) -> Result<Column> {
777    match col {
778        Column::Fixed { len: l, values } => {
779            if l != len {
780                return Err(Error::InvalidFormat(
781                    "fixed column length mismatch expected length".into(),
782                ));
783            }
784            Ok(Column::Fixed { len, values })
785        }
786        Column::Binary(values) => {
787            if values.iter().any(|v| v.len() != len) {
788                return Err(Error::InvalidFormat(
789                    "binary column has variable-length values for Fixed type".into(),
790                ));
791            }
792            Ok(Column::Fixed { len, values })
793        }
794        other => Err(Error::InvalidFormat(format!(
795            "column must be Fixed/Binary for Fixed logical type, got {:?}",
796            other
797        ))),
798    }
799}
800
/// KVS key layout.
pub mod key_layout {
    /// Builds a key of the form `vector_segment:{segment_id}`, with the id
    /// written in decimal, and returns it as UTF-8 bytes.
    pub fn vector_segment_key(segment_id: u64) -> Vec<u8> {
        let mut key = String::from("vector_segment:");
        key.push_str(&segment_id.to_string());
        key.into_bytes()
    }
}
808
809/// VectorStore の設定。
810///
811/// # Examples
812/// ```
813/// use alopex_core::vector::{VectorStoreConfig, Metric};
814/// let cfg = VectorStoreConfig { dimension: 128, metric: Metric::Cosine, ..Default::default() };
815/// assert_eq!(cfg.dimension, 128);
816/// ```
817#[derive(Clone, Debug, Serialize, Deserialize)]
818pub struct VectorStoreConfig {
819    /// ベクトル次元。
820    pub dimension: usize,
821    /// デフォルトメトリック。
822    pub metric: Metric,
823    /// 1 セグメントあたりの最大ベクトル数。
824    pub segment_max_vectors: usize,
825    /// 将来のコンパクション閾値(現状は設定のみ)。
826    pub compaction_threshold: f32,
827    /// ベクトルエンコーディング方式。
828    pub encoding: EncodingV2,
829}
830
831impl Default for VectorStoreConfig {
832    fn default() -> Self {
833        Self {
834            dimension: 128,
835            metric: Metric::Cosine,
836            segment_max_vectors: 65_536,
837            compaction_threshold: 0.3,
838            encoding: EncodingV2::ByteStreamSplit,
839        }
840    }
841}
842
843/// ベクトル追加結果。
844#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
845pub struct AppendResult {
846    /// 追加されたベクトル数。
847    pub vectors_added: usize,
848    /// 新規作成されたセグメント数。
849    pub segments_created: usize,
850}
851
852/// 検索パラメータ。
853#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
854pub struct VectorSearchParams {
855    /// クエリベクトル。
856    pub query: Vec<f32>,
857    /// 使用するメトリック。
858    pub metric: Metric,
859    /// 取得する Top-K 件数。
860    pub top_k: usize,
861    /// メタデータ列のプロジェクション(0-based、metadata 配列のインデックス)。
862    pub projection: Option<Vec<usize>>,
863    /// フィルタマスク(行単位、true=通過)。
864    pub filter_mask: Option<Vec<bool>>,
865}
866
/// A single search hit.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct VectorSearchResult {
    /// Vector identifier (the key stored alongside the vector).
    pub row_id: i64,
    /// Score; results are returned sorted by descending score.
    pub score: f32,
    /// Projected metadata values, one per requested projection index; empty when the
    /// search was issued without a projection.
    pub columns: Vec<ScalarValue>,
}
877
/// Search statistics.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SearchStats {
    /// Number of segments that were scanned.
    pub segments_scanned: u64,
    /// Number of segments skipped by pruning.
    pub segments_pruned: u64,
    /// Number of rows in the scanned segments (logically deleted rows included).
    pub rows_scanned: u64,
    /// Number of rows for which a score was computed.
    pub rows_matched: u64,
}
890
/// Simple in-memory implementation of a vector store.
#[derive(Debug)]
pub struct VectorStoreManager {
    // Store-wide settings (dimension, metric, segment size, compaction threshold, encoding).
    config: VectorStoreConfig,
    // Segments in insertion order; search walks them in this order.
    segments: Vec<VectorSegment>,
    // Id that will be stamped on the next segment built by this manager.
    next_segment_id: u64,
}
898
899impl VectorStoreManager {
900    /// 新しい VectorStoreManager を生成。
901    pub fn new(config: VectorStoreConfig) -> Self {
902        Self {
903            config,
904            segments: Vec::new(),
905            next_segment_id: 0,
906        }
907    }
908
909    /// 現在保持しているセグメント一覧(読み取り専用)を返す。
910    ///
911    /// # 注意(Disk モード復元時の前提)
912    /// - `VectorStoreManager` は「セグメントの順序」を `row_id` の割当てに利用します(検索時に `row_offset` を積み上げる)。
913    /// - `from_segments` で復元する場合は、永続化時と同じ順序(通常は古い→新しい)で `segments` を渡してください。
914    pub fn segments(&self) -> &[VectorSegment] {
915        &self.segments
916    }
917
918    /// 設定を返す(Disk モードの復元用)。
919    ///
920    /// # 注意(Disk モード復元時の前提)
921    /// - `segments` 内の各セグメントは、この `config` と整合している必要があります(例: `dimension`/`metric`)。
922    pub fn config(&self) -> &VectorStoreConfig {
923        &self.config
924    }
925
926    /// 次に割り当てられるセグメントIDを返す(永続化用)。
927    ///
928    /// # 注意(Disk モード復元時の前提)
929    /// - `from_segments` に渡す `next_segment_id` は、通常 `max(segment_id) + 1` 以上である必要があります。
930    pub fn next_segment_id(&self) -> u64 {
931        self.next_segment_id
932    }
933
934    /// 永続化済みセグメントから `VectorStoreManager` を復元する(Disk モード向け)。
935    ///
936    /// # 注意(呼び出し側が満たすべき前提)
937    /// - `segments` は永続化時と同じ順序(通常は古い→新しい)で渡すこと。
938    /// - `segments` は重複しない `segment_id` を持つこと。
939    /// - `config` と `segments` の整合(dimension/metric など)を保つこと。
940    /// - `next_segment_id` は `max(segment_id) + 1` 以上であること(将来の追加/コンパクションで重複IDを避けるため)。
941    pub fn from_segments(
942        config: VectorStoreConfig,
943        segments: Vec<VectorSegment>,
944        next_segment_id: u64,
945    ) -> Self {
946        Self {
947            config,
948            segments,
949            next_segment_id,
950        }
951    }
952
953    /// ベクトルバッチを追加する。
954    ///
955    /// # Errors
956    /// - `DimensionMismatch`: 入力ベクトルの次元が設定と異なる場合
957    /// - `InvalidVector`: NaN/Inf を含む場合
958    ///
959    /// # Examples
960    ///
961    /// ```no_run
962    /// use alopex_core::vector::{VectorStoreManager, VectorStoreConfig};
963    /// # use alopex_core::Result;
964    /// # async fn demo() -> Result<()> {
965    /// let mut mgr = VectorStoreManager::new(VectorStoreConfig { dimension: 2, ..Default::default() });
966    /// let keys = vec![1, 2];
967    /// let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
968    /// mgr.append_batch(&keys, &vecs).await?;
969    /// # Ok(()) }
970    /// ```
971    pub async fn append_batch(
972        &mut self,
973        keys: &[i64],
974        vectors: &[Vec<f32>],
975    ) -> Result<AppendResult> {
976        if keys.len() != vectors.len() {
977            return Err(Error::InvalidFormat("keys/vectors length mismatch".into()));
978        }
979        if vectors.is_empty() {
980            return Ok(AppendResult::default());
981        }
982        let dim = self.config.dimension;
983        for (idx, v) in vectors.iter().enumerate() {
984            if v.len() != dim {
985                return Err(Error::DimensionMismatch {
986                    expected: dim,
987                    actual: v.len(),
988                });
989            }
990            if contains_nan_or_inf(v) {
991                return Err(Error::InvalidVector {
992                    index: idx,
993                    reason: "vector contains NaN or Inf".into(),
994                });
995            }
996        }
997
998        let mut vectors_added = 0usize;
999        let mut segments_created = 0usize;
1000        let mut start = 0usize;
1001        while start < vectors.len() {
1002            let end = usize::min(start + self.config.segment_max_vectors, vectors.len());
1003            let slice = &vectors[start..end];
1004            let key_slice = &keys[start..end];
1005
1006            let segment = self.build_segment(key_slice, slice)?;
1007            self.segments.push(segment);
1008            self.next_segment_id += 1;
1009            vectors_added += slice.len();
1010            segments_created += 1;
1011            start = end;
1012        }
1013
1014        Ok(AppendResult {
1015            vectors_added,
1016            segments_created,
1017        })
1018    }
1019
1020    /// コンパクション対象セグメントを取得する。
1021    ///
1022    /// `deletion_ratio` が `compaction_threshold` 以上かつ > 0 のセグメントIDを返す。
1023    /// `threshold >= 1.0` の場合は常に空。
1024    ///
1025    /// # Examples
1026    /// ```ignore
1027    /// # use alopex_core::vector::{VectorStoreManager, VectorStoreConfig, Metric};
1028    /// # let mut mgr = VectorStoreManager::new(VectorStoreConfig { compaction_threshold: 0.5, ..Default::default() });
1029    /// # futures::executor::block_on(mgr.append_batch(&[1,2], &[vec![1.0,0.0], vec![0.0,1.0]])).unwrap();
1030    /// assert!(mgr.segments_needing_compaction().is_empty()); // no deletions yet
1031    /// // Mark one row deleted -> deletion_ratio = 0.5, meets threshold
1032    /// mgr.segments[0].deleted.set(0, true);
1033    /// mgr.segments[0].recompute_deletion_stats();
1034    /// assert_eq!(mgr.segments_needing_compaction(), vec![mgr.segments[0].segment_id]);
1035    /// ```
1036    pub fn segments_needing_compaction(&self) -> Vec<u64> {
1037        let threshold = self.config.compaction_threshold;
1038        if threshold >= 1.0 {
1039            return Vec::new();
1040        }
1041        let mut ids = Vec::new();
1042        for seg in &self.segments {
1043            if seg.statistics.deletion_ratio >= threshold && seg.statistics.deletion_ratio > 0.0 {
1044                ids.push(seg.segment_id);
1045            }
1046        }
1047        ids
1048    }
1049
1050    /// 指定キーのベクトルを論理削除する(in-memory)。
1051    ///
1052    /// # Errors
1053    /// - セグメントのデコードに失敗した場合 `InvalidFormat`
1054    ///
1055    /// # Examples
1056    /// ```ignore
1057    /// # use alopex_core::vector::{VectorStoreManager, VectorStoreConfig};
1058    /// # let mut mgr = VectorStoreManager::new(VectorStoreConfig { dimension: 2, ..Default::default() });
1059    /// # futures::executor::block_on(mgr.append_batch(&[1], &[vec![1.0, 0.0]])).unwrap();
1060    /// let res = futures::executor::block_on(mgr.delete_batch(&[1])).unwrap();
1061    /// assert_eq!(res.vectors_deleted, 1);
1062    /// ```
1063    pub async fn delete_batch(&mut self, keys: &[i64]) -> Result<DeleteResult> {
1064        if keys.is_empty() {
1065            return Ok(DeleteResult::default());
1066        }
1067        let key_set: HashSet<i64> = keys.iter().copied().collect();
1068        let mut result = DeleteResult::default();
1069
1070        for segment in &mut self.segments {
1071            let decoded_keys = segment.decode_keys()?;
1072            let mut modified = false;
1073            for (idx, key) in decoded_keys.iter().enumerate() {
1074                if !key_set.contains(key) {
1075                    continue;
1076                }
1077                if !segment.deleted.get(idx) {
1078                    segment.deleted.set(idx, true);
1079                    result.vectors_deleted = result.vectors_deleted.saturating_add(1);
1080                    modified = true;
1081                }
1082            }
1083
1084            if modified {
1085                segment.recompute_deletion_stats();
1086                result.segments_modified.push(segment.segment_id);
1087            }
1088        }
1089
1090        Ok(result)
1091    }
1092
1093    /// セグメントをコンパクションして新セグメントに置換する。
1094    ///
1095    /// 削除済み行を物理的に取り除き、新セグメントを構築する(全削除時はセグメントを削除)。
1096    ///
1097    /// # Errors
1098    /// - `Error::NotFound` 指定IDが存在しない場合
1099    /// - `InvalidFormat` セグメントのデコード/再構成に失敗した場合
1100    ///
1101    /// # Examples
1102    /// ```ignore
1103    /// # use alopex_core::vector::{VectorStoreManager, VectorStoreConfig};
1104    /// # let mut mgr = VectorStoreManager::new(VectorStoreConfig { dimension: 2, ..Default::default() });
1105    /// # futures::executor::block_on(mgr.append_batch(&[1,2], &[vec![1.0,0.0], vec![0.0,1.0]])).unwrap();
1106    /// # futures::executor::block_on(mgr.delete_batch(&[1])).unwrap();
1107    /// let seg_id = mgr.segments[0].segment_id;
1108    /// let res = futures::executor::block_on(mgr.compact_segment(seg_id)).unwrap();
1109    /// assert!(res.new_segment_id.is_some());
1110    /// ```
1111    pub async fn compact_segment(&mut self, segment_id: u64) -> Result<CompactionResult> {
1112        let pos = self
1113            .segments
1114            .iter()
1115            .position(|s| s.segment_id == segment_id)
1116            .ok_or(Error::NotFound)?;
1117
1118        let old = self.segments.get(pos).cloned().ok_or(Error::NotFound)?;
1119        let old_size = old.to_bytes().map(|b| b.len() as u64).unwrap_or(0);
1120
1121        let active_indices: Vec<usize> = (0..old.num_vectors as usize)
1122            .filter(|&i| !old.deleted.get(i))
1123            .collect();
1124
1125        if active_indices.is_empty() {
1126            self.segments.remove(pos);
1127            return Ok(CompactionResult {
1128                old_segment_id: segment_id,
1129                new_segment_id: None,
1130                vectors_removed: old.num_vectors,
1131                space_reclaimed: old_size,
1132            });
1133        }
1134
1135        let decoded_vectors = old.decode_vectors()?;
1136        let decoded_keys = old.decode_keys()?;
1137
1138        let mut new_vecs = Vec::with_capacity(active_indices.len());
1139        for &idx in &active_indices {
1140            let start = idx * self.config.dimension;
1141            let end = start + self.config.dimension;
1142            new_vecs.push(decoded_vectors[start..end].to_vec());
1143        }
1144        let new_keys: Vec<i64> = active_indices
1145            .iter()
1146            .map(|&i| {
1147                decoded_keys
1148                    .get(i)
1149                    .copied()
1150                    .ok_or_else(|| Error::InvalidFormat("missing key".into()))
1151            })
1152            .collect::<Result<_>>()?;
1153
1154        let mut new_segment = self.build_segment(&new_keys, &new_vecs)?;
1155
1156        // metadata を再構成(存在する場合)。
1157        if let Some(meta_cols) = &old.metadata {
1158            let mut new_meta = Vec::with_capacity(meta_cols.len());
1159            for col in meta_cols {
1160                let (decoded_col, bitmap) = old.decode_column(col)?;
1161                let sliced_col = slice_column(&decoded_col, &active_indices)?;
1162                let sliced_bitmap = slice_bitmap(bitmap, &active_indices);
1163                let encoded = encode_generic_column(
1164                    sliced_col,
1165                    sliced_bitmap,
1166                    col.logical_type,
1167                    col.encoding,
1168                )?;
1169                new_meta.push(encoded);
1170            }
1171            if !new_meta.is_empty() {
1172                new_segment.metadata = Some(new_meta);
1173            }
1174        }
1175
1176        // 新セグメントIDを割当て、置換。
1177        let new_segment_id = self.next_segment_id;
1178        new_segment.segment_id = new_segment_id; // build_segment で設定した値と合わせるため明示
1179        self.next_segment_id += 1;
1180        let new_size = new_segment.to_bytes().map(|b| b.len() as u64).unwrap_or(0);
1181        let space_reclaimed = old_size.saturating_sub(new_size);
1182        let vectors_removed = old.num_vectors.saturating_sub(new_segment.num_vectors);
1183
1184        self.segments[pos] = new_segment;
1185
1186        Ok(CompactionResult {
1187            old_segment_id: segment_id,
1188            new_segment_id: Some(new_segment_id),
1189            vectors_removed,
1190            space_reclaimed,
1191        })
1192    }
1193
1194    /// ベクトル検索。
1195    ///
1196    /// # Errors
1197    /// - `DimensionMismatch`: クエリ次元が設定と異なる場合。
1198    /// - `InvalidVector`: クエリに NaN/Inf が含まれる場合。
1199    pub fn search(&self, params: VectorSearchParams) -> Result<Vec<VectorSearchResult>> {
1200        let mut stats = SearchStats::default();
1201        let (results, _) = self.search_internal(params, &mut stats)?;
1202        Ok(results)
1203    }
1204
1205    /// 統計付き検索。
1206    ///
1207    /// `search` と同じ結果に加え、走査/プルーニング件数を返す。
1208    pub fn search_with_stats(
1209        &self,
1210        params: VectorSearchParams,
1211    ) -> Result<(Vec<VectorSearchResult>, SearchStats)> {
1212        let mut stats = SearchStats::default();
1213        let (results, stats) = self.search_internal(params, &mut stats)?;
1214        Ok((results, stats))
1215    }
1216
    /// Shared implementation behind `search` and `search_with_stats`.
    ///
    /// Validates the query, walks the segments in order, scores every surviving row with
    /// `params.metric`, and returns at most `params.top_k` hits sorted by descending score
    /// (ties broken by ascending `row_id`) together with a clone of `stats`.
    ///
    /// `stats` is updated in place while scanning. A `top_k` of 0 returns an empty result
    /// before the query is validated.
    ///
    /// # Errors
    /// - `DimensionMismatch`: the query length differs from `config.dimension`.
    /// - `InvalidVector`: the query contains NaN/Inf.
    /// - `InvalidFormat`: a key is missing for a row, or a projection index / row is out of
    ///   bounds; errors from decoding a segment are propagated as returned.
    fn search_internal(
        &self,
        params: VectorSearchParams,
        stats: &mut SearchStats,
    ) -> Result<(Vec<VectorSearchResult>, SearchStats)> {
        if params.top_k == 0 {
            return Ok((Vec::new(), stats.clone()));
        }
        if params.query.len() != self.config.dimension {
            return Err(Error::DimensionMismatch {
                expected: self.config.dimension,
                actual: params.query.len(),
            });
        }
        if contains_nan_or_inf(&params.query) {
            return Err(Error::InvalidVector {
                index: 0,
                reason: "query contains NaN or Inf".into(),
            });
        }

        let mut candidates: Vec<VectorSearchResult> = Vec::new();
        // Euclidean norm of the query, used for the norm-range pruning below.
        let query_norm = params.query.iter().map(|v| v * v).sum::<f32>().sqrt();
        // Position of the current segment's first row across all segments; it is advanced
        // for pruned segments too, so `filter_mask` indices stay aligned.
        let mut row_offset = 0u64;
        for segment in &self.segments {
            // Fully deleted segments cannot contribute any hit.
            if segment.statistics.deletion_ratio >= 1.0 {
                stats.segments_pruned += 1;
                row_offset += segment.num_vectors;
                continue;
            }
            // Pruning by norm range: skip the segment when the query norm lies outside
            // [norm_min, norm_max].
            // NOTE(review): this applies to every metric; for a magnitude-independent metric
            // such as cosine it may skip segments that hold valid matches — confirm intended.
            if query_norm < segment.statistics.norm_min || query_norm > segment.statistics.norm_max
            {
                stats.segments_pruned += 1;
                row_offset += segment.num_vectors;
                continue;
            }
            stats.segments_scanned += 1;
            stats.rows_scanned = stats.rows_scanned.saturating_add(segment.num_vectors);
            let decoded = segment.decode_vectors()?;
            let decoded_keys = segment.decode_keys()?;
            // Metadata is decoded for every scanned segment, whether or not a projection
            // was requested.
            let metadata = decode_metadata(&segment.metadata, segment.num_vectors as usize)?;
            let kernel = select_kernel();
            let mask = params.filter_mask.as_ref();
            for (idx, chunk) in decoded.chunks(self.config.dimension).enumerate() {
                // The deleted bitmap uses `true` to mean logically deleted.
                if segment.deleted.get(idx) {
                    continue;
                }
                if let Some(mask_vec) = mask {
                    let global_idx = row_offset as usize + idx;
                    // Rows beyond the end of the mask are treated as filtered out.
                    if global_idx >= mask_vec.len() || !mask_vec[global_idx] {
                        continue;
                    }
                }
                let score = match params.metric {
                    Metric::Cosine => kernel.cosine(&params.query, chunk),
                    Metric::L2 => kernel.l2(&params.query, chunk),
                    Metric::InnerProduct => kernel.inner_product(&params.query, chunk),
                };
                let row_id = *decoded_keys
                    .get(idx)
                    .ok_or_else(|| Error::InvalidFormat("missing key".into()))?;
                // Gather the projected metadata values for this row, in projection order.
                let columns = if let Some(proj) = &params.projection {
                    let mut cols = Vec::with_capacity(proj.len());
                    for &p in proj {
                        let col = metadata.get(p).ok_or_else(|| {
                            Error::InvalidFormat("projection out of bounds".into())
                        })?;
                        cols.push(col.get(idx).cloned().ok_or_else(|| {
                            Error::InvalidFormat("projection row out of bounds".into())
                        })?);
                    }
                    cols
                } else {
                    Vec::new()
                };
                candidates.push(VectorSearchResult {
                    row_id,
                    score,
                    columns,
                });
                stats.rows_matched += 1;
            }
            row_offset += segment.num_vectors;
        }

        // Descending score; scores that cannot be ordered (NaN) compare as equal, and ties
        // fall back to ascending row_id.
        candidates.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| a.row_id.cmp(&b.row_id))
        });
        candidates.truncate(params.top_k);
        Ok((candidates, stats.clone()))
    }
1313
1314    fn build_segment(&mut self, keys: &[i64], vectors: &[Vec<f32>]) -> Result<VectorSegment> {
1315        let mut flattened = Vec::with_capacity(vectors.len() * self.config.dimension);
1316        for v in vectors {
1317            flattened.extend_from_slice(v);
1318        }
1319        let vec_enc = encode_generic_column(
1320            Column::Float32(flattened),
1321            None,
1322            LogicalType::Float32,
1323            self.config.encoding,
1324        )?;
1325        let key_enc = encode_generic_column(
1326            Column::Int64(keys.to_vec()),
1327            None,
1328            LogicalType::Int64,
1329            EncodingV2::Plain,
1330        )?;
1331        let deleted = Bitmap::new_zeroed(keys.len());
1332        let stats = compute_stats(vectors, Some(&deleted));
1333        let segment = VectorSegment {
1334            segment_id: self.next_segment_id,
1335            dimension: self.config.dimension,
1336            metric: self.config.metric,
1337            num_vectors: keys.len() as u64,
1338            vectors: vec_enc,
1339            keys: key_enc,
1340            deleted,
1341            metadata: None,
1342            statistics: stats,
1343        };
1344        Ok(segment)
1345    }
1346}
1347
1348fn compute_stats(vectors: &[Vec<f32>], deleted: Option<&Bitmap>) -> VectorSegmentStatistics {
1349    let row_count = vectors.len() as u64;
1350    let null_count = 0;
1351    let deleted_count = (0..vectors.len())
1352        .filter(|&i| deleted.is_some_and(|bm| bm.get(i)))
1353        .count() as u64;
1354    let active_count = row_count.saturating_sub(deleted_count);
1355    let mut norm_min = f32::MAX;
1356    let mut norm_max = f32::MIN;
1357    for (idx, v) in vectors.iter().enumerate() {
1358        if deleted.is_some_and(|bm| bm.get(idx)) {
1359            continue;
1360        }
1361        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
1362        norm_min = norm_min.min(norm);
1363        norm_max = norm_max.max(norm);
1364    }
1365    if active_count == 0 {
1366        norm_min = 0.0;
1367        norm_max = 0.0;
1368    }
1369    let deletion_ratio = if row_count > 0 {
1370        deleted_count as f32 / row_count as f32
1371    } else {
1372        0.0
1373    };
1374
1375    VectorSegmentStatistics {
1376        row_count,
1377        null_count,
1378        active_count,
1379        deleted_count,
1380        deletion_ratio,
1381        norm_min,
1382        norm_max,
1383        min_values: Vec::new(),
1384        max_values: Vec::new(),
1385        created_at: 0,
1386    }
1387}
1388
/// Returns `true` when at least one component of `vec` is NaN or infinite.
fn contains_nan_or_inf(vec: &[f32]) -> bool {
    !vec.iter().all(|component| component.is_finite())
}
1392
1393fn decode_metadata(
1394    metadata: &Option<Vec<EncodedColumn>>,
1395    rows: usize,
1396) -> Result<Vec<Vec<ScalarValue>>> {
1397    if let Some(cols) = metadata {
1398        let mut decoded_cols = Vec::with_capacity(cols.len());
1399        for col in cols {
1400            let decoder = create_decoder(col.encoding);
1401            let (column, _) = decoder
1402                .decode(&col.data, col.num_values as usize, col.logical_type)
1403                .map_err(|e| Error::InvalidFormat(e.to_string()))?;
1404            let values = column_to_scalar_values(column)?;
1405            if values.len() != rows {
1406                return Err(Error::InvalidFormat(
1407                    "metadata column length mismatch num_vectors".into(),
1408                ));
1409            }
1410            decoded_cols.push(values);
1411        }
1412        Ok(decoded_cols)
1413    } else {
1414        Ok(Vec::new())
1415    }
1416}
1417
1418fn column_to_scalar_values(column: Column) -> Result<Vec<ScalarValue>> {
1419    Ok(match column {
1420        Column::Int64(v) => v.into_iter().map(ScalarValue::Int64).collect(),
1421        Column::Float32(v) => v.into_iter().map(ScalarValue::Float32).collect(),
1422        Column::Float64(v) => v.into_iter().map(ScalarValue::Float64).collect(),
1423        Column::Bool(v) => v.into_iter().map(ScalarValue::Bool).collect(),
1424        Column::Binary(v) => v.into_iter().map(ScalarValue::Binary).collect(),
1425        Column::Fixed { values, .. } => values.into_iter().map(ScalarValue::Binary).collect(),
1426    })
1427}
1428
1429#[cfg(all(test, not(target_arch = "wasm32")))]
1430mod tests {
1431    use super::*;
1432    use crate::columnar::encoding_v2::EncodingV2;
1433    use crate::kv::{KVStore, KVTransaction};
1434    use crate::txn::TxnManager;
1435    use crate::types::TxnMode;
1436    use crate::vector::simd::DistanceKernel;
1437    use crate::MemoryKV;
1438    use crate::ScalarKernel;
1439    use std::future::Future;
1440    use std::sync::Arc;
1441    use std::task::{Context, Poll, Wake, Waker};
1442
1443    fn encode_f32(values: &[f32]) -> EncodedColumn {
1444        let encoder = create_encoder(EncodingV2::ByteStreamSplit);
1445        let data = encoder
1446            .encode(&Column::Float32(values.to_vec()), None)
1447            .unwrap();
1448        EncodedColumn {
1449            logical_type: LogicalType::Float32,
1450            encoding: EncodingV2::ByteStreamSplit,
1451            num_values: values.len() as u64,
1452            data,
1453            null_bitmap: None,
1454        }
1455    }
1456
1457    fn encode_i64(values: &[i64]) -> EncodedColumn {
1458        let encoder = create_encoder(EncodingV2::Plain);
1459        let data = encoder
1460            .encode(&Column::Int64(values.to_vec()), None)
1461            .unwrap();
1462        EncodedColumn {
1463            logical_type: LogicalType::Int64,
1464            encoding: EncodingV2::Plain,
1465            num_values: values.len() as u64,
1466            data,
1467            null_bitmap: None,
1468        }
1469    }
1470
1471    fn sample_segment() -> VectorSegment {
1472        let vectors = vec![1.0f32, 2.0, 3.0, 4.0];
1473        VectorSegment {
1474            segment_id: 42,
1475            dimension: 4,
1476            metric: Metric::Cosine,
1477            num_vectors: 1,
1478            vectors: encode_f32(&vectors),
1479            keys: encode_i64(&[0]),
1480            deleted: Bitmap::new_zeroed(1),
1481            metadata: None,
1482            statistics: VectorSegmentStatistics {
1483                row_count: 1,
1484                null_count: 0,
1485                active_count: 1,
1486                deleted_count: 0,
1487                deletion_ratio: 0.0,
1488                norm_min: 0.0,
1489                norm_max: 0.0,
1490                min_values: Vec::new(),
1491                max_values: Vec::new(),
1492                created_at: 1_735_000_000,
1493            },
1494        }
1495    }
1496
    // Serializing a segment with `to_bytes` and restoring it with `from_bytes` must
    // preserve its identifying fields, column types, deleted bitmap and row count.
    #[test]
    fn roundtrip_with_checksum_and_segment_v2() {
        let seg = sample_segment();
        let bytes = seg.to_bytes().unwrap();
        let restored = VectorSegment::from_bytes(&bytes).unwrap();
        assert_eq!(restored.segment_id, seg.segment_id);
        assert_eq!(restored.dimension, seg.dimension);
        assert_eq!(restored.metric, seg.metric);
        assert_eq!(restored.num_vectors, seg.num_vectors);
        assert_eq!(restored.vectors.logical_type, LogicalType::Float32);
        assert_eq!(restored.keys.logical_type, LogicalType::Int64);
        assert_eq!(restored.deleted, seg.deleted);
        assert_eq!(restored.statistics.row_count, seg.statistics.row_count);
    }
1511
    // Flipping bits in the last serialized byte must make `from_bytes` fail with
    // `ChecksumMismatch`.
    #[test]
    fn checksum_mismatch_detected() {
        let seg = sample_segment();
        let mut bytes = seg.to_bytes().unwrap();
        let last = bytes.len() - 1;
        // Corrupt the final byte of the payload.
        bytes[last] ^= 0xAA;
        let err = VectorSegment::from_bytes(&bytes).unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch));
    }
1521
    // The KVS key for a segment is the ASCII string `vector_segment:<id>`.
    #[test]
    fn vector_segment_key_layout() {
        let key = key_layout::vector_segment_key(123);
        assert_eq!(key, b"vector_segment:123");
    }
1527
    // A segment whose `num_vectors` disagrees with its actual contents must be rejected
    // by `to_bytes` with `InvalidFormat`.
    #[test]
    fn validate_rejects_mismatched_lengths() {
        let mut seg = sample_segment();
        seg.num_vectors = 2; // mismatch: the fixture holds a single vector
        let err = seg.to_bytes().unwrap_err();
        assert!(matches!(err, Error::InvalidFormat(_)));
    }
1535
    // Without a deleted bitmap every row is active and the norm range spans all rows.
    #[test]
    fn compute_stats_updates_norms_and_counts() {
        let vectors = vec![vec![3.0f32, 4.0], vec![0.0f32, 0.0]];
        let stats = compute_stats(&vectors, None);
        assert_eq!(stats.row_count, 2);
        assert_eq!(stats.active_count, 2);
        assert_eq!(stats.deleted_count, 0);
        // norms: 5.0 and 0.0
        assert!((stats.norm_min - 0.0).abs() < 1e-6);
        assert!((stats.norm_max - 5.0).abs() < 1e-6);
    }
1547
    // Rows flagged in the deleted bitmap count as deleted and are excluded from the
    // norm range.
    #[test]
    fn compute_stats_respects_deleted_bitmap() {
        let vectors = vec![vec![1.0f32], vec![2.0f32]];
        let mut deleted = Bitmap::new_zeroed(2);
        // Delete the second row (norm 2.0); only the first row (norm 1.0) stays active.
        deleted.set(1, true);

        let stats = compute_stats(&vectors, Some(&deleted));
        assert_eq!(stats.row_count, 2);
        assert_eq!(stats.active_count, 1);
        assert_eq!(stats.deleted_count, 1);
        assert!((stats.deletion_ratio - 0.5).abs() < 1e-6);
        assert!((stats.norm_min - 1.0).abs() < 1e-6);
        assert!((stats.norm_max - 1.0).abs() < 1e-6);
    }
1562
    // `delete_batch` flags matching live rows, counts only rows that actually changed,
    // and refreshes statistics on the segments it modified.
    #[test]
    fn delete_batch_marks_keys_and_updates_stats() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        // With two vectors per segment: segment 0 holds keys 10 and 11, segment 1 holds key 12.
        let keys = vec![10, 11, 12];
        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let seg0_id = mgr.segments[0].segment_id;
        let seg1_id = mgr.segments[1].segment_id;

        // Pre-mark one row (key 11) as deleted; it must not be counted again.
        if let Some(seg) = mgr.segments.get_mut(0) {
            seg.deleted.set(1, true);
            seg.recompute_deletion_stats();
        }

        // Key 11 is already deleted and 999 does not exist, so only key 12 changes.
        let res = block_on(mgr.delete_batch(&[11, 12, 999])).unwrap();
        assert_eq!(res.vectors_deleted, 1);
        assert_eq!(res.segments_modified, vec![seg1_id]);

        // Segment 0 is unchanged; segment 1 is now fully deleted, so deletion_ratio = 1.0.
        assert_eq!(mgr.segments[0].segment_id, seg0_id);
        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[0].statistics.active_count, 1);
        assert_eq!(mgr.segments[1].segment_id, seg1_id);
        assert_eq!(mgr.segments[1].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[1].statistics.active_count, 0);
        assert!((mgr.segments[1].statistics.deletion_ratio - 1.0).abs() < 1e-6);
    }
1596
    // Deleting an empty key list changes nothing and reports nothing.
    #[test]
    fn delete_batch_empty_input_noop() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        let keys = vec![10, 11];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        let res = block_on(mgr.delete_batch(&[])).unwrap();
        assert_eq!(res.vectors_deleted, 0);
        assert!(res.segments_modified.is_empty());
        // stats unchanged
        assert_eq!(mgr.segments[0].statistics.deleted_count, 0);
        assert_eq!(mgr.segments[0].statistics.active_count, 2);
    }
1616
    // Keys that do not exist, or whose row is already deleted, must not be counted and
    // must not mark their segment as modified.
    #[test]
    fn delete_batch_ignores_nonexistent_and_already_deleted() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            ..Default::default()
        });
        // With two vectors per segment: segment 0 holds keys 1 and 2, segment 1 holds key 3.
        let keys = vec![1, 2, 3];
        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.2, 0.8]];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();

        // mark key 2 as already deleted
        mgr.segments[0].deleted.set(1, true);
        mgr.segments[0].recompute_deletion_stats();

        let res = block_on(mgr.delete_batch(&[2, 3, 999])).unwrap();
        assert_eq!(res.vectors_deleted, 1); // only key 3 transitions false->true
        assert_eq!(res.segments_modified, vec![mgr.segments[1].segment_id]);

        // stats reflect only new deletion for key 3
        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[0].statistics.active_count, 1);
        assert_eq!(mgr.segments[1].statistics.deleted_count, 1);
        assert_eq!(mgr.segments[1].statistics.active_count, 0);
    }
1643
    // `segments_needing_compaction` must honour the threshold: inclusive at the boundary,
    // always empty at 1.0, never listing untouched segments at 0.0, and empty again once
    // the segment has been compacted.
    #[test]
    fn segments_needing_compaction_respects_thresholds() {
        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
            dimension: 2,
            metric: Metric::InnerProduct,
            segment_max_vectors: 2,
            compaction_threshold: 0.5,
            ..Default::default()
        });
        // Four vectors with two per segment -> two segments.
        let keys = vec![1, 2, 3, 4];
        let vecs = vec![
            vec![1.0, 0.0],
            vec![0.0, 1.0],
            vec![0.5, 0.5],
            vec![0.2, 0.8],
        ];
        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
        let seg0 = mgr.segments[0].segment_id;

        // mark one row deleted in first segment -> deletion_ratio=0.5
        if let Some(seg) = mgr.segments.get_mut(0) {
            seg.deleted.set(0, true);
            seg.recompute_deletion_stats();
        }

        // A ratio equal to the threshold qualifies.
        let mut ids = mgr.segments_needing_compaction();
        assert_eq!(ids, vec![seg0]);

        // A threshold of 1.0 disables the listing entirely.
        mgr.config.compaction_threshold = 1.0;
        assert!(mgr.segments_needing_compaction().is_empty());

        // A threshold of 0.0 still skips the segment without any deletion.
        mgr.config.compaction_threshold = 0.0;
        ids = mgr.segments_needing_compaction();
        assert_eq!(ids, vec![seg0]);

        // compact and ensure it drops from the list
        block_on(mgr.compact_segment(seg0)).unwrap();
        mgr.config.compaction_threshold = 0.5;
        assert!(mgr.segments_needing_compaction().is_empty());
    }
1684
1685    #[test]
1686    fn compact_segment_removes_deleted_and_resets_stats() {
1687        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1688            dimension: 2,
1689            metric: Metric::InnerProduct,
1690            segment_max_vectors: 4,
1691            ..Default::default()
1692        });
1693        let keys = vec![1, 2, 3];
1694        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
1695        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1696        let old_id = mgr.segments[0].segment_id;
1697        mgr.segments[0].deleted.set(1, true);
1698        mgr.segments[0].recompute_deletion_stats();
1699
1700        let res = block_on(mgr.compact_segment(old_id)).unwrap();
1701        let new_id = res.new_segment_id.expect("new segment");
1702        assert_eq!(res.old_segment_id, old_id);
1703        assert_eq!(res.vectors_removed, 1);
1704
1705        let new_seg = mgr
1706            .segments
1707            .iter()
1708            .find(|s| s.segment_id == new_id)
1709            .expect("segment exists");
1710        assert_eq!(new_seg.num_vectors, 2);
1711        assert_eq!(new_seg.statistics.deleted_count, 0);
1712        assert_eq!(new_seg.statistics.active_count, 2);
1713        assert_eq!(new_seg.statistics.deletion_ratio, 0.0);
1714    }
1715
1716    #[test]
1717    fn compact_segment_handles_all_deleted() {
1718        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1719            dimension: 2,
1720            metric: Metric::InnerProduct,
1721            segment_max_vectors: 4,
1722            ..Default::default()
1723        });
1724        let keys = vec![1, 2];
1725        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
1726        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1727        let old_id = mgr.segments[0].segment_id;
1728        mgr.segments[0].deleted.set(0, true);
1729        mgr.segments[0].deleted.set(1, true);
1730        mgr.segments[0].recompute_deletion_stats();
1731
1732        let res = block_on(mgr.compact_segment(old_id)).unwrap();
1733        assert_eq!(res.old_segment_id, old_id);
1734        assert_eq!(res.new_segment_id, None);
1735        assert_eq!(res.vectors_removed, 2);
1736        assert!(mgr.segments.iter().all(|s| s.segment_id != old_id));
1737    }
1738
1739    #[test]
1740    fn compact_segment_errors_on_missing() {
1741        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1742            dimension: 2,
1743            metric: Metric::InnerProduct,
1744            ..Default::default()
1745        });
1746        let err = block_on(mgr.compact_segment(999)).unwrap_err();
1747        assert!(matches!(err, Error::NotFound));
1748    }
1749
1750    #[test]
1751    fn search_skips_deleted_rows_and_prunes_empty_segments() {
1752        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1753            dimension: 2,
1754            metric: Metric::InnerProduct,
1755            segment_max_vectors: 2,
1756            ..Default::default()
1757        });
1758        let keys = vec![1, 2, 3, 4];
1759        let vecs = vec![
1760            vec![1.0, 0.0], // seg0
1761            vec![0.0, 1.0], // seg0
1762            vec![0.5, 0.5], // seg1
1763            vec![0.2, 0.8], // seg1
1764        ];
1765        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1766
1767        // delete all in first segment and one in second segment
1768        mgr.segments[0].deleted.set(0, true);
1769        mgr.segments[0].deleted.set(1, true);
1770        mgr.segments[0].recompute_deletion_stats();
1771        mgr.segments[1].deleted.set(1, true);
1772        mgr.segments[1].recompute_deletion_stats();
1773
1774        let params = VectorSearchParams {
1775            query: vec![0.5, 0.5],
1776            metric: Metric::InnerProduct,
1777            top_k: 10,
1778            projection: None,
1779            filter_mask: None,
1780        };
1781        let (results, stats) = mgr.search_with_stats(params).unwrap();
1782
1783        // seg0 should be pruned (deletion_ratio==1.0), seg1 scanned with one active row.
1784        assert_eq!(stats.segments_pruned, 1);
1785        assert_eq!(stats.segments_scanned, 1);
1786        assert_eq!(stats.rows_scanned, 2); // segment size before deletion
1787        assert_eq!(stats.rows_matched, 1); // only non-deleted row remains
1788        assert_eq!(results.len(), 1);
1789        assert_eq!(results[0].row_id, 3);
1790    }
1791
1792    #[test]
1793    fn delete_compact_search_flow() {
1794        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1795            dimension: 2,
1796            metric: Metric::InnerProduct,
1797            segment_max_vectors: 10,
1798            ..Default::default()
1799        });
1800        let keys = vec![1, 2, 3];
1801        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.5, 0.5]];
1802        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1803        let seg_id = mgr.segments[0].segment_id;
1804
1805        // Flow1: initial search then delete and confirm removal.
1806        let params = VectorSearchParams {
1807            query: vec![1.0, 0.0],
1808            metric: Metric::InnerProduct,
1809            top_k: 10,
1810            projection: None,
1811            filter_mask: None,
1812        };
1813        let (results, stats) = mgr.search_with_stats(params.clone()).unwrap();
1814        assert_eq!(results.len(), 3);
1815        assert_eq!(stats.segments_scanned, 1);
1816        assert_eq!(stats.rows_matched, 3);
1817
1818        let del_res = block_on(mgr.delete_batch(&[2])).unwrap();
1819        assert_eq!(del_res.vectors_deleted, 1);
1820        assert_eq!(mgr.segments[0].statistics.deleted_count, 1);
1821        assert_eq!(mgr.segments[0].statistics.active_count, 2);
1822
1823        let (results_after_del, stats_after_del) = mgr.search_with_stats(params.clone()).unwrap();
1824        assert_eq!(results_after_del.len(), 2);
1825        let ids: Vec<_> = results_after_del.iter().map(|r| r.row_id).collect();
1826        assert_eq!(ids, vec![1, 3]);
1827        assert_eq!(stats_after_del.rows_matched, 2);
1828
1829        // Flow2: compact removes deleted rows and resets stats.
1830        let comp_res = block_on(mgr.compact_segment(seg_id)).unwrap();
1831        let new_id = comp_res.new_segment_id.expect("new segment");
1832        assert_eq!(comp_res.vectors_removed, 1);
1833        let seg = mgr
1834            .segments
1835            .iter()
1836            .find(|s| s.segment_id == new_id)
1837            .unwrap();
1838        assert_eq!(seg.statistics.deleted_count, 0);
1839        assert_eq!(seg.statistics.active_count, 2);
1840        assert_eq!(seg.statistics.deletion_ratio, 0.0);
1841
1842        let (results_after_compact, _) = mgr.search_with_stats(params.clone()).unwrap();
1843        let ids: Vec<_> = results_after_compact.iter().map(|r| r.row_id).collect();
1844        assert_eq!(ids, vec![1, 3]);
1845
1846        // Flow3: delete remaining rows -> compact -> search empty.
1847        block_on(mgr.delete_batch(&[1, 3])).unwrap();
1848        let comp_res2 = block_on(mgr.compact_segment(new_id)).unwrap();
1849        assert_eq!(comp_res2.new_segment_id, None);
1850        assert!(mgr.segments.is_empty());
1851
1852        let (results_final, stats_final) = mgr.search_with_stats(params).unwrap();
1853        assert!(results_final.is_empty());
1854        assert_eq!(stats_final.segments_scanned, 0);
1855    }
1856
1857    #[test]
1858    fn vector_store_append_and_search_with_filter_and_projection() {
1859        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1860            dimension: 2,
1861            metric: Metric::InnerProduct,
1862            segment_max_vectors: 2,
1863            ..Default::default()
1864        });
1865        let keys = vec![10, 11, 12];
1866        let vecs = vec![vec![1.0, 0.0], vec![0.5, 0.5], vec![0.0, 1.0]];
1867        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1868
1869        // set metadata for projection (one column of ints)
1870        if let Some(seg) = mgr.segments.get_mut(0) {
1871            let meta_col = encode_generic_column(
1872                Column::Int64(vec![100, 200]),
1873                None,
1874                LogicalType::Int64,
1875                EncodingV2::Plain,
1876            )
1877            .unwrap();
1878            seg.metadata = Some(vec![meta_col]);
1879        }
1880        if let Some(seg) = mgr.segments.get_mut(1) {
1881            let meta_col = encode_generic_column(
1882                Column::Int64(vec![300]),
1883                None,
1884                LogicalType::Int64,
1885                EncodingV2::Plain,
1886            )
1887            .unwrap();
1888            seg.metadata = Some(vec![meta_col]);
1889        }
1890
1891        // filter out the middle row, project metadata column 0
1892        let params = VectorSearchParams {
1893            query: vec![1.0, 0.0],
1894            metric: Metric::InnerProduct,
1895            top_k: 3,
1896            projection: Some(vec![0]),
1897            filter_mask: Some(vec![true, false, true]),
1898        };
1899        let (results, stats) = mgr.search_with_stats(params).unwrap();
1900        assert_eq!(stats.rows_scanned, 3);
1901        assert_eq!(stats.segments_scanned, 2);
1902        assert_eq!(stats.rows_matched, 2);
1903        assert_eq!(results.len(), 2);
1904        // first result should be key 10 with metadata 100
1905        assert_eq!(results[0].row_id, 10);
1906        assert_eq!(results[0].columns, vec![ScalarValue::Int64(100)]);
1907    }
1908
1909    #[test]
1910    fn vector_store_topk_is_deterministic_on_ties() {
1911        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1912            dimension: 2,
1913            metric: Metric::InnerProduct,
1914            segment_max_vectors: 3,
1915            ..Default::default()
1916        });
1917        // All vectors have identical scores; ordering should fall back to row_id ascending.
1918        let keys = vec![20, 10, 30];
1919        let vecs = vec![vec![1.0, 0.0], vec![1.0, 0.0], vec![1.0, 0.0]];
1920        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1921
1922        let params = VectorSearchParams {
1923            query: vec![1.0, 0.0],
1924            metric: Metric::InnerProduct,
1925            top_k: 3,
1926            projection: None,
1927            filter_mask: None,
1928        };
1929        let (results, stats) = mgr.search_with_stats(params).unwrap();
1930        assert_eq!(stats.rows_scanned, 3);
1931        assert_eq!(results.len(), 3);
1932        let row_ids: Vec<_> = results.iter().map(|r| r.row_id).collect();
1933        assert_eq!(row_ids, vec![10, 20, 30]);
1934    }
1935
1936    #[test]
1937    fn vector_store_end_to_end_with_kvs_roundtrip() {
1938        let mut mgr = VectorStoreManager::new(VectorStoreConfig {
1939            dimension: 2,
1940            metric: Metric::InnerProduct,
1941            segment_max_vectors: 2,
1942            ..Default::default()
1943        });
1944        let keys = vec![1, 2, 3];
1945        let vecs = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![0.6, 0.8]];
1946        block_on(mgr.append_batch(&keys, &vecs)).unwrap();
1947
1948        // 簡易メタデータを各セグメントに付与(整数1列)。
1949        if let Some(seg) = mgr.segments.get_mut(0) {
1950            let meta_col = encode_generic_column(
1951                Column::Int64(vec![100, 200]),
1952                None,
1953                LogicalType::Int64,
1954                EncodingV2::Plain,
1955            )
1956            .unwrap();
1957            seg.metadata = Some(vec![meta_col]);
1958        }
1959        if let Some(seg) = mgr.segments.get_mut(1) {
1960            let meta_col = encode_generic_column(
1961                Column::Int64(vec![300]),
1962                None,
1963                LogicalType::Int64,
1964                EncodingV2::Plain,
1965            )
1966            .unwrap();
1967            seg.metadata = Some(vec![meta_col]);
1968        }
1969
1970        // 永続化: VectorSegment を KVS に保存。
1971        let store = MemoryKV::new();
1972        {
1973            let manager = store.txn_manager();
1974            let mut txn = store.begin(TxnMode::ReadWrite).unwrap();
1975            for seg in &mgr.segments {
1976                let key = key_layout::vector_segment_key(seg.segment_id);
1977                let bytes = seg.to_bytes().unwrap();
1978                txn.put(key, bytes).unwrap();
1979            }
1980            manager.commit(txn).unwrap();
1981        }
1982
1983        // 復元: KVS から VectorSegment を読み出して新しいマネージャに投入。
1984        let mut restored = VectorStoreManager::new(mgr.config.clone());
1985        restored.next_segment_id = mgr.next_segment_id;
1986        {
1987            let mut txn = store.begin(TxnMode::ReadOnly).unwrap();
1988            for seg in &mgr.segments {
1989                let key = key_layout::vector_segment_key(seg.segment_id);
1990                let bytes = txn.get(&key).unwrap().unwrap();
1991                let decoded = VectorSegment::from_bytes(&bytes).unwrap();
1992                restored.segments.push(decoded);
1993            }
1994        }
1995
1996        let params = VectorSearchParams {
1997            query: vec![1.0, 0.0],
1998            metric: Metric::InnerProduct,
1999            top_k: 3,
2000            projection: Some(vec![0]),
2001            filter_mask: Some(vec![true, true, true]),
2002        };
2003        let (results, _stats) = restored.search_with_stats(params.clone()).unwrap();
2004        assert_eq!(results.len(), 3);
2005        // 期待される並び(スコアDESC、同スコアはrow_id ASC)とプロジェクション値を計算。
2006        let scalar = ScalarKernel;
2007        let expected = vec![
2008            (
2009                keys[0],
2010                scalar.inner_product(&params.query, &vecs[0]),
2011                ScalarValue::Int64(100),
2012            ),
2013            (
2014                keys[1],
2015                scalar.inner_product(&params.query, &vecs[1]),
2016                ScalarValue::Int64(200),
2017            ),
2018            (
2019                keys[2],
2020                scalar.inner_product(&params.query, &vecs[2]),
2021                ScalarValue::Int64(300),
2022            ),
2023        ];
2024        let mut expected_sorted = expected.clone();
2025        expected_sorted.sort_by(|a, b| {
2026            b.1.partial_cmp(&a.1)
2027                .unwrap_or(std::cmp::Ordering::Equal)
2028                .then_with(|| a.0.cmp(&b.0))
2029        });
2030
2031        for ((exp_id, _, exp_col), got) in expected_sorted.iter().zip(results.iter()) {
2032            assert_eq!(got.row_id, *exp_id);
2033            assert_same_scalar(exp_col, got.columns.first().unwrap());
2034        }
2035    }
2036
2037    fn assert_same_scalar(expected: &ScalarValue, actual: &ScalarValue) {
2038        match (expected, actual) {
2039            (ScalarValue::Int64(a), ScalarValue::Int64(b)) => assert_eq!(a, b),
2040            (ScalarValue::Float32(a), ScalarValue::Float32(b)) => assert!((a - b).abs() < 1e-5),
2041            (ScalarValue::Float64(a), ScalarValue::Float64(b)) => assert!((a - b).abs() < 1e-8),
2042            (ScalarValue::Bool(a), ScalarValue::Bool(b)) => assert_eq!(a, b),
2043            (ScalarValue::Binary(a), ScalarValue::Binary(b)) => assert_eq!(a, b),
2044            other => panic!("scalar mismatch: {:?}", other),
2045        }
2046    }
2047
2048    fn block_on<F: Future>(fut: F) -> F::Output {
2049        struct Noop;
2050        impl Wake for Noop {
2051            fn wake(self: Arc<Self>) {}
2052            fn wake_by_ref(self: &Arc<Self>) {}
2053        }
2054        let waker = Waker::from(Arc::new(Noop));
2055        let mut cx = Context::from_waker(&waker);
2056        let mut fut = std::pin::pin!(fut);
2057        loop {
2058            match fut.as_mut().poll(&mut cx) {
2059                Poll::Ready(val) => return val,
2060                Poll::Pending => std::thread::yield_now(),
2061            }
2062        }
2063    }
2064}