Skip to main content

lance_index/vector/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Vector Storage, holding (quantized) vectors and providing distance calculation.
5
6use crate::vector::quantizer::QuantizerStorage;
7use arrow::compute::concat_batches;
8use arrow_array::{ArrayRef, RecordBatch};
9use arrow_schema::SchemaRef;
10use deepsize::DeepSizeOf;
11use futures::prelude::stream::TryStreamExt;
12use lance_arrow::RecordBatchExt;
13use lance_core::{Error, ROW_ID, Result};
14use lance_encoding::decoder::FilterExpression;
15use lance_file::reader::FileReader;
16use lance_io::ReadBatchParams;
17use lance_linalg::distance::DistanceType;
18use prost::Message;
19use std::{any::Any, sync::Arc};
20
21use crate::frag_reuse::FragReuseIndex;
22use crate::{
23    pb,
24    vector::{
25        ivf::storage::{IVF_METADATA_KEY, IvfModel},
26        quantizer::Quantization,
27    },
28};
29
30use super::DISTANCE_TYPE_KEY;
31use super::graph::OrderedFloat;
32use super::graph::OrderedNode;
33use super::quantizer::{Quantizer, QuantizerMetadata};
34
/// Computes distances from one fixed reference (for example a query vector
/// or a stored row) to rows of a vector storage.
///
/// <section class="warning">
///  Internal API
///
///  API stability is not guaranteed
/// </section>
pub trait DistCalculator {
    /// Returns the distance to the row identified by `id`.
    fn distance(&self, id: u32) -> f32;

    /// Returns the distances of all rows.
    ///
    /// `k_hint` is a hint that implementations can use for optimization.
    fn distance_all(&self, k_hint: usize) -> Vec<f32>;

    /// Writes the distances of all rows into the caller-owned `dists` buffer.
    ///
    /// The default implementation empties `dists`, refills it with the result
    /// of [`Self::distance_all`], and does not touch either scratch buffer.
    fn distance_all_with_scratch(
        &self,
        k_hint: usize,
        dists: &mut Vec<f32>,
        _u16_scratch: &mut Vec<u16>,
        _u8_scratch: &mut Vec<u8>,
    ) {
        // Drop stale results before computing, then copy the fresh ones in.
        dists.clear();
        let computed = self.distance_all(k_hint);
        dists.extend_from_slice(&computed);
    }

