Skip to main content

lance_index/vector/
quantizer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::fmt;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::{collections::HashMap, fmt::Debug};
8
9use arrow::{array::AsArray, compute::concat_batches, datatypes::UInt64Type};
10use arrow_array::{Array, ArrayRef, FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array};
11use arrow_schema::Field;
12use async_trait::async_trait;
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use lance_arrow::RecordBatchExt;
16use lance_core::{Error, ROW_ID, Result};
17use lance_file::previous::reader::FileReader as PreviousFileReader;
18use lance_io::traits::Reader;
19use lance_linalg::distance::DistanceType;
20use lance_table::format::SelfDescribingFileReader;
21use serde::{Deserialize, Serialize};
22
23use super::flat::index::{FlatBinQuantizer, FlatQuantizer};
24use super::pq::ProductQuantizer;
25use super::{ivf::storage::IvfModel, sq::ScalarQuantizer, storage::VectorStore};
26use crate::frag_reuse::FragReuseIndex;
27use crate::vector::bq::builder::RabitQuantizer;
28use crate::{INDEX_METADATA_SCHEMA_KEY, IndexMetadata};
29
30pub trait Quantization:
31    Send
32    + Sync
33    + Clone
34    + Debug
35    + DeepSizeOf
36    + Into<Quantizer>
37    + TryFrom<Quantizer, Error = lance_core::Error>
38{
39    type BuildParams: QuantizerBuildParams + Send + Sync;
40    type Metadata: QuantizerMetadata + Send + Sync;
41    type Storage: QuantizerStorage<Metadata = Self::Metadata> + Debug;
42
43    fn build(
44        data: &dyn Array,
45        distance_type: DistanceType,
46        params: &Self::BuildParams,
47    ) -> Result<Self>;
48    fn retrain(&mut self, data: &dyn Array) -> Result<()>;
49    fn code_dim(&self) -> usize;
50    fn column(&self) -> &'static str;
51    fn use_residual(_: DistanceType) -> bool {
52        false
53    }
54    fn quantize(&self, vectors: &dyn Array) -> Result<ArrayRef>;
55    fn metadata_key() -> &'static str;
56    fn quantization_type() -> QuantizationType;
57    fn metadata(&self, _: Option<QuantizationMetadata>) -> Self::Metadata;
58    fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer>;
59    fn field(&self) -> Field;
60    fn extra_fields(&self) -> Vec<Field> {
61        vec![]
62    }
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum QuantizationType {
67    Flat,
68    FlatBin,
69    Product,
70    Scalar,
71    Rabit,
72}
73
74impl FromStr for QuantizationType {
75    type Err = Error;
76
77    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
78        match s {
79            "FLAT" => Ok(Self::Flat),
80            "FLATBIN" => Ok(Self::FlatBin),
81            "PQ" => Ok(Self::Product),
82            "SQ" => Ok(Self::Scalar),
83            "RABIT" => Ok(Self::Rabit),
84            _ => Err(Error::index(format!("Unknown quantization type: {}", s))),
85        }
86    }
87}
88
89impl std::fmt::Display for QuantizationType {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            Self::Flat => write!(f, "FLAT"),
93            Self::FlatBin => write!(f, "FLATBIN"),
94            Self::Product => write!(f, "PQ"),
95            Self::Scalar => write!(f, "SQ"),
96            Self::Rabit => write!(f, "RQ"),
97        }
98    }
99}
100
101pub trait QuantizerBuildParams: Send + Sync {
102    fn sample_size(&self) -> usize;
103    fn use_residual(_: DistanceType) -> bool {
104        false
105    }
106}
107
108impl QuantizerBuildParams for () {
109    fn sample_size(&self) -> usize {
110        0
111    }
112}
113
/// Quantization Method.
///
/// Type-erased wrapper holding exactly one concrete quantizer per variant.
///
/// <section class="warning">
/// Internal use only. End-user does not use this directly.
/// </section>
#[derive(Debug, Clone, DeepSizeOf)]
pub enum Quantizer {
    /// Wraps a [`FlatQuantizer`].
    Flat(FlatQuantizer),
    /// Wraps a [`FlatBinQuantizer`].
    FlatBin(FlatBinQuantizer),
    /// Wraps a [`ProductQuantizer`].
    Product(ProductQuantizer),
    /// Wraps a [`ScalarQuantizer`].
    Scalar(ScalarQuantizer),
    /// Wraps a [`RabitQuantizer`].
    Rabit(RabitQuantizer),
}
127
128impl Quantizer {
129    pub fn code_dim(&self) -> usize {
130        match self {
131            Self::Flat(fq) => fq.code_dim(),
132            Self::FlatBin(fq) => fq.code_dim(),
133            Self::Product(pq) => pq.code_dim(),
134            Self::Scalar(sq) => sq.code_dim(),
135            Self::Rabit(rq) => rq.code_dim(),
136        }
137    }
138
139    pub fn column(&self) -> &'static str {
140        match self {
141            Self::Flat(fq) => fq.column(),
142            Self::FlatBin(fq) => fq.column(),
143            Self::Product(pq) => pq.column(),
144            Self::Scalar(sq) => sq.column(),
145            Self::Rabit(rq) => rq.column(),
146        }
147    }
148
149    pub fn metadata_key(&self) -> &'static str {
150        match self {
151            Self::Flat(_) => FlatQuantizer::metadata_key(),
152            Self::FlatBin(_) => FlatBinQuantizer::metadata_key(),
153            Self::Product(_) => ProductQuantizer::metadata_key(),
154            Self::Scalar(_) => ScalarQuantizer::metadata_key(),
155            Self::Rabit(_) => RabitQuantizer::metadata_key(),
156        }
157    }
158
159    pub fn quantization_type(&self) -> QuantizationType {
160        match self {
161            Self::Flat(_) => QuantizationType::Flat,
162            Self::FlatBin(_) => QuantizationType::FlatBin,
163            Self::Product(_) => QuantizationType::Product,
164            Self::Scalar(_) => QuantizationType::Scalar,
165            Self::Rabit(_) => QuantizationType::Rabit,
166        }
167    }
168
169    pub fn metadata(&self, args: Option<QuantizationMetadata>) -> Result<serde_json::Value> {
170        let metadata = match self {
171            Self::Flat(fq) => serde_json::to_value(fq.metadata(args))?,
172            Self::FlatBin(fq) => serde_json::to_value(fq.metadata(args))?,
173            Self::Product(pq) => serde_json::to_value(pq.metadata(args))?,
174            Self::Scalar(sq) => serde_json::to_value(sq.metadata(args))?,
175            Self::Rabit(rq) => serde_json::to_value(rq.metadata(args))?,
176        };
177        Ok(metadata)
178    }
179}
180
181impl From<ProductQuantizer> for Quantizer {
182    fn from(pq: ProductQuantizer) -> Self {
183        Self::Product(pq)
184    }
185}
186
187impl From<ScalarQuantizer> for Quantizer {
188    fn from(sq: ScalarQuantizer) -> Self {
189        Self::Scalar(sq)
190    }
191}
192
/// Optional extra arguments passed to [`Quantizer::metadata`] and
/// [`Quantization::metadata`].
///
/// `Default` leaves both options as `None` and `transposed` as `false`.
#[derive(Debug, Clone, Default)]
pub struct QuantizationMetadata {
    // For PQ
    // NOTE(review): presumably the location of the codebook in the index file — confirm.
    pub codebook_position: Option<usize>,
    // The codebook itself, when available.
    pub codebook: Option<FixedSizeListArray>,
    // NOTE(review): presumably marks the codes as stored transposed — confirm.
    pub transposed: bool,
}
200
201#[async_trait]
202pub trait QuantizerMetadata:
203    fmt::Debug + Clone + Sized + DeepSizeOf + for<'a> Deserialize<'a> + Serialize
204{
205    // the extra metadata index in global buffer
206    fn buffer_index(&self) -> Option<u32> {
207        None
208    }
209
210    fn set_buffer_index(&mut self, _: u32) {
211        // do nothing
212    }
213
214    // parse the extra metadata bytes from global buffer,
215    // and set the metadata fields
216    fn parse_buffer(&mut self, _bytes: Bytes) -> Result<()> {
217        Ok(())
218    }
219
220    // the metadata that should be stored in global buffer
221    fn extra_metadata(&self) -> Result<Option<Bytes>> {
222        Ok(None)
223    }
224
225    async fn load(reader: &PreviousFileReader) -> Result<Self>;
226}
227
228#[async_trait::async_trait]
229pub trait QuantizerStorage: Clone + Sized + DeepSizeOf + VectorStore {
230    type Metadata: QuantizerMetadata;
231
232    /// Create a [QuantizerStorage] from a [RecordBatch].
233    /// The batch should consist of row IDs and quantized vector.
234    fn try_from_batch(
235        batch: RecordBatch,
236        metadata: &Self::Metadata,
237        distance_type: DistanceType,
238        frag_reuse_index: Option<Arc<FragReuseIndex>>,
239    ) -> Result<Self>;
240
241    fn metadata(&self) -> &Self::Metadata;
242
243    fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
244        let batches = self
245            .to_batches()?
246            .map(|b| {
247                let mut indices = Vec::with_capacity(b.num_rows());
248                let mut new_row_ids = Vec::with_capacity(b.num_rows());
249
250                let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
251                for (i, row_id) in row_ids.iter().enumerate() {
252                    match mapping.get(row_id) {
253                        Some(Some(new_id)) => {
254                            indices.push(i as u32);
255                            new_row_ids.push(*new_id);
256                        }
257                        Some(None) => {}
258                        None => {
259                            indices.push(i as u32);
260                            new_row_ids.push(*row_id);
261                        }
262                    }
263                }
264
265                let indices = UInt32Array::from(indices);
266                let new_row_ids = UInt64Array::from(new_row_ids);
267                let b = b
268                    .take(&indices)?
269                    .replace_column_by_name(ROW_ID, Arc::new(new_row_ids))?;
270                Ok(b)
271            })
272            .collect::<Result<Vec<_>>>()?;
273
274        let batch = concat_batches(self.schema(), batches.iter())?;
275        Self::try_from_batch(batch, self.metadata(), self.distance_type(), None)
276    }
277
278    async fn load_partition(
279        reader: &PreviousFileReader,
280        range: std::ops::Range<usize>,
281        distance_type: DistanceType,
282        metadata: &Self::Metadata,
283        frag_reuse_index: Option<Arc<FragReuseIndex>>,
284    ) -> Result<Self>;
285}
286
/// Loader to load partitioned [VectorStore] from disk.
///
/// `Q` selects the quantizer, and through it the metadata and storage types.
pub struct IvfQuantizationStorage<Q: Quantization> {
    // File reader the partitions are loaded through.
    reader: PreviousFileReader,

