1use core::fmt;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::{collections::HashMap, fmt::Debug};
8
9use arrow::{array::AsArray, compute::concat_batches, datatypes::UInt64Type};
10use arrow_array::{Array, ArrayRef, FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array};
11use arrow_schema::Field;
12use async_trait::async_trait;
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use lance_arrow::RecordBatchExt;
16use lance_core::{Error, ROW_ID, Result};
17use lance_file::previous::reader::FileReader as PreviousFileReader;
18use lance_io::traits::Reader;
19use lance_linalg::distance::DistanceType;
20use lance_table::format::SelfDescribingFileReader;
21use serde::{Deserialize, Serialize};
22
23use super::flat::index::{FlatBinQuantizer, FlatQuantizer};
24use super::pq::ProductQuantizer;
25use super::{ivf::storage::IvfModel, sq::ScalarQuantizer, storage::VectorStore};
26use crate::frag_reuse::FragReuseIndex;
27use crate::vector::bq::builder::RabitQuantizer;
28use crate::{INDEX_METADATA_SCHEMA_KEY, IndexMetadata};
29
/// A vector quantization scheme that can be built (trained) from vectors and then used
/// to encode them.
///
/// Every implementation must be convertible to and from the type-erased [`Quantizer`]
/// enum, which is how callers handle quantizers without knowing the concrete type.
pub trait Quantization:
    Send
    + Sync
    + Clone
    + Debug
    + DeepSizeOf
    + Into<Quantizer>
    + TryFrom<Quantizer, Error = lance_core::Error>
{
    /// Parameters consumed by [`Quantization::build`].
    type BuildParams: QuantizerBuildParams + Send + Sync;
    /// Serializable metadata describing a trained quantizer.
    type Metadata: QuantizerMetadata + Send + Sync;
    /// Storage holding vectors encoded by this quantizer; it shares this quantizer's
    /// `Metadata` type.
    type Storage: QuantizerStorage<Metadata = Self::Metadata> + Debug;

    /// Build a quantizer from `data` for the given `distance_type` using `params`.
    fn build(
        data: &dyn Array,
        distance_type: DistanceType,
        params: &Self::BuildParams,
    ) -> Result<Self>;
    /// Re-train this quantizer in place on `data`.
    fn retrain(&mut self, data: &dyn Array) -> Result<()>;
    /// Dimension of the produced codes (the exact unit is implementation-specific).
    fn code_dim(&self) -> usize;
    /// Name of the column associated with this quantizer's codes
    /// (NOTE(review): presumably the column the codes are stored in — confirm).
    fn column(&self) -> &'static str;
    /// Whether residual vectors should be quantized for `distance_type`.
    /// Defaults to `false`.
    fn use_residual(_: DistanceType) -> bool {
        false
    }
    /// Encode `vectors`, returning the quantized codes as an Arrow array.
    fn quantize(&self, vectors: &dyn Array) -> Result<ArrayRef>;
    /// Key identifying this quantizer's metadata.
    fn metadata_key() -> &'static str;
    /// The [`QuantizationType`] tag of this implementation.
    fn quantization_type() -> QuantizationType;
    /// Produce this quantizer's metadata, optionally using extra [`QuantizationMetadata`].
    fn metadata(&self, _: Option<QuantizationMetadata>) -> Self::Metadata;
    /// Reconstruct a type-erased [`Quantizer`] from stored `metadata`.
    fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer>;
    /// Arrow field describing this quantizer's code column.
    fn field(&self) -> Field;
    /// Additional Arrow fields this quantizer needs; none by default.
    fn extra_fields(&self) -> Vec<Field> {
        vec![]
    }
}
64
/// Tag identifying a quantization scheme.
///
/// String forms are defined by the `Display` and `FromStr` impls for this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuantizationType {
    /// Flat quantization (`FLAT`).
    Flat,
    /// Flat binary quantization (`FLATBIN`).
    FlatBin,
    /// Product quantization (`PQ`).
    Product,
    /// Scalar quantization (`SQ`).
    Scalar,
    /// RaBitQ quantization (displayed as `RQ`).
    Rabit,
}
73
74impl FromStr for QuantizationType {
75 type Err = Error;
76
77 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
78 match s {
79 "FLAT" => Ok(Self::Flat),
80 "FLATBIN" => Ok(Self::FlatBin),
81 "PQ" => Ok(Self::Product),
82 "SQ" => Ok(Self::Scalar),
83 "RABIT" => Ok(Self::Rabit),
84 _ => Err(Error::index(format!("Unknown quantization type: {}", s))),
85 }
86 }
87}
88
89impl std::fmt::Display for QuantizationType {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 Self::Flat => write!(f, "FLAT"),
93 Self::FlatBin => write!(f, "FLATBIN"),
94 Self::Product => write!(f, "PQ"),
95 Self::Scalar => write!(f, "SQ"),
96 Self::Rabit => write!(f, "RQ"),
97 }
98 }
99}
100
/// Parameters controlling how a quantizer is built.
pub trait QuantizerBuildParams: Send + Sync {
    /// Sample size to use when building
    /// (NOTE(review): presumably the number of training vectors to sample — confirm).
    fn sample_size(&self) -> usize;
    /// Whether residual vectors should be used for `distance_type`; defaults to `false`.
    fn use_residual(_: DistanceType) -> bool {
        false
    }
}
107
108impl QuantizerBuildParams for () {
109 fn sample_size(&self) -> usize {
110 0
111 }
112}
113
/// Type-erased wrapper around every supported quantizer implementation.
///
/// Each variant corresponds to one [`QuantizationType`]; see `Quantizer::quantization_type`.
#[derive(Debug, Clone, DeepSizeOf)]
pub enum Quantizer {
    /// Wraps a [`FlatQuantizer`].
    Flat(FlatQuantizer),
    /// Wraps a [`FlatBinQuantizer`].
    FlatBin(FlatBinQuantizer),
    /// Wraps a [`ProductQuantizer`].
    Product(ProductQuantizer),
    /// Wraps a [`ScalarQuantizer`].
    Scalar(ScalarQuantizer),
    /// Wraps a [`RabitQuantizer`].
    Rabit(RabitQuantizer),
}
127
128impl Quantizer {
129 pub fn code_dim(&self) -> usize {
130 match self {
131 Self::Flat(fq) => fq.code_dim(),
132 Self::FlatBin(fq) => fq.code_dim(),
133 Self::Product(pq) => pq.code_dim(),
134 Self::Scalar(sq) => sq.code_dim(),
135 Self::Rabit(rq) => rq.code_dim(),
136 }
137 }
138
139 pub fn column(&self) -> &'static str {
140 match self {
141 Self::Flat(fq) => fq.column(),
142 Self::FlatBin(fq) => fq.column(),
143 Self::Product(pq) => pq.column(),
144 Self::Scalar(sq) => sq.column(),
145 Self::Rabit(rq) => rq.column(),
146 }
147 }
148
149 pub fn metadata_key(&self) -> &'static str {
150 match self {
151 Self::Flat(_) => FlatQuantizer::metadata_key(),
152 Self::FlatBin(_) => FlatBinQuantizer::metadata_key(),
153 Self::Product(_) => ProductQuantizer::metadata_key(),
154 Self::Scalar(_) => ScalarQuantizer::metadata_key(),
155 Self::Rabit(_) => RabitQuantizer::metadata_key(),
156 }
157 }
158
159 pub fn quantization_type(&self) -> QuantizationType {
160 match self {
161 Self::Flat(_) => QuantizationType::Flat,
162 Self::FlatBin(_) => QuantizationType::FlatBin,
163 Self::Product(_) => QuantizationType::Product,
164 Self::Scalar(_) => QuantizationType::Scalar,
165 Self::Rabit(_) => QuantizationType::Rabit,
166 }
167 }
168
169 pub fn metadata(&self, args: Option<QuantizationMetadata>) -> Result<serde_json::Value> {
170 let metadata = match self {
171 Self::Flat(fq) => serde_json::to_value(fq.metadata(args))?,
172 Self::FlatBin(fq) => serde_json::to_value(fq.metadata(args))?,
173 Self::Product(pq) => serde_json::to_value(pq.metadata(args))?,
174 Self::Scalar(sq) => serde_json::to_value(sq.metadata(args))?,
175 Self::Rabit(rq) => serde_json::to_value(rq.metadata(args))?,
176 };
177 Ok(metadata)
178 }
179}
180
181impl From<ProductQuantizer> for Quantizer {
182 fn from(pq: ProductQuantizer) -> Self {
183 Self::Product(pq)
184 }
185}
186
187impl From<ScalarQuantizer> for Quantizer {
188 fn from(sq: ScalarQuantizer) -> Self {
189 Self::Scalar(sq)
190 }
191}
192
/// Optional extra inputs passed to [`Quantization::metadata`].
#[derive(Debug, Clone, Default)]
pub struct QuantizationMetadata {
    /// NOTE(review): presumably where the codebook is located in the file — confirm.
    pub codebook_position: Option<usize>,
    /// Codebook vectors, if supplied.
    pub codebook: Option<FixedSizeListArray>,
    /// NOTE(review): presumably whether the stored codes are transposed — confirm.
    pub transposed: bool,
}
200
/// Serializable (serde) metadata of a trained quantizer.
///
/// The buffer-related hooks presumably let an implementation keep an extra binary payload
/// outside the serialized form (NOTE(review): confirm against implementors); the defaults
/// assume there is no such payload.
#[async_trait]
pub trait QuantizerMetadata:
    fmt::Debug + Clone + Sized + DeepSizeOf + for<'a> Deserialize<'a> + Serialize
{
    /// Index of the buffer holding the extra payload, if any. Defaults to `None`.
    fn buffer_index(&self) -> Option<u32> {
        None
    }

    /// Record the index of the buffer holding the extra payload. No-op by default.
    fn set_buffer_index(&mut self, _: u32) {
    }

    /// Populate this metadata from the extra payload `_bytes`. Does nothing by default.
    fn parse_buffer(&mut self, _bytes: Bytes) -> Result<()> {
        Ok(())
    }

    /// Extra binary payload to store alongside the serialized metadata; `None` by default.
    fn extra_metadata(&self) -> Result<Option<Bytes>> {
        Ok(None)
    }

    /// Load the metadata through the previous-version `FileReader`.
    async fn load(reader: &PreviousFileReader) -> Result<Self>;
}
227
228#[async_trait::async_trait]
229pub trait QuantizerStorage: Clone + Sized + DeepSizeOf + VectorStore {
230 type Metadata: QuantizerMetadata;
231
232 fn try_from_batch(
235 batch: RecordBatch,
236 metadata: &Self::Metadata,
237 distance_type: DistanceType,
238 frag_reuse_index: Option<Arc<FragReuseIndex>>,
239 ) -> Result<Self>;
240
241 fn metadata(&self) -> &Self::Metadata;
242
243 fn remap(&self, mapping: &HashMap<u64, Option<u64>>) -> Result<Self> {
244 let batches = self
245 .to_batches()?
246 .map(|b| {
247 let mut indices = Vec::with_capacity(b.num_rows());
248 let mut new_row_ids = Vec::with_capacity(b.num_rows());
249
250 let row_ids = b.column(0).as_primitive::<UInt64Type>().values();
251 for (i, row_id) in row_ids.iter().enumerate() {
252 match mapping.get(row_id) {
253 Some(Some(new_id)) => {
254 indices.push(i as u32);
255 new_row_ids.push(*new_id);
256 }
257 Some(None) => {}
258 None => {
259 indices.push(i as u32);
260 new_row_ids.push(*row_id);
261 }
262 }
263 }
264
265 let indices = UInt32Array::from(indices);
266 let new_row_ids = UInt64Array::from(new_row_ids);
267 let b = b
268 .take(&indices)?
269 .replace_column_by_name(ROW_ID, Arc::new(new_row_ids))?;
270 Ok(b)
271 })
272 .collect::<Result<Vec<_>>>()?;
273
274 let batch = concat_batches(self.schema(), batches.iter())?;
275 Self::try_from_batch(batch, self.metadata(), self.distance_type(), None)
276 }
277
278 async fn load_partition(
279 reader: &PreviousFileReader,
280 range: std::ops::Range<usize>,
281 distance_type: DistanceType,
282 metadata: &Self::Metadata,
283 frag_reuse_index: Option<Arc<FragReuseIndex>>,
284 ) -> Result<Self>;
285}
286
/// Handle on an IVF-partitioned quantization storage file.
///
/// Holds the file reader plus the IVF model and quantizer metadata read at open time;
/// individual partitions are read on demand.
pub struct IvfQuantizationStorage<Q: Quantization> {
    // Reader over the underlying storage file.
    reader: PreviousFileReader,