    /// Optional prefetch hook for a row; the default implementation does nothing.
    fn prefetch(&self, _id: u32) {}
}
61
/// Schema-metadata key holding the quantizer storage metadata.
///
/// `IvfQuantizationStorage::try_new` reads the value stored under this key as
/// a JSON array of strings and deserializes the last entry into `Q::Metadata`.
pub const STORAGE_METADATA_KEY: &str = "storage_metadata";
63
64/// Vector Storage is the abstraction to store the vectors.
65///
66/// It can be in-memory or on-disk, raw vector or quantized vectors.
67///
68/// It abstracts away the logic to compute the distance between vectors.
69///
70/// TODO: should we rename this to "VectorDistance"?;
71///
72/// <section class="warning">
73///  Internal API
74///
75///  API stability is not guaranteed
76/// </section>
77pub trait VectorStore: Send + Sync + Sized + Clone {
78    type DistanceCalculator<'a>: DistCalculator
79    where
80        Self: 'a;
81
82    fn as_any(&self) -> &dyn Any;
83
84    fn schema(&self) -> &SchemaRef;
85
86    fn to_batches(&self) -> Result<impl Iterator<Item = RecordBatch> + Send>;
87
88    fn len(&self) -> usize;
89
90    /// Returns true if this graph is empty.
91    fn is_empty(&self) -> bool {
92        self.len() == 0
93    }
94
95    /// Return [DistanceType].
96    fn distance_type(&self) -> DistanceType;
97
98    /// Get the lance ROW ID from one vector.
99    fn row_id(&self, id: u32) -> u64;
100
101    fn row_ids(&self) -> impl Iterator<Item = &u64>;
102
103    /// Append Raw [RecordBatch] into the Storage.
104    /// The storage implement will perform quantization if necessary.
105    fn append_batch(&self, batch: RecordBatch, vector_column: &str) -> Result<Self>;
106
107    /// Create a [DistCalculator] to compute the distance between the query.
108    ///
109    /// Using dist calculator can be more efficient as it can pre-compute some
110    /// values.
111    fn dist_calculator(&self, query: ArrayRef, dist_q_c: f32) -> Self::DistanceCalculator<'_>;
112
113    fn dist_calculator_from_id(&self, id: u32) -> Self::DistanceCalculator<'_>;
114
115    fn dist_between(&self, u: u32, v: u32) -> f32 {
116        let dist_cal_u = self.dist_calculator_from_id(u);
117        dist_cal_u.distance(v)
118    }
119
120    fn prefers_candidate(&self, candidate: &OrderedNode, selected: &[OrderedNode]) -> bool {
121        let dist_cal_candidate = self.dist_calculator_from_id(candidate.id);
122        selected
123            .iter()
124            .all(|other| candidate.dist < OrderedFloat(dist_cal_candidate.distance(other.id)))
125    }
126}
127
128pub struct StorageBuilder<Q: Quantization> {
129    vector_column: String,
130    distance_type: DistanceType,
131    quantizer: Q,
132
133    frag_reuse_index: Option<Arc<FragReuseIndex>>,
134}
135
136impl<Q: Quantization> StorageBuilder<Q> {
137    pub fn new(
138        vector_column: String,
139        distance_type: DistanceType,
140        quantizer: Q,
141        frag_reuse_index: Option<Arc<FragReuseIndex>>,
142    ) -> Result<Self> {
143        Ok(Self {
144            vector_column,
145            distance_type,
146            quantizer,
147            frag_reuse_index,
148        })
149    }
150
151    pub fn build(&self, batches: Vec<RecordBatch>) -> Result<Q::Storage> {
152        let mut batch = concat_batches(batches[0].schema_ref(), batches.iter())?;
153
154        if batch.column_by_name(self.quantizer.column()).is_none() {
155            let vectors = batch
156                .column_by_name(&self.vector_column)
157                .ok_or(Error::index(format!(
158                    "Vector column {} not found in batch",
159                    self.vector_column
160                )))?;
161            let codes = self.quantizer.quantize(vectors)?;
162            batch = batch.drop_column(&self.vector_column)?.try_with_column(
163                arrow_schema::Field::new(self.quantizer.column(), codes.data_type().clone(), true),
164                codes,
165            )?;
166        }
167
168        debug_assert!(batch.column_by_name(ROW_ID).is_some());
169        debug_assert!(batch.column_by_name(self.quantizer.column()).is_some());
170
171        Q::Storage::try_from_batch(
172            batch,
173            &self.quantizer.metadata(None),
174            self.distance_type,
175            self.frag_reuse_index.clone(),
176        )
177    }
178}
179
180/// Loader to load partitioned PQ storage from disk.
181#[derive(Debug)]
182pub struct IvfQuantizationStorage<Q: Quantization> {
183    reader: FileReader,
184
185    distance_type: DistanceType,
186    metadata: Q::Metadata,
187
188    ivf: IvfModel,
189    frag_reuse_index: Option<Arc<FragReuseIndex>>,
190}
191
192impl<Q: Quantization> DeepSizeOf for IvfQuantizationStorage<Q> {
193    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
194        self.metadata.deep_size_of_children(context) + self.ivf.deep_size_of_children(context)
195    }
196}
197
198impl<Q: Quantization> IvfQuantizationStorage<Q> {
199    /// Open a Loader.
200    ///
201    ///
202    pub async fn try_new(
203        reader: FileReader,
204        frag_reuse_index: Option<Arc<FragReuseIndex>>,
205    ) -> Result<Self> {
206        let schema = reader.schema();
207
208        let distance_type = DistanceType::try_from(
209            schema
210                .metadata
211                .get(DISTANCE_TYPE_KEY)
212                .ok_or(Error::index(format!("{} not found", DISTANCE_TYPE_KEY)))?
213                .as_str(),
214        )?;
215
216        let ivf_pos = schema
217            .metadata
218            .get(IVF_METADATA_KEY)
219            .ok_or(Error::index(format!("{} not found", IVF_METADATA_KEY)))?
220            .parse()
221            .map_err(|e| Error::index(format!("Failed to decode IVF metadata: {}", e)))?;
222        let ivf_bytes = reader.read_global_buffer(ivf_pos).await?;
223        let ivf = IvfModel::try_from(pb::Ivf::decode(ivf_bytes)?)?;
224
225        let mut metadata: Vec<String> = serde_json::from_str(
226            schema
227                .metadata
228                .get(STORAGE_METADATA_KEY)
229                .ok_or(Error::index(format!("{} not found", STORAGE_METADATA_KEY)))?
230                .as_str(),
231        )?;
232        debug_assert_eq!(metadata.len(), 1);
233        // for now the metadata is the same for all partitions, so we just store one
234        let metadata = metadata
235            .pop()
236            .ok_or(Error::index("metadata is empty".to_string()))?;
237        let mut metadata: Q::Metadata = serde_json::from_str(&metadata)?;
238        // we store large metadata (e.g. PQ codebook) in global buffer,
239        // and the schema metadata just contains a pointer to the buffer
240        if let Some(pos) = metadata.buffer_index() {
241            let bytes = reader.read_global_buffer(pos).await?;
242            metadata.parse_buffer(bytes)?;
243        }
244
245        Ok(Self {
246            reader,
247            distance_type,
248            metadata,
249            ivf,
250            frag_reuse_index,
251        })
252    }
253
254    /// Construct from pre-parsed metadata, skipping global buffer reads.
255    /// Used when reconstructing from a disk cache.
256    pub fn from_cached(
257        reader: FileReader,
258        ivf: IvfModel,
259        metadata: Q::Metadata,
260        distance_type: DistanceType,
261        frag_reuse_index: Option<Arc<FragReuseIndex>>,
262    ) -> Self {
263        Self {
264            reader,
265            distance_type,
266            metadata,
267            ivf,
268            frag_reuse_index,
269        }
270    }
271
272    pub fn reader(&self) -> &FileReader {
273        &self.reader
274    }
275
276    pub fn ivf(&self) -> &IvfModel {
277        &self.ivf
278    }
279
280    pub fn num_rows(&self) -> u64 {
281        self.reader.num_rows()
282    }
283
284    pub fn partition_size(&self, part_id: usize) -> usize {
285        self.ivf.partition_size(part_id)
286    }
287
288    pub fn quantizer(&self) -> Result<Quantizer> {
289        let metadata = self.metadata();
290        Q::from_metadata(metadata, self.distance_type)
291    }
292
293    pub fn metadata(&self) -> &Q::Metadata {
294        &self.metadata
295    }
296
297    pub fn distance_type(&self) -> DistanceType {
298        self.distance_type
299    }
300
301    pub fn schema(&self) -> SchemaRef {
302        Arc::new(self.reader.schema().as_ref().into())
303    }
304
305    /// Get the number of partitions in the storage.
306    pub fn num_partitions(&self) -> usize {
307        self.ivf.num_partitions()
308    }
309
310    pub async fn load_partition(&self, part_id: usize) -> Result<Q::Storage> {
311        let range = self.ivf.row_range(part_id);
312        let batch = if range.is_empty() {
313            let schema = self.reader.schema();
314            let arrow_schema = arrow_schema::Schema::from(schema.as_ref());
315            RecordBatch::new_empty(Arc::new(arrow_schema))
316        } else {
317            let batches = self
318                .reader
319                .read_stream(
320                    ReadBatchParams::Range(range),
321                    u32::MAX,
322                    1,
323                    FilterExpression::no_filter(),
324                )
325                .await?
326                .try_collect::<Vec<_>>()
327                .await?;
328            let schema = Arc::new(self.reader.schema().as_ref().into());
329            concat_batches(&schema, batches.iter())?
330        };
331        Q::Storage::try_from_batch(
332            batch,
333            self.metadata(),
334            self.distance_type,
335            self.frag_reuse_index.clone(),
336        )
337    }
338}