    // Distance type parsed from the index metadata when the loader is opened.
    distance_type: DistanceType,
    // Type-erased quantizer rebuilt from `metadata`.
    quantizer: Quantizer,
    // Quantizer metadata loaded through the reader.
    metadata: Q::Metadata,

    // IVF model; supplies the partition count and each partition's row range.
    ivf: IvfModel,
}
297
298impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
299    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
300        self.reader.deep_size_of_children(context)
301            + self.quantizer.deep_size_of_children(context)
302            + self.metadata.deep_size_of_children(context)
303            + self.ivf.deep_size_of_children(context)
304    }
305}
306
307impl<Q: Quantization> Clone for IvfQuantizationStorage<Q> {
308    fn clone(&self) -> Self {
309        Self {
310            reader: self.reader.clone(),
311            distance_type: self.distance_type,
312            quantizer: self.quantizer.clone(),
313            metadata: self.metadata.clone(),
314            ivf: self.ivf.clone(),
315        }
316    }
317}
318
319impl<Q: Quantization> IvfQuantizationStorage<Q> {
320    /// Open a Loader.
321    ///
322    ///
323    pub async fn open(reader: Arc<dyn Reader>) -> Result<Self> {
324        let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
325        let schema = reader.schema();
326
327        let metadata_str = schema
328            .metadata
329            .get(INDEX_METADATA_SCHEMA_KEY)
330            .ok_or(Error::index(format!(
331                "Reading quantization storage: index key {} not found",
332                INDEX_METADATA_SCHEMA_KEY
333            )))?;
334        let index_metadata: IndexMetadata = serde_json::from_str(metadata_str).map_err(|_| {
335            Error::index(format!("Failed to parse index metadata: {}", metadata_str))
336        })?;
337        let distance_type = DistanceType::try_from(index_metadata.distance_type.as_str())?;
338
339        let ivf_data = IvfModel::load(&reader).await?;
340
341        let metadata = Q::Metadata::load(&reader).await?;
342        let quantizer = Q::from_metadata(&metadata, distance_type)?;
343        Ok(Self {
344            reader,
345            distance_type,
346            quantizer,
347            metadata,
348            ivf: ivf_data,
349        })
350    }
351
352    pub fn distance_type(&self) -> DistanceType {
353        self.distance_type
354    }
355
356    pub fn quantizer(&self) -> &Quantizer {
357        &self.quantizer
358    }
359
360    pub fn metadata(&self) -> &Q::Metadata {
361        &self.metadata
362    }
363
364    /// Get the number of partitions in the storage.
365    pub fn num_partitions(&self) -> usize {
366        self.ivf.num_partitions()
367    }
368
369    /// Load one partition of vector storage.
370    ///
371    /// # Parameters
372    /// - `part_id`, partition id
373    ///
374    ///
375    pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
376        let range = self.ivf.row_range(part_id);
377        Q::Storage::load_partition(
378            &self.reader,
379            range,
380            self.distance_type,
381            &self.metadata,
382            None,
383        )
384        .await
385    }
386}