    // Distance type recorded in the index metadata.
    distance_type: DistanceType,
    // Type-erased quantizer rebuilt from `metadata`.
    quantizer: Quantizer,
    // Quantizer-specific metadata loaded from the file.
    metadata: Q::Metadata,

    // IVF model, used to map a partition id to its row range.
    ivf: IvfModel,
}
297
298impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
299 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
300 self.reader.deep_size_of_children(context)
301 + self.quantizer.deep_size_of_children(context)
302 + self.metadata.deep_size_of_children(context)
303 + self.ivf.deep_size_of_children(context)
304 }
305}
306
307impl<Q: Quantization> Clone for IvfQuantizationStorage<Q> {
308 fn clone(&self) -> Self {
309 Self {
310 reader: self.reader.clone(),
311 distance_type: self.distance_type,
312 quantizer: self.quantizer.clone(),
313 metadata: self.metadata.clone(),
314 ivf: self.ivf.clone(),
315 }
316 }
317}
318
319impl<Q: Quantization> IvfQuantizationStorage<Q> {
320 pub async fn open(reader: Arc<dyn Reader>) -> Result<Self> {
324 let reader = PreviousFileReader::try_new_self_described_from_reader(reader, None).await?;
325 let schema = reader.schema();
326
327 let metadata_str = schema
328 .metadata
329 .get(INDEX_METADATA_SCHEMA_KEY)
330 .ok_or(Error::index(format!(
331 "Reading quantization storage: index key {} not found",
332 INDEX_METADATA_SCHEMA_KEY
333 )))?;
334 let index_metadata: IndexMetadata = serde_json::from_str(metadata_str).map_err(|_| {
335 Error::index(format!("Failed to parse index metadata: {}", metadata_str))
336 })?;
337 let distance_type = DistanceType::try_from(index_metadata.distance_type.as_str())?;
338
339 let ivf_data = IvfModel::load(&reader).await?;
340
341 let metadata = Q::Metadata::load(&reader).await?;
342 let quantizer = Q::from_metadata(&metadata, distance_type)?;
343 Ok(Self {
344 reader,
345 distance_type,
346 quantizer,
347 metadata,
348 ivf: ivf_data,
349 })
350 }
351
352 pub fn distance_type(&self) -> DistanceType {
353 self.distance_type
354 }
355
356 pub fn quantizer(&self) -> &Quantizer {
357 &self.quantizer
358 }
359
360 pub fn metadata(&self) -> &Q::Metadata {
361 &self.metadata
362 }
363
364 pub fn num_partitions(&self) -> usize {
366 self.ivf.num_partitions()
367 }
368
369 pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
376 let range = self.ivf.row_range(part_id);
377 Q::Storage::load_partition(
378 &self.reader,
379 range,
380 self.distance_type,
381 &self.metadata,
382 None,
383 )
384 .await
385 }
